All Projects → akarnokd → Kotlin Flow Extensions

akarnokd / Kotlin Flow Extensions

Licence: apache-2.0
Extensions to the Kotlin Flow library.

Programming Languages

kotlin
9241 projects
flow
126 projects

Projects that are alternatives of or similar to Kotlin Flow Extensions

Asyncninja
A complete set of primitives for concurrency and reactive programming on Swift
Stars: ✭ 146 (-63.86%)
Mutual labels:  async, functional
Vertx Lang Kotlin
Vert.x for Kotlin
Stars: ✭ 215 (-46.78%)
Mutual labels:  async, coroutines
Minicoro
Single header asymmetric stackful cross-platform coroutine library in pure C.
Stars: ✭ 164 (-59.41%)
Mutual labels:  async, coroutines
Cppcoro
A library of C++ coroutine abstractions for the coroutines TS
Stars: ✭ 2,118 (+424.26%)
Mutual labels:  async, coroutines
Rsocket Kotlin
RSocket Kotlin multi-platform implementation
Stars: ✭ 256 (-36.63%)
Mutual labels:  async, coroutines
Redux Most
Most.js based middleware for Redux. Handle async actions with monadic streams & reactive programming.
Stars: ✭ 137 (-66.09%)
Mutual labels:  async, functional
Aioreactive
Async/await reactive tools for Python 3.9+
Stars: ✭ 215 (-46.78%)
Mutual labels:  async, functional
Coroutines
A simple system for running nested coroutines in C#.
Stars: ✭ 100 (-75.25%)
Mutual labels:  async, coroutines
snap
Snap Programming Language
Stars: ✭ 20 (-95.05%)
Mutual labels:  functional, coroutines
kotlin-coroutines-jdbc
A library for interacting with blocking JDBC drivers using Kotlin Coroutines.
Stars: ✭ 40 (-90.1%)
Mutual labels:  functional, coroutines
Mioco
[no longer maintained] Scalable, coroutine-based, fibers/green-threads for Rust. (aka MIO COroutines).
Stars: ✭ 125 (-69.06%)
Mutual labels:  async, coroutines
Amp
A non-blocking concurrency framework for PHP applications. 🐘
Stars: ✭ 3,457 (+755.69%)
Mutual labels:  async, coroutines
Tina
Tina is a teeny tiny, header only, coroutine and job library.
Stars: ✭ 125 (-69.06%)
Mutual labels:  async, coroutines
Unityfx.async
Asynchronous operations (promises) for Unity3d.
Stars: ✭ 143 (-64.6%)
Mutual labels:  async, coroutines
Open.channelextensions
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
Stars: ✭ 106 (-73.76%)
Mutual labels:  async, extensions
Fooproxy
稳健高效的评分制-针对性- IP代理池 + API服务,可以自己插入采集器进行代理IP的爬取,针对你的爬虫的一个或多个目标网站分别生成有效的IP代理数据库,支持MongoDB 4.0 使用 Python3.7(Scored IP proxy pool ,customise proxy data crawler can be added anytime)
Stars: ✭ 195 (-51.73%)
Mutual labels:  async, coroutines
Suave
Suave is a simple web development F# library providing a lightweight web server and a set of combinators to manipulate route flow and task composition.
Stars: ✭ 1,196 (+196.04%)
Mutual labels:  async, functional
Kotlinx.coroutines
Library support for Kotlin coroutines
Stars: ✭ 10,194 (+2423.27%)
Mutual labels:  async, coroutines
React Coroutine
Make your async components compact and descriptive by leveraging the power of the language features
Stars: ✭ 246 (-39.11%)
Mutual labels:  async, coroutines
Creed
Sophisticated and functionally-minded async with advanced features: coroutines, promises, ES2015 iterables, fantasy-land
Stars: ✭ 265 (-34.41%)
Mutual labels:  async, coroutines

kotlin-flow-extensions

Extensions to the Kotlin Flow library.

codecov.io Maven Central

dependency

Maven

dependencies {
    implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.7"
}

Features

Table of contents

  • Hot Flows
  • Sources
    • range
    • timer
  • Intermediate Flow operators (FlowExtensions)
    • Flow.concatWith
    • Flow.groupBy
    • Flow.parallel
    • Flow.publish
    • Flow.replay
    • Flow.startCollectOn
    • Flow.takeUntil
    • Flow.onBackpressureDrop
    • Flow.flatMapDrop
  • ParallelFlow operators (FlowExtensions)
    • ParallelFlow.concatMap
    • ParallelFlow.filter
    • ParallelFlow.map
    • ParallelFlow.reduce
    • ParallelFlow.sequential
    • ParallelFlow.transform
  • ConnectableFlow

PublishSubject

Multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.

import hu.akarnokd.kotlin.flow.*

runBlocking {
    
    val publishSubject = PublishSubject<Int>()

    val job = launch(Dispatchers.IO) {
        publishSubject.collect {
            println(it)
        }
        println("Done")
    }
    
    // wait for the collector to arrive
    while (!publishSubject.hasCollectors()) {
        delay(1)
    }

   
    publishSubject.emit(1)
    publishSubject.complete()
   
    job.join()
}

ReplaySubject

Caches and replays some or all items to collectors. Constructors for size-bound, time-bound and both size-and-time bound replays are available. An additional constructor with a TimeUnit -> Long has been defined to allow virtualizing the progression of time for testing purposes

import hu.akarnokd.kotlin.flow.*

runBlocking {
    
    val replaySubject = ReplaySubject<Int>()

    val job = launch(Dispatchers.IO) {
        replaySubject.collect {
            println(it)
        }
        println("Done")
    }
   
    // wait for the collector to arrive
    while (!replaySubject.hasCollectors()) {
        delay(1)
    }

    replaySubject.emit(1)
    replaySubject.emit(2)
    replaySubject.emit(3)
    replaySubject.complete()
   
    job.join()

    replaySubject.collect {
        println(it)
    }
    println("Done 2")
}

BehaviorSubject

Caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. It is possible to set an initial value to be sent to fresh collectors via a constructor.

import hu.akarnokd.kotlin.flow.*

runBlocking {
    
    val behaviorSubject = BehaviorSubject<Int>()
    behaviorSubject.emit(1)
  
    // OR
    // val behaviorSubject = BehaviorSubject<Int>(1)


    val job = launch(Dispatchers.IO) {
        behaviorSubject.collect {
            println(it)
        }
        println("Done")
    }
   
    // wait for the collector to arrive
    while (!behaviorSubject.hasCollectors()) {
        delay(1)
    }

    behaviorSubject.emit(2)
    behaviorSubject.emit(3)
    behaviorSubject.complete()
   
    job.join()
}

Flow.flatMapDrop

Maps the upstream value into a Flow and relays its items while ignoring further upstream items until the current inner Flow completes.

import hu.akarnokd.kotlin.flow.*

range(1, 10)
.map {
    delay(100)
    it
}
.flatMapDrop {
    range(it * 100, 5)
            .map {
                delay(30)
                it
            }
}
.assertResult(
        100, 101, 102, 103, 104,
        300, 301, 302, 303, 304,
        500, 501, 502, 503, 504,
        700, 701, 702, 703, 704,
        900, 901, 902, 903, 904
)

Flow.publish

Shares a single connection to the upstream source which can be consumed by many collectors inside a transform function, which then yields the resulting items for the downstream.

Effectively, one collector to the output Flow<R> will trigger exactly one collection of the upstream Flow<T>. Inside the transformer function though, the presented Flow<T> can be collected as many times as needed; it won't trigger new collections towards the upstream but share items to all inner collectors as they become available.

Unfortunately, the suspending nature of coroutines/Flow doesn't give a clear indication when the transformer chain has been properly established, which can result in item loss or run-to-completion without any item being collected. If the number of the inner collectors inside transformer can be known, the publish(expectedCollectors) overload can be used to hold back the upstream until the expected number of collectors have started/ready collecting items.

Example:

    range(1, 5)
    .publish(2) { 
         shared -> merge(shared.filter { it % 2 == 0 }, shared.filter { it % 2 != 0 }) 
    }
    .assertResult(1, 2, 3, 4, 5)

In the example, it is known merge will establish 2 collectors, thus the publish can be instructed to await those 2. Without the argument, range would rush through its items as merge doesn't start collecting in time, causing an empty result list.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].