Hirrolot / mux-stream

Licence: MIT License
(De)multiplex asynchronous streams

mux-stream

This crate emphasises the first-class nature of asynchronous streams in Rust by deriving the value construction and pattern matching operations from ADTs, as depicted by the following correspondence:

ADTs               | Streams
Value construction | Multiplexing
Pattern matching   | Demultiplexing

Installation

Copy this into your Cargo.toml:

[dependencies]
mux-stream = "0.3"
tokio = "1.6"
tokio-stream = "0.1"
futures = "0.3"

Motivation

In many problem domains, we need to process incoming hierarchical structures. Suppose you're writing a social network, where updates of several kinds (sending a message, following a user, muting a friend) might arrive at any moment.

In terms of Rust, you might want to express such updates via sum types:

enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
    MuteFriend(MuteFriendReq)
}

enum SendMsgReq {
    Photo(...),
    Video(...),
    Text(...)
}

struct FollowReq {
    ...
}

enum MuteFriendReq {
    Forever(...),
    ForInterval(...)
}

This is where the story begins: now you need to process user requests. Let's formulate some general requirements for request-processing code:

  • Conciseness. Avoid boilerplate where possible. Life is too short to write boilerplate code.
  • Single-responsibility principle (SRP). For our needs it means that each processor must be responsible for exactly one kind of request. No less and no more.
  • Compatibility with other Rust code. Our request-processing solution must integrate easily into existing code bases.
  • Stay Rusty. eDSLs implemented via macros are fun, but be ready for confusing compilation errors when business logic is expressed in terms of such eDSLs. What is more, they are computer languages on their own -- it takes some time to become familiar with them.
  • Type safety. Do not spread the pain of upcasting/downcasting types you're already aware of.

This crate addresses all of the aforementioned requirements. The approach is based upon functional asynchronous dataflow programming: we augment asynchronous data streams with pattern matching. Continuing the social network example, your code would be structured as layers of processors that mirror the hierarchy of the update types.

(Note the similarities with the chain-of-responsibility pattern.)

That is, each function takes a stream of updates and propagates (demultiplexes, pattern matches) them to the processors of the lower layers, thereby satisfying the single-responsibility principle. What is more, you can use the full power of stream adaptors, which let you handle updates declaratively, as chains rather than as single objects.
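
To make the layering concrete, here is a minimal std-only sketch. It is not this crate's API: it assumes plain threads and std::sync::mpsc channels in place of Tokio tasks and streams. The payloads (a String for texts and photos, a user_id field) and all function names are hypothetical, since the types above elide their fields.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical payloads: the original types elide their fields.
enum UserReq {
    SendMsg(SendMsgReq),
    Follow(FollowReq),
}

enum SendMsgReq {
    Text(String),
    Photo(String),
}

struct FollowReq {
    user_id: u32,
}

// Top layer: splits user requests and hands each kind to its own processor.
fn process_user_reqs(reqs: Receiver<UserReq>, log: Sender<String>) {
    let (msg_tx, msg_rx) = channel();
    let (follow_tx, follow_rx) = channel();

    let msg_log = log.clone();
    thread::spawn(move || process_send_msg_reqs(msg_rx, msg_log));
    thread::spawn(move || process_follow_reqs(follow_rx, log));

    for req in reqs {
        // A send error only means the lower layer has stopped; ignore it.
        match req {
            UserReq::SendMsg(req) => { let _ = msg_tx.send(req); }
            UserReq::Follow(req) => { let _ = follow_tx.send(req); }
        }
    }
    // Dropping the senders here closes the lower layers' inputs.
}

// Middle layer: splits message requests by kind.
fn process_send_msg_reqs(reqs: Receiver<SendMsgReq>, log: Sender<String>) {
    let (text_tx, text_rx) = channel();
    let (photo_tx, photo_rx) = channel();

    let text_log = log.clone();
    thread::spawn(move || process_texts(text_rx, text_log));
    thread::spawn(move || process_photos(photo_rx, log));

    for req in reqs {
        match req {
            SendMsgReq::Text(text) => { let _ = text_tx.send(text); }
            SendMsgReq::Photo(url) => { let _ = photo_tx.send(url); }
        }
    }
}

// Leaf processors: each one is responsible for exactly one kind of request.
fn process_texts(texts: Receiver<String>, log: Sender<String>) {
    for text in texts {
        let _ = log.send(format!("text: {}", text));
    }
}

fn process_photos(photos: Receiver<String>, log: Sender<String>) {
    for url in photos {
        let _ = log.send(format!("photo: {}", url));
    }
}

fn process_follow_reqs(reqs: Receiver<FollowReq>, log: Sender<String>) {
    for req in reqs {
        let _ = log.send(format!("follow: {}", req.user_id));
    }
}
```

Each function only knows about the layer directly beneath it, so adding a new kind of message touches one enum and one processor. The log channel exists only so that the sketch produces observable output.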

The sections below are dedicated to demultiplexing and multiplexing separately. See also examples/admin_panel.rs, an elaborate demonstration of the most prominent aspects of the paradigm.

Demultiplexing

Given Stream<T1 | ... | Tn>, demultiplexing produces Stream<T1>, ..., Stream<Tn>: every item of the input stream has one of the types T1, ..., Tn and ends up in the output stream of that type.

That is, once an update from an input stream is available, it's pushed into the corresponding output stream in a separate Tokio task. No output stream can slow down another one.
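
The routing described above can be imitated by hand. The sketch below is not the crate's implementation: it assumes a background thread and unbounded std::sync::mpsc channels instead of a Tokio task and streams, and demux_my_enum is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(f64),
    C(&'static str),
}

// Hand-written stand-in for a demultiplexer: a background thread routes
// each item into the channel that corresponds to its variant.
fn demux_my_enum(
    input: Vec<MyEnum>,
) -> (Receiver<i32>, Receiver<f64>, Receiver<&'static str>) {
    let (a_tx, a_rx) = channel();
    let (b_tx, b_rx) = channel();
    let (c_tx, c_rx) = channel();

    thread::spawn(move || {
        for item in input {
            // Pattern matching decides which output receives the payload.
            // The channels are unbounded, so a slow reader of one output
            // never blocks delivery to the others. A send error only means
            // that the receiver was dropped, so it is ignored.
            match item {
                MyEnum::A(x) => { let _ = a_tx.send(x); }
                MyEnum::B(x) => { let _ = b_tx.send(x); }
                MyEnum::C(x) => { let _ = c_tx.send(x); }
            }
        }
        // All senders are dropped here, which closes every output.
    });

    (a_rx, b_rx, c_rx)
}
```

Within one output the relative order of the input is preserved, because a single thread does all of the forwarding.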

Example

[examples/demux.rs]

use mux_stream::{demux, error_handler};

use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    #[derive(Debug)]
    enum MyEnum {
        A(i32),
        B(f64),
        C(&'static str),
    }

    let stream = tokio_stream::iter(vec![
        MyEnum::A(123),
        MyEnum::B(24.241),
        MyEnum::C("Hello"),
        MyEnum::C("ABC"),
        MyEnum::A(811),
    ]);

    let (i32_stream, f64_stream, str_stream) =
        demux!(MyEnum { A, B, C })(stream, error_handler::panicking());

    let mut i32_stream = UnboundedReceiverStream::new(i32_stream);
    let mut f64_stream = UnboundedReceiverStream::new(f64_stream);
    let mut str_stream = UnboundedReceiverStream::new(str_stream);

    assert_eq!(i32_stream.next().await, Some(123));
    assert_eq!(i32_stream.next().await, Some(811));
    assert_eq!(i32_stream.next().await, None);

    assert_eq!(f64_stream.next().await, Some(24.241));
    assert_eq!(f64_stream.next().await, None);

    assert_eq!(str_stream.next().await, Some("Hello"));
    assert_eq!(str_stream.next().await, Some("ABC"));
    assert_eq!(str_stream.next().await, None);
}

Multiplexing

Multiplexing is the opposite of demultiplexing: given Stream<T1>, ..., Stream<Tn>, it produces Stream<T1 | ... | Tn>.

That is, once an update from any input stream is available, it's pushed into the output stream. Again, this work is performed asynchronously in a separate Tokio task.
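
As a rough illustration of the same idea without the crate, the sketch below merges three inputs into one channel of a sum type. It assumes std threads and std::sync::mpsc in place of Tokio tasks and streams, and mux_my_enum is a hypothetical name, not part of this crate.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

#[derive(Debug, PartialEq)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

// Hand-written stand-in for a multiplexer: one forwarding thread per input
// wraps every item into the matching variant and pushes it to a shared output.
fn mux_my_enum(
    a_input: Vec<i32>,
    b_input: Vec<u8>,
    c_input: Vec<&'static str>,
) -> Receiver<MyEnum> {
    let (tx, rx) = channel();
    let (a_tx, b_tx, c_tx) = (tx.clone(), tx.clone(), tx);

    thread::spawn(move || {
        for x in a_input {
            // Value construction: the payload becomes a MyEnum::A.
            let _ = a_tx.send(MyEnum::A(x));
        }
    });
    thread::spawn(move || {
        for x in b_input {
            let _ = b_tx.send(MyEnum::B(x));
        }
    });
    thread::spawn(move || {
        for x in c_input {
            let _ = c_tx.send(MyEnum::C(x));
        }
    });

    // The output closes once all three forwarding threads have finished.
    rx
}
```

The forwarding threads run independently, so the interleaving of items from different inputs is unspecified. Only the order within a single input is kept.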

Example

[examples/mux.rs]

use mux_stream::{error_handler, mux};

use std::{collections::HashSet, iter::FromIterator};

use futures::StreamExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Debug)]
enum MyEnum {
    A(i32),
    B(u8),
    C(&'static str),
}

#[tokio::main]
async fn main() {
    let i32_values = HashSet::from_iter(vec![123, 811]);
    let u8_values = HashSet::from_iter(vec![88]);
    let str_values = HashSet::from_iter(vec!["Hello", "ABC"]);

    let result: UnboundedReceiver<MyEnum> = mux!(MyEnum { A, B, C })(
        tokio_stream::iter(i32_values.clone()),
        tokio_stream::iter(u8_values.clone()),
        tokio_stream::iter(str_values.clone()),
        error_handler::panicking(),
    );

    let (i32_results, u8_results, str_results) = UnboundedReceiverStream::new(result)
        .fold(
            (HashSet::new(), HashSet::new(), HashSet::new()),
            |(mut i32_results, mut u8_results, mut str_results), update| async move {
                match update {
                    MyEnum::A(x) => i32_results.insert(x),
                    MyEnum::B(x) => u8_results.insert(x),
                    MyEnum::C(x) => str_results.insert(x),
                };

                (i32_results, u8_results, str_results)
            },
        )
        .await;

    assert_eq!(i32_results, i32_values);
    assert_eq!(u8_results, u8_values);
    assert_eq!(str_results, str_values);
}

Hash sets are used here because the order in which updates from different input streams arrive in the output is not preserved.

FAQ

Q: Is only Tokio supported now?

A: Yes. I have no plans yet to support other asynchronous runtimes.
