c-cube / lwt-pipe

License: BSD-2-Clause
[beta] A multi-consumer, multi-producer blocking queue and stream for Lwt

Programming Languages

OCaml
Makefile

Lwt Pipe

An alternative to Lwt_stream, with distinct interfaces for producers and consumers and a bounded internal buffer.

Online Documentation

Build

opam install lwt-pipe

or, to pin the development version from the Git repository:

opam pin https://github.com/c-cube/lwt-pipe.git

License

Permissive free software (BSD-2-Clause).

Use

A pipe can be used as a regular iterator:

# #require "lwt";;
# #require "lwt-pipe";;

# open Lwt.Infix;;

# let l = [1;2;3;4];;
val l : int list = [1; 2; 3; 4]

# Lwt_pipe.of_list l
  |> Lwt_pipe.Reader.map ~f:(fun x->x+1)
  |> Lwt_pipe.to_list;;
- : int list = [2; 3; 4; 5]
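
The session above prints a plain int list, presumably because the toplevel runs the promise returned by Lwt_pipe.to_list automatically (utop does this for Lwt values). In a compiled program the promise has to be run explicitly. The following is a minimal sketch of the same pipeline, assuming the lwt-pipe and lwt.unix libraries are linked; increment_all is a hypothetical helper name, not part of the library:

```ocaml
(* Hypothetical helper: push a list through a pipe, adding 1 to each item.
   Uses only the calls shown in the session above. *)
let increment_all (l : int list) : int list Lwt.t =
  Lwt_pipe.of_list l
  |> Lwt_pipe.Reader.map ~f:(fun x -> x + 1)
  |> Lwt_pipe.to_list

let () =
  (* Lwt_main.run (from lwt.unix) drives the promise to completion. *)
  let result = Lwt_main.run (increment_all [1; 2; 3; 4]) in
  List.iter (Printf.printf "%d ") result;
  print_newline ()
```

Keeping the pipeline in a function that returns a promise, and calling Lwt_main.run only once at the entry point, lets the same code be composed with other Lwt computations.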

A pipe can also be used as a streaming queue. Here two producers each run push_ints, which writes the integers from 5 down to 1 into the pipe, while a single reader consumes the whole pipe:

# let rec push_ints p i : unit Lwt.t =
  if i <= 0 then Lwt.return ()
  else Lwt_pipe.write_exn p i >>= fun () -> push_ints p (i-1) ;;
val push_ints : (int, [< `r | `w > `w ]) Lwt_pipe.t -> int -> unit Lwt.t =
  <fun>

# let reader =
    let p = Lwt_pipe.create ~max_size:3 () in
    let t1 = push_ints p 5
    and t2 = push_ints p 5
    and t_read = Lwt_pipe.to_list p in
    Lwt.join [t1;t2] >>= fun () ->
    Lwt_pipe.close p >>= fun () ->
    t_read
  in
  List.sort compare @@ Lwt_main.run reader
  ;;
- : int list = [1; 1; 2; 2; 3; 3; 4; 4; 5; 5]
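
The ordering in this example matters. The pipe is created with max_size:3, so the producers are expected to block once the buffer is full, and the reader therefore has to be started before waiting on the producers. The pipe is closed only after all producers have finished, which lets Lwt_pipe.to_list terminate. A sketch that generalizes the pattern to any number of producers, using only the calls shown above (collect_from_producers is a hypothetical helper, not part of lwt-pipe):

```ocaml
open Lwt.Infix

(* Same producer as in the session above: writes i, i-1, ..., 1. *)
let rec push_ints p i : unit Lwt.t =
  if i <= 0 then Lwt.return ()
  else Lwt_pipe.write_exn p i >>= fun () -> push_ints p (i - 1)

(* Hypothetical helper: run [producers] writers against one bounded pipe
   and collect everything they write. *)
let collect_from_producers ~producers ~count : int list Lwt.t =
  let p = Lwt_pipe.create ~max_size:3 () in
  (* Start the reader first so the writers can make progress. *)
  let t_read = Lwt_pipe.to_list p in
  let writers = List.init producers (fun _ -> push_ints p count) in
  (* Wait for every writer, then close the pipe so the reader can finish. *)
  Lwt.join writers >>= fun () ->
  Lwt_pipe.close p >>= fun () ->
  t_read

let () =
  let items = Lwt_main.run (collect_from_producers ~producers:2 ~count:5) in
  (* Interleaving between producers is not fixed, so sort before comparing. *)
  assert (List.sort compare items = [1; 1; 2; 2; 3; 3; 4; 4; 5; 5])
```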

The same pattern can be expressed with higher-level constructs such as Lwt_pipe.of_list and Lwt_pipe.Reader.merge_all:

# let rec list_range i = if i<=0 then [] else i :: list_range (i-1);;
val list_range : int -> int list = <fun>
# let int_range n = Lwt_pipe.of_list @@ list_range n ;;
val int_range : int -> int Lwt_pipe.Reader.t = <fun>

# Lwt_main.run @@ Lwt_pipe.to_list (int_range 5);;
- : int list = [5; 4; 3; 2; 1]

# let reader =
    let p1 = int_range 6
    and p2 = int_range 6
    and p3 = int_range 6 in
    Lwt_pipe.to_list (Lwt_pipe.Reader.merge_all [p1;p2;p3])
  in
  List.sort compare @@ Lwt_main.run reader
  ;;
- : int list = [1; 1; 1; 2; 2; 2; 3; 3; 3; 4; 4; 4; 5; 5; 5; 6; 6; 6]
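
These combinators compose: a merged reader is itself a reader, so it can be mapped over before being collected. A small sketch under the same assumptions as the session above (sum_of_squares is a hypothetical helper; Lwt.map comes from Lwt itself):

```ocaml
let rec list_range i = if i <= 0 then [] else i :: list_range (i - 1)
let int_range n = Lwt_pipe.of_list @@ list_range n

(* Hypothetical helper: merge [sources] pipes that each yield n, ..., 1,
   square every item, and sum the results. *)
let sum_of_squares ~sources ~n : int Lwt.t =
  let pipes = List.init sources (fun _ -> int_range n) in
  Lwt_pipe.Reader.merge_all pipes
  |> Lwt_pipe.Reader.map ~f:(fun x -> x * x)
  |> Lwt_pipe.to_list
  |> Lwt.map (List.fold_left ( + ) 0)

let () =
  (* Three sources of 3, 2, 1: 3 * (9 + 4 + 1) = 42. *)
  assert (Lwt_main.run (sum_of_squares ~sources:3 ~n:3) = 42)
```

Because the sum does not depend on the order in which merged items arrive, no sorting is needed here.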