Open-NET-Libraries / Open.ChannelExtensions

License: MIT
A set of extensions for optimizing/simplifying System.Threading.Channels usage.

Projects that are alternatives to or similar to Open.ChannelExtensions

Async Techniques Python Course
Async Techniques and Examples in Python Course
Stars: ✭ 314 (+196.23%)
Mutual labels:  async, threading
Asyncawaitbestpractices
Extensions for System.Threading.Tasks.Task and System.Threading.Tasks.ValueTask
Stars: ✭ 693 (+553.77%)
Mutual labels:  async, threading
Concurrencpp
Modern concurrency for C++. Tasks, executors, timers and C++20 coroutines to rule them all
Stars: ✭ 340 (+220.75%)
Mutual labels:  tasks, threading
Codejam
Set of handy reusable .NET components that can simplify your daily work and save your time when you copy and paste your favorite helper methods and classes from one project to another
Stars: ✭ 217 (+104.72%)
Mutual labels:  async, threading
Frame Scheduling
Asynchronous non-blocking running many tasks in JavaScript. Demo https://codesandbox.io/s/admiring-ride-jdoq0
Stars: ✭ 64 (-39.62%)
Mutual labels:  tasks, async
awesome-dotnet-async
A curated list of awesome articles and resources to learning and practicing about async, threading, and channels in .Net platform. 😉
Stars: ✭ 84 (-20.75%)
Mutual labels:  channels, threading
Vs Threading
The Microsoft.VisualStudio.Threading is a xplat library that provides many threading and synchronization primitives used in Visual Studio and other applications.
Stars: ✭ 585 (+451.89%)
Mutual labels:  async, threading
ObviousAwait
🧵 Expressive aliases to ConfigureAwait(true) and ConfigureAwait(false)
Stars: ✭ 55 (-48.11%)
Mutual labels:  tasks, threading
Unityasync
Task and Async Utility Package for Unity. Start co-routines from anywhere.
Stars: ✭ 58 (-45.28%)
Mutual labels:  tasks, async
Cpprestsdk
The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design. This project aims to help C++ developers connect to and interact with services.
Stars: ✭ 6,631 (+6155.66%)
Mutual labels:  tasks, async
Fooproxy
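A robust, efficient, score-based, targeted IP proxy pool plus API service. You can plug in your own collectors to crawl proxy IPs, and it builds a separate database of valid proxy IPs for each of your crawler's target websites. Supports MongoDB 4.0 and uses Python 3.7. (Scored IP proxy pool; customised proxy data crawlers can be added at any time.)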
Stars: ✭ 195 (+83.96%)
Mutual labels:  async, threading
Alecrimasynckit
async and await for Swift.
Stars: ✭ 89 (-16.04%)
Mutual labels:  tasks, async
Kotlin Flow Extensions
Extensions to the Kotlin Flow library.
Stars: ✭ 404 (+281.13%)
Mutual labels:  async, extensions
Arq
Fast job queuing and RPC in python with asyncio and redis.
Stars: ✭ 695 (+555.66%)
Mutual labels:  tasks, async
Multitasking
Non-blocking Python methods using decorators
Stars: ✭ 87 (-17.92%)
Mutual labels:  async, threading
Taskorama
⚙ A Task/Future data type for JavaScript
Stars: ✭ 90 (-15.09%)
Mutual labels:  tasks, async
Governor
A rate-limiting library for Rust (formerly ratelimit_meter)
Stars: ✭ 99 (-6.6%)
Mutual labels:  async
Dapeng Soa
A lightweight, high performance micro-service framework
Stars: ✭ 101 (-4.72%)
Mutual labels:  async
Base64 Async
Non-blocking chunked Base64 encoding
Stars: ✭ 98 (-7.55%)
Mutual labels:  async
Sake Cli
🍶 Sake is a build tool for JavaScript.
Stars: ✭ 97 (-8.49%)
Mutual labels:  tasks

Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.

Click here for detailed documentation.

Highlights

Read & Write

With optional concurrency levels.

  • Reading all entries in a channel.
  • Writing all entries from a source to a channel.
  • Piping (consuming) all entries to a buffer (channel).
  • .AsAsyncEnumerable() (IAsyncEnumerable) support for .NET Standard 2.1+ and .NET Core 3+

Special ChannelReader Operations

  • Filter
  • Transform
  • Batch
  • Join

Examples

Define an asynchronous pipeline that follows best practices, using simple, expressive syntax:

await Channel
    .CreateBounded<T>(10)
    .SourceAsync(source /* IEnumerable<Task<T>> */)
    .PipeAsync(
        maxConcurrency: 2,
        capacity: 5,
        transform: asyncTransform01)
    .Pipe(transform02, /* capacity */ 3)
    .ReadAllAsync(async finalTransformedValue => {
        // Do something async with each final value.
    });

await source /* IEnumerable<T> */
    .ToChannel(boundedSize: 10, singleReader: true)
    .PipeAsync(asyncTransform01, /* capacity */ 5)
    .Pipe(
        maxConcurrency: 2,
        capacity: 3,
        transform: transform02)
    .ReadAll(finalTransformedValue => {
        // Do something with each final value.
    });

Reading (until the channel is closed)

Read each entry from the channel, one at a time

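// Synchronous handler.
await channel.ReadAll(
    entry => { /* Processing Code */ });

// Synchronous handler that also receives the entry's index.
await channel.ReadAll(
    (entry, index) => { /* Processing Code */ });

// Asynchronous handler.
await channel.ReadAllAsync(
    async entry => { await /* Processing Code */ });

// Asynchronous handler that also receives the entry's index.
await channel.ReadAllAsync(
    async (entry, index) => { await /* Processing Code */ });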

Read entries from the channel concurrently

await channel.ReadAllConcurrently(
    maxConcurrency,
    entry => { /* Processing Code */ });
await channel.ReadAllConcurrentlyAsync(
    maxConcurrency,
    async entry => { await /* Processing Code */ });
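
To make the idea of a bounded number of concurrent readers concrete, here is a minimal sketch that uses only System.Threading.Channels. It is not the library's implementation: the helper name ReadConcurrentlySketch and the worker-loop design are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not the library's implementation:
// drains a reader with a fixed number of competing workers.
static Task ReadConcurrentlySketch<T>(
    ChannelReader<T> reader, int maxConcurrency, Func<T, ValueTask> receiver)
{
    async Task Worker()
    {
        // WaitToReadAsync returns false once the channel is completed and empty.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
                await receiver(item);
        }
    }

    return Task.WhenAll(Enumerable.Range(0, maxConcurrency).Select(_ => Worker()));
}

var channel = Channel.CreateBounded<int>(10);
var seen = new ConcurrentBag<int>();

// Producer: write 1..100, then complete the channel so the workers can stop.
var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 100; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete();
});

await ReadConcurrentlySketch(channel.Reader, 4, async entry =>
{
    await Task.Yield(); // stand-in for real asynchronous work
    seen.Add(entry);
});
await producer;

Console.WriteLine(seen.Count);
```

With several workers, entries are no longer handled in channel order, which is why the sketch collects them in a thread-safe bag.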

Writing

If complete is true, the channel is closed once every entry from the source has been written.

Dump a source enumeration into the channel

// source can be any IEnumerable<T>.
await channel.WriteAll(source, complete: true);
// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllAsync(source, complete: true);
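
The effect of the complete flag can be sketched with plain System.Threading.Channels calls. The helper below, WriteAllSketch, is a hypothetical stand-in and only approximates what the text above describes: it writes every entry, waits whenever a bounded channel is full, and optionally completes the writer at the end.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not the library's implementation.
static async Task<long> WriteAllSketch<T>(
    ChannelWriter<T> writer, IEnumerable<T> source, bool complete)
{
    long written = 0;
    foreach (var item in source)
    {
        // On a bounded channel this waits until there is room.
        await writer.WriteAsync(item);
        written++;
    }

    // Completing the writer is what lets readers finish their loops.
    if (complete) writer.Complete();
    return written;
}

var channel = Channel.CreateBounded<int>(3);

// Start writing without awaiting, so the reader below can free up capacity.
var writing = WriteAllSketch(channel.Writer, Enumerable.Range(1, 5), complete: true);

var received = new List<int>();
await foreach (var value in channel.Reader.ReadAllAsync())
    received.Add(value);

var written = await writing;
Console.WriteLine($"{written} written, {received.Count} received");
```

If complete were false here, the await foreach loop would never end, because the reader cannot tell that no more entries are coming.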

Synchronize reading from the source and process the results concurrently

// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>.
await channel.WriteAllConcurrentlyAsync(
    maxConcurrency, source, complete: true);

Filter & Transform

// Filter and transform when reading.
channel.Reader
    .Filter(predicate) // .Where()
    .Transform(selector) // .Select()
    .ReadAllAsync(async value => {/*...*/});
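
As the inline comments suggest, Filter and Transform behave like Where and Select applied while reading. A rough equivalent over a plain ChannelReader, assuming .NET Core 3.0 or later for ReadAllAsync, might look like this. FilterTransformSketch is a hypothetical name, and unlike the library it returns an IAsyncEnumerable rather than another reader.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: applies a predicate and a selector while reading.
static async IAsyncEnumerable<TOut> FilterTransformSketch<TIn, TOut>(
    ChannelReader<TIn> reader, Func<TIn, bool> predicate, Func<TIn, TOut> selector)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        if (predicate(item))
            yield return selector(item);
    }
}

var channel = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 10; i++)
    channel.Writer.TryWrite(i); // always succeeds on an unbounded channel
channel.Writer.Complete();

var squaresOfEvens = new List<int>();
await foreach (var value in FilterTransformSketch(
    channel.Reader, n => n % 2 == 0, n => n * n))
{
    squaresOfEvens.Add(value);
}

Console.WriteLine(string.Join(", ", squaresOfEvens));
```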

Batching

values.Reader
    .Batch(10 /*batch size*/)
    .ReadAllAsync(async batch => {/*...*/});
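
Batching groups consecutive entries into fixed-size lists. The sketch below shows one way to do this by hand with System.Threading.Channels. BatchSketch is a hypothetical helper, and emitting a final partial batch when the channel completes is an assumption of this sketch, not a statement about the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: groups entries into lists of at most batchSize items.
static async IAsyncEnumerable<List<T>> BatchSketch<T>(
    ChannelReader<T> reader, int batchSize)
{
    var batch = new List<T>(batchSize);
    await foreach (var item in reader.ReadAllAsync())
    {
        batch.Add(item);
        if (batch.Count == batchSize)
        {
            yield return batch;
            batch = new List<T>(batchSize);
        }
    }

    // Assumption: flush whatever is left once the channel is completed.
    if (batch.Count > 0)
        yield return batch;
}

var values = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 7; i++)
    values.Writer.TryWrite(i);
values.Writer.Complete();

var batchSizes = new List<int>();
await foreach (var batch in BatchSketch(values.Reader, 3))
    batchSizes.Add(batch.Count);

Console.WriteLine(string.Join(", ", batchSizes));
```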

Joining

batches.Reader
    .Join()
    .ReadAllAsync(async value => {/*...*/});
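
Joining is the inverse of batching: each batch read from the channel is flattened back into individual entries. A minimal sketch, with JoinSketch as a hypothetical helper built only on System.Threading.Channels:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: flattens a channel of batches into single entries.
static async IAsyncEnumerable<T> JoinSketch<T>(ChannelReader<List<T>> batches)
{
    await foreach (var batch in batches.ReadAllAsync())
    {
        foreach (var item in batch)
            yield return item;
    }
}

var batches = Channel.CreateUnbounded<List<int>>();
batches.Writer.TryWrite(new List<int> { 1, 2, 3 });
batches.Writer.TryWrite(new List<int> { 4, 5 });
batches.Writer.Complete();

var flattened = new List<int>();
await foreach (var value in JoinSketch(batches.Reader))
    flattened.Add(value);

Console.WriteLine(string.Join(", ", flattened));
```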

Pipelining / Transforming

Transform and buffer entries

// Values assumed by the examples below.
const int X = 4; // max concurrency
const int N = 5; // capacity of the bounded output channel

// Pipe takes a synchronous transform. For an asynchronous transform,
// use PipeAsync as in the pipeline examples above.

// Transform values from a source channel into a new unbounded channel.
var unbounded = channel.Pipe(
    value => /* transformation */);

// Same, with a max concurrency of X.
var unboundedConcurrent = channel.Pipe(
    X, value => /* transformation */);

// Transform values into a new bounded channel holding at most N entries.
var bounded = channel.Pipe(
    value => /* transformation */, N);

// Bounded to N entries, with a max concurrency of X.
var transformed = channel.Pipe(
    X, value => /* transformation */, N);

// or, with named arguments:
transformed = channel.Pipe(
    maxConcurrency: X,
    capacity: N,
    transform: value => /* transformation */);
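
Conceptually, piping reads from one channel, applies the transform, and buffers the results in a second channel that the caller reads from. The sketch below illustrates that shape with System.Threading.Channels only. PipeSketch is a hypothetical helper with a single worker and should not be read as the library's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: transforms entries into a new bounded channel.
static ChannelReader<TOut> PipeSketch<TIn, TOut>(
    ChannelReader<TIn> source, Func<TIn, TOut> transform, int capacity)
{
    var output = Channel.CreateBounded<TOut>(capacity);

    // Background pump: read, transform, write. A full output channel
    // makes WriteAsync wait, which applies back-pressure to the pump.
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in source.ReadAllAsync())
                await output.Writer.WriteAsync(transform(item));
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            // Surface failures to whoever reads the output channel.
            output.Writer.TryComplete(ex);
        }
    });

    return output.Reader;
}

var source = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++)
    source.Writer.TryWrite(i);
source.Writer.Complete();

var results = new List<int>();
await foreach (var value in PipeSketch(source.Reader, n => n * 10, capacity: 2))
    results.Add(value);

Console.WriteLine(string.Join(", ", results));
```

Because the sketch uses one worker, output order matches input order. Adding more workers, as maxConcurrency does, would trade that ordering guarantee for throughput.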