thomasnield / rxkotlin-poc

Licence: other
Experimental Rx implementation in Kotlin

RxKotlin POC

NOTE: This project is currently abandoned.

This is an attempt to implement Rx in pure Kotlin, using Kotlin coroutines to handle concurrency and backpressure. Only a handful of operators exist at the moment, and a couple of loose ends, such as disposal, still need to be worked out.

Even so, it is already clear that there is an opportunity to create an Rx implementation in Kotlin with less effort than in Java.
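
As a rough illustration of the idea, here is a minimal sketch (not this project's actual code) of how suspending functions can provide backpressure with very little machinery: the producer calls a suspending `onNext`, so it cannot emit the next item until the consumer has finished with the current one. `SketchObservable`, `runSketch`, and `lengthsOfAtLeastFive` are hypothetical names invented for this sketch. It uses only the `kotlin.coroutines` package from the standard library, runs on the calling thread, and leaves out concurrency, errors, completion, and disposal.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical minimal type for illustration; NOT the Observable from this project.
class SketchObservable<T>(
    // The producer receives the downstream callback and calls it once per item.
    private val producer: suspend (suspend (T) -> Unit) -> Unit
) {
    // Each emission is a suspending call, so the producer waits
    // until onNext returns before it can emit again.
    suspend fun subscribe(onNext: suspend (T) -> Unit) = producer(onNext)

    fun <R> map(transform: (T) -> R): SketchObservable<R> =
        SketchObservable { downstream -> subscribe { downstream(transform(it)) } }

    fun filter(predicate: (T) -> Boolean): SketchObservable<T> =
        SketchObservable { downstream -> subscribe { if (predicate(it)) downstream(it) } }

    companion object {
        fun <T> just(vararg items: T): SketchObservable<T> =
            SketchObservable { downstream -> for (item in items) downstream(item) }
    }
}

// Hypothetical helper: starts a suspending block using only the standard library.
// Nothing in this sketch actually suspends, so the block finishes before this returns.
fun runSketch(block: suspend () -> Unit) {
    var failure: Throwable? = null
    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            failure = result.exceptionOrNull()
        }
    })
    failure?.let { throw it }
}

// Same pipeline as the first example in the Examples section, collected into a list.
fun lengthsOfAtLeastFive(): List<Int> {
    val received = mutableListOf<Int>()
    runSketch {
        SketchObservable.just("Alpha", "Beta", "Gamma", "Delta")
            .map { it.length }
            .filter { it >= 5 }
            .subscribe { received += it }
    }
    return received
}

fun main() {
    println(lengthsOfAtLeastFive()) // prints [5, 5, 5]
}
```

A real implementation would need a dispatcher to move work between threads (for `subscribeOn` and `observeOn`) and some form of cancellation for disposal; in this sketch the suspending call chain only shows where backpressure comes from.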

Jake Wharton's POC

Be sure to check out Jake Wharton's Reagent POC, which aims to be multiplatform and to have a more thought-out type hierarchy.

https://github.com/JakeWharton/Reagent/

Recommended reading to help contribute

Coroutines Guide

Coroutines and Reactive Streams

Examples

@Test
fun testJust() {
    // Map each string to its length, then keep only lengths of at least 5
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
            .map { it.length }
            .filter { it >= 5 }
            .subscribe { println(it) }
}

@Test
fun testSubscribeOn() = runBlocking {
    // Subscribe on CommonPool and print each item with the name of the thread it arrives on
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
            .subscribeOn(CommonPool)
            .subscribe {
                println("$it ${Thread.currentThread().name}")
            }
}

@Test
fun testObserveOn() = runBlocking {
    // Print the thread name before and after observeOn to compare where each stage runs
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
            .subscribeOn(CommonPool)
            .doOnNext { println("$it ${Thread.currentThread().name}") }
            .map { it.length }
            .observeOn(CommonPool)
            .subscribe {
                println("$it ${Thread.currentThread().name}")
            }
}

@Test
fun testObservableDefer() {
    val state = AtomicBoolean(true)

    // defer is meant to build a fresh Observable per subscriber, so the flag is read at subscription time
    val source = Observable.defer { Observable.just(state.getAndSet(false)) }

    source.subscribe { println("Sub 1: $it") }
    source.subscribe { println("Sub 2: $it") }
}

@Test
fun testRangeAndTake() {
    Observable.range(1, 10)
            .take(5)
            .subscribe(onNext = ::println)
}

@Test
fun testInterval() = runBlocking {
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(onNext = ::println)

    // Keep the test alive for 5 seconds so the interval has time to emit
    delay(5000)
}