akarnokd / Kotlin Flow Extensions
Projects that are alternatives of or similar to Kotlin Flow Extensions
kotlin-flow-extensions
Extensions to the Kotlin Flow library.
dependency
dependencies {
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.7"
}
Features
Table of contents
- Hot Flows
- Sources
range
timer
- Intermediate Flow operators (
FlowExtensions
)Flow.concatWith
Flow.groupBy
Flow.parallel
Flow.publish
Flow.replay
Flow.startCollectOn
Flow.takeUntil
Flow.onBackpressureDrop
Flow.flatMapDrop
-
ParallelFlow
operators (FlowExtensions
)ParallelFlow.concatMap
ParallelFlow.filter
ParallelFlow.map
ParallelFlow.reduce
ParallelFlow.sequential
ParallelFlow.transform
ConnectableFlow
PublishSubject
Multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.
import hu.akarnokd.kotlin.flow.*
runBlocking {
val publishSubject = PublishSubject<Int>()
val job = launch(Dispatchers.IO) {
publishSubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!publishSubject.hasCollectors()) {
delay(1)
}
publishSubject.emit(1)
publishSubject.complete()
job.join()
}
ReplaySubject
Caches and replays some or all items to collectors. Constructors for size-bound, time-bound and both size-and-time bound
replays are available. An additional constructor with a TimeUnit -> Long
has been defined to allow virtualizing
the progression of time for testing purposes
import hu.akarnokd.kotlin.flow.*
runBlocking {
val replaySubject = ReplaySubject<Int>()
val job = launch(Dispatchers.IO) {
replaySubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!replaySubject.hasCollectors()) {
delay(1)
}
replaySubject.emit(1)
replaySubject.emit(2)
replaySubject.emit(3)
replaySubject.complete()
job.join()
replaySubject.collect {
println(it)
}
println("Done 2")
}
BehaviorSubject
Caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. It is possible to set an initial value to be sent to fresh collectors via a constructor.
import hu.akarnokd.kotlin.flow.*
runBlocking {
val behaviorSubject = BehaviorSubject<Int>()
behaviorSubject.emit(1)
// OR
// val behaviorSubject = BehaviorSubject<Int>(1)
val job = launch(Dispatchers.IO) {
behaviorSubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!behaviorSubject.hasCollectors()) {
delay(1)
}
behaviorSubject.emit(2)
behaviorSubject.emit(3)
behaviorSubject.complete()
job.join()
}
Flow.flatMapDrop
Maps the upstream value into a Flow
and relays its items while ignoring further upstream items until the current
inner Flow
completes.
import hu.akarnokd.kotlin.flow.*
range(1, 10)
.map {
delay(100)
it
}
.flatMapDrop {
range(it * 100, 5)
.map {
delay(30)
it
}
}
.assertResult(
100, 101, 102, 103, 104,
300, 301, 302, 303, 304,
500, 501, 502, 503, 504,
700, 701, 702, 703, 704,
900, 901, 902, 903, 904
)
Flow.publish
Shares a single connection to the upstream source which can be consumed by many collectors inside a transform
function,
which then yields the resulting items for the downstream.
Effectively, one collector to the output Flow<R>
will trigger exactly one collection of the upstream Flow<T>
. Inside
the transformer
function though, the presented Flow<T>
can be collected as many times as needed; it won't trigger
new collections towards the upstream but share items to all inner collectors as they become available.
Unfortunately, the suspending nature of coroutines/Flow
doesn't give a clear indication when the transformer
chain
has been properly established, which can result in item loss or run-to-completion without any item being collected.
If the number of the inner collectors inside transformer
can be known, the publish(expectedCollectors)
overload
can be used to hold back the upstream until the expected number of collectors have started/ready collecting items.
Example:
range(1, 5)
.publish(2) {
shared -> merge(shared.filter { it % 2 == 0 }, shared.filter { it % 2 != 0 })
}
.assertResult(1, 2, 3, 4, 5)
In the example, it is known merge
will establish 2 collectors, thus the publish
can be instructed to await those 2.
Without the argument, range
would rush through its items as merge
doesn't start collecting in time, causing an
empty result list.