
MattMorgis / async-stream-generator

License: MIT
Pipe ES6 Async Generators through Node.js Streams

Programming Languages

javascript
184084 projects - #8 most used programming language

Projects that are alternatives of or similar to async-stream-generator

Kafka Book
Companion code for the book 《Kafka技术内幕》 (Kafka Internals)
Stars: ✭ 175 (+264.58%)
Mutual labels:  streams
Multistream
A stream that emits multiple other streams one after another (streams3)
Stars: ✭ 237 (+393.75%)
Mutual labels:  streams
logstreamer
Prefixes streams (e.g. stdout or stderr) in Go
Stars: ✭ 41 (-14.58%)
Mutual labels:  streams
Frideos flutter
An all-in-one Flutter package for state management, reactive objects, animations, effects, timed widgets, etc.
Stars: ✭ 187 (+289.58%)
Mutual labels:  streams
Mug
A small Java 8 util library, complementary to Guava (BiStream, Substring, MoreStreams, Parallelizer).
Stars: ✭ 236 (+391.67%)
Mutual labels:  streams
module-dependents
Get the list of npm modules that depend on the specified npm module.
Stars: ✭ 15 (-68.75%)
Mutual labels:  streams
Bfj
MOVED TO GITLAB
Stars: ✭ 164 (+241.67%)
Mutual labels:  streams
ng-observe
Angular reactivity streamlined...
Stars: ✭ 65 (+35.42%)
Mutual labels:  streams
Anydlbot
An Open Source GPLv3 All-In-One Telegram Bot
Stars: ✭ 236 (+391.67%)
Mutual labels:  streams
data examples
An example app showing different ways to pass to and share data with widgets and pages.
Stars: ✭ 56 (+16.67%)
Mutual labels:  streams
Aioreactive
Async/await reactive tools for Python 3.9+
Stars: ✭ 215 (+347.92%)
Mutual labels:  streams
Smallrye Mutiny
An Intuitive Event-Driven Reactive Programming Library for Java
Stars: ✭ 231 (+381.25%)
Mutual labels:  streams
html
HTML templating and streaming response library for Service Worker-like environments such as Cloudflare Workers.
Stars: ✭ 41 (-14.58%)
Mutual labels:  streams
Logrange
High performance data aggregating storage
Stars: ✭ 181 (+277.08%)
Mutual labels:  streams
quick-csv-streamer
Quick CSV Parser with Java 8 Streams API
Stars: ✭ 29 (-39.58%)
Mutual labels:  streams
Scrape Twitter
🐦 Access Twitter data without an API key. [DEPRECATED]
Stars: ✭ 166 (+245.83%)
Mutual labels:  streams
Kafka Ui
Open-Source Web GUI for Apache Kafka Management
Stars: ✭ 230 (+379.17%)
Mutual labels:  streams
JavaSE8-Features
Take a tour of the new features in Java SE 8, the platform designed to support faster and easier Java development. Learn about Project Lambda, a new syntax to support lambda expressions in Java code; the new Stream API for processing collections and managing parallel processing; the DateTime API for representing, managing and calculating date an…
Stars: ✭ 51 (+6.25%)
Mutual labels:  streams
streamplify
Java 8 combinatorics-related streams and other utilities
Stars: ✭ 40 (-16.67%)
Mutual labels:  streams
aurum
Fast and concise declarative DOM rendering library for javascript
Stars: ✭ 17 (-64.58%)
Mutual labels:  streams

async-stream-generator

Pipe ES6 Async Generators through Node.js Streams.

10 Second Tutorial

streamify is a function that takes an async iterator (for example, the result of invoking an async generator function) and returns a Readable stream.

const fs = require("fs");
const streamify = require("async-stream-generator");

async function* generator(stream) {
  for await (const chunk of stream) {
    yield chunk;
  }
}

const main = () => {
  const readStream = fs.createReadStream("path-to-data.json");
  streamify(generator(readStream)).pipe(process.stdout);
};

main();

What are Streams and Why Should I Care?

I/O in Node.js is asynchronous. In the early days of Node.js, interacting with the disk and network meant passing callbacks to functions.

For example, here is code that serves up a file from disk:

const http = require("http");
const fs = require("fs");

const server = http.createServer((request, response) => {
  fs.readFile(__dirname + "/mock-data.json", (error, data) => {
    response.end(data);
  });
});
server.listen(8000);

This code works but it buffers up the entire file into memory for every request before writing the result back to clients. If the file is very large, your program could start eating a lot of memory as it serves lots of users concurrently, particularly for users on slow connections.

The user experience is poor too because users will need to wait for the whole file to be buffered into memory on your server before they can start receiving any content.

However, both request and response are streams.

const http = require("http");
const fs = require("fs");

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + "/mock-data.json");
  stream.pipe(res);
});
server.listen(8000);

This is where Node.js shines. .pipe() writes to the client one chunk at a time, as soon as each chunk is received from disk.

Using .pipe() has other benefits too, like handling backpressure automatically so that node won't buffer chunks into memory needlessly when the remote client is on a really slow or high-latency connection.
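
For a sense of what .pipe() is doing for you under the hood, here is a minimal sketch of handling backpressure by hand, using write()'s return value and the writable 'drain' event (the file path is illustrative):

const fs = require("fs");

// Illustrative path; any large file works
const source = fs.createReadStream("large-file.json");

source.on("data", chunk => {
  // write() returns false once the destination's internal buffer is full
  const ok = process.stdout.write(chunk);
  if (!ok) {
    // Pause the source and resume only after the destination drains
    source.pause();
    process.stdout.once("drain", () => source.resume());
  }
});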

Piping streams with .pipe() is very much like piping programs together on the command line, except in Node.js instead of the shell:

a | b | c | d

Once you learn the stream API, you can snap streaming modules together like Lego bricks instead of having to remember how to push data through non-streaming, custom APIs.
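
For instance, compression snaps right into the earlier file server with one extra pipe. A sketch using Node's built-in zlib module (the Content-Encoding header tells the client the body is gzipped):

const http = require("http");
const fs = require("fs");
const zlib = require("zlib");

const server = http.createServer((req, res) => {
  res.setHeader("Content-Encoding", "gzip");
  fs.createReadStream(__dirname + "/mock-data.json")
    .pipe(zlib.createGzip()) // Transform stream: compresses each chunk
    .pipe(res);
});
server.listen(8000);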

Streams make programming in Node.js simple, elegant, and composable.

What are Async Iterators and Generators?

Previously, to read the contents of a stream asynchronously, you attached callback listeners to its events:

const fs = require("fs");

const main = inputFilePath => {
  const readStream = fs.createReadStream(inputFilePath, {
    encoding: "utf8",
    highWaterMark: 256
  });

  readStream.on("data", chunk => {
    console.log(">>> " + chunk);
    console.log("\n");
  });

  readStream.on("end", () => {
    console.log("### DONE ###");
  });
};

main("./mock-data.json");

As of Node.js v10, streams are asynchronously iterable, which enables the for-await-of syntax for reading a file:

const fs = require("fs");

const main = async inputFilePath => {
  const readStream = fs.createReadStream(inputFilePath, {
    encoding: "utf8",
    highWaterMark: 256
  });

  for await (const chunk of readStream) {
    console.log(">>> " + chunk);
    console.log("\n");
  }

  console.log("### DONE ###");
};

main("./mock-data.json");

Output for both:

...

>>> ld":"Indonesia","customer_title":"Honorable"}
{"guid":"bf62800e-b3b1-46f2-a3f2-dc17c66c90a1","car_make":"Ford","car_model":"Bronco II","car_model_year":1986,"car_color":"Pink","car_country_cold":"Philippines","customer_title":"Rev"}
{"guid":"32a2f79b-5a0b-


>>> 4072-9ebb-0e3600d0f714","car_make":"Toyota","car_model":"RAV4","car_model_year":2001,"car_color":"Purple","car_country_cold":"China","customer_title":"Mr"}
{"guid":"6d52f031-c7e7-4167-81bc-e2879d6630e2","car_make":"Lexus","car_model":"SC","car_model_year":


>>> 1998,"car_color":"Teal","car_country_cold":"Russia","customer_title":"Rev"}



### DONE ###

You can use async generators to process input in a style similar to Unix piping. Async generator functions are declared with the async and function* keywords; they consume an async iterable with for-await-of and produce values with yield instead of return.

Example of Generator #1, which will process our chunks of data into lines:

async function* chunksToLines(chunks) {
  let previous = "";

  for await (const chunk of chunks) {
    previous += chunk;
    let eolIndex;

    while ((eolIndex = previous.indexOf("\n")) >= 0) {
      // this line includes the EOL
      const line = previous.slice(0, eolIndex + 1);
      yield line;
      previous = previous.slice(eolIndex + 1);
    }
  }

  if (previous.length > 0) {
    yield previous;
  }
}
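
As a quick sanity check, chunksToLines accepts any async iterable, so you can drive it with hand-made chunks (the sample strings below are illustrative):

async function* sampleChunks() {
  yield "first li";
  yield "ne\nsecond line\nthi";
  yield "rd line";
}

(async () => {
  for await (const line of chunksToLines(sampleChunks())) {
    console.log(JSON.stringify(line));
  }
  // Prints:
  // "first line\n"
  // "second line\n"
  // "third line"
})();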

Example of Generator #2, which will number each line:

async function* numberOfLines(lines) {
  let counter = 1;
  for await (const line of lines) {
    yield counter + ": " + line;
    counter++;
  }
}

Now you can snap these generators together using function composition to stream the file to the console line by line.

The whole program reads the file 256 bytes at a time (as set by highWaterMark), breaks each chunk into lines, numbers them, prints them, and repeats.

const fs = require("fs");

const printAsyncIterable = async numberedLines => {
  for await (const line of numberedLines) {
    console.log(line);
  }
};

const main = () => {
  const readStream = fs.createReadStream("./mock-data.json", {
    encoding: "utf8",
    highWaterMark: 256
  });
  printAsyncIterable(numberOfLines(chunksToLines(readStream)));
};

main();

Output

...
3999: {"guid":"32a2f79b-5a0b-4072-9ebb-0e3600d0f714","car_make":"Toyota","car_model":"RAV4","car_model_year":2001,"car_color":"Purple","car_country_cold":"China","customer_title":"Mr"}

4000: {"guid":"6d52f031-c7e7-4167-81bc-e2879d6630e2","car_make":"Lexus","car_model":"SC","car_model_year":1998,"car_color":"Teal","car_country_cold":"Russia","customer_title":"Rev"}

Where Async Generators Fall Short

These new tools are great for reading streams; however, it's still not clear how to write() to another stream or build a processing pipeline with pipe().

This was discussed here.

Enter this module.

Using the same generators from above, we can pipe() the results to a writable stream.

const http = require("http");
const fs = require("fs");
const streamify = require("async-stream-generator");

const server = http.createServer(async (req, res) => {
  const readStream = fs.createReadStream("./mock-data.json", {
    encoding: "utf8",
    highWaterMark: 256
  });

  streamify(numberOfLines(chunksToLines(readStream))).pipe(res);
});

server.listen(8000);
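
For reference, Node.js has since added stream.Readable.from() (v12.3.0, also backported to v10.17.0), which likewise turns an async iterator into a Readable stream. On a recent runtime, the example above could be approximated as:

const http = require("http");
const fs = require("fs");
const { Readable } = require("stream");

const server = http.createServer((req, res) => {
  const readStream = fs.createReadStream("./mock-data.json", {
    encoding: "utf8",
    highWaterMark: 256
  });

  // Readable.from converts the async iterator into a Readable stream
  Readable.from(numberOfLines(chunksToLines(readStream))).pipe(res);
});

server.listen(8000);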

References and Thank Yous

License

MIT
