mtrunkat / flowage

License: Apache-2.0
Easy transformations and filtering for NodeJS object streams.

Flowage

Motivation

This package simplifies transformations and filtering of NodeJS object streams. Think of it as Underscore.js for streams.

The basic use case I have faced many times is transforming a large number of JSON objects that are eventually stored in a database. The transformation itself is the quick part. You then have to chunk the data into batches of a size your database allows, to limit the number of queries, and control the flow of the whole stream based on how fast you can save the transformed data.

Basic usage

const { Readable } = require('stream');
const Flowage = require('flowage');

// Let's have some stream that will output a series of objects { n: 0 }, { n: 1 }, { n: 2 }, { n: 3 }, ...
// The no-op read() is needed because items are pushed from the timer below, not on demand.
const readable = new Readable({ objectMode: true, read() {} });
let n = 0;
setInterval(() => readable.push({ n: n++ }), 1000);

// Pipe it through Flowage() to get a stream extended by helper methods.
const flowage = readable.pipe(new Flowage());

// Split the stream into a stream of odd objects and a stream of even objects, and extend each object with a field is: 'odd' or is: 'even'.
const oddStream = flowage
    .filter(obj => obj.n % 2)
    .map(obj => Object.assign({}, obj, { is: 'odd' }));

const evenStream = flowage
    .filter(obj => obj.n % 2 === 0)
    .map(obj => Object.assign({}, obj, { is: 'even' }));

// Then merge them back.
const mergedStream = oddStream.merge(evenStream);

// Chunk them by 100 records.
const chunkedStream = mergedStream.chunk(100);

// Save them to MongoDB in batches of 100 items with concurrency 2.
// This also pauses the stream whenever the maximum concurrency is reached.
chunkedStream.onSeries(async (arrayOf100Items) => {
    await database.collection('test').insert(arrayOf100Items);
}, { concurrency: 2 });

Reference

merge stream1.merge(stream2)

Returns a stream containing the values of both given streams. The merged stream ends when both source streams have ended.

const mergedStream = stream1.merge(stream2);
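
The "ends when both streams have ended" rule can be illustrated with Node's built-in `stream` module alone. This is a minimal sketch, not Flowage's implementation: `mergeStreams` is a hypothetical helper, items are forwarded in arrival order, and error forwarding is left out.

```javascript
const { PassThrough, Readable } = require('stream');

// Hypothetical helper, not part of Flowage: forwards items from both sources into one stream.
function mergeStreams(streamA, streamB) {
    const output = new PassThrough({ objectMode: true });
    let openSources = 2;
    for (const source of [streamA, streamB]) {
        // { end: false } keeps the first finished source from closing the output.
        source.pipe(output, { end: false });
        source.once('end', () => {
            openSources -= 1;
            // Close the output only after both sources have ended.
            if (openSources === 0) output.end();
        });
    }
    return output;
}

// Usage: the relative order of items from the two sources is not guaranteed.
mergeStreams(Readable.from([{ n: 1 }, { n: 3 }]), Readable.from([{ n: 2 }]))
    .on('data', item => console.log(item));
```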

collect stream.collect()

Returns a Promise that resolves, once the stream ends, to an array of all its values.

const data = await stream.collect();

filter stream.filter(function)

Returns a stream containing only the values for which the given function returns a truthy value.

// Keep only the items with an even index.
const filteredStream = stream.filter(val => val.index % 2 === 0);

chunk stream.chunk(length)

Returns a stream where each item is an array of the given number of items from the original stream.

// Chunk values into arrays of 10 items.
const chunkedStream = stream.chunk(10);
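
To make the chunking behaviour concrete, here is a minimal sketch built only on Node's `stream` module. It is not Flowage's implementation: `createChunkStream` is a hypothetical name, and emitting a shorter final chunk when the source ends is an assumption.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper, not part of Flowage: groups incoming objects into arrays.
function createChunkStream(length) {
    let buffer = [];
    return new Transform({
        objectMode: true,
        transform(item, encoding, callback) {
            buffer.push(item);
            if (buffer.length === length) {
                this.push(buffer);
                buffer = [];
            }
            callback();
        },
        // Assumption: leftover items are emitted as a shorter last chunk.
        flush(callback) {
            if (buffer.length > 0) this.push(buffer);
            callback();
        },
    });
}

// Usage: five items chunked by two.
Readable.from([1, 2, 3, 4, 5])
    .pipe(createChunkStream(2))
    .on('data', chunk => console.log(chunk)); // [ 1, 2 ], then [ 3, 4 ], then [ 5 ]
```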

map stream.map(function)

Returns stream where original items are transformed using given function.

// Extend each object in the stream with `.foo = 'bar'` field.
const mappedStream = stream.map(val => Object.assign({}, val, { foo: 'bar' }));

omit stream.omit(field1, field2, ...)

Returns a stream where the given fields were omitted from each item.

// Omit field1 and field2 from stream objects.
const resultingStream = stream.omit('field1', 'field2');

pick stream.pick(field1, field2, ...)

Returns stream where each item contains only the given fields.

// Pick only field1 and field2 from stream objects.
const resultingStream = stream.pick('field1', 'field2');

pluck stream.pluck(field)

Returns stream with given field picked from each item.

// Pluck field1 from each stream object.
const resultingStream = stream.pluck('field1');

uniq stream.uniq(field)

Returns stream containing only unique items based on given field. You need enough memory to keep a set of all unique values hashed using sha256.

// Filter unique items based on id field.
const uniquesStream = stream.uniq('id');
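
The memory note above follows from how such a filter has to work: it must remember every value seen so far. The sketch below shows one way to do this with Node's `crypto` and `stream` modules. It is not Flowage's implementation: `createUniqStream` is a hypothetical name, and comparing field values by their string form is an assumption.

```javascript
const crypto = require('crypto');
const { Readable, Transform } = require('stream');

// Hypothetical helper, not part of Flowage: passes on only the first item per field value.
function createUniqStream(field) {
    const seenHashes = new Set();
    return new Transform({
        objectMode: true,
        transform(item, encoding, callback) {
            // Assumption: values are compared by their string form.
            // Storing a sha256 digest keeps the memory per value fixed, however long the value is.
            const hash = crypto
                .createHash('sha256')
                .update(String(item[field]))
                .digest('hex');
            if (seenHashes.has(hash)) return callback(); // duplicate, drop it
            seenHashes.add(hash);
            callback(null, item);
        },
    });
}

// Usage: the second { id: 1 } is dropped.
Readable.from([{ id: 1 }, { id: 2 }, { id: 1 }])
    .pipe(createUniqStream('id'))
    .on('data', item => console.log(item)); // { id: 1 }, then { id: 2 }
```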

weakSort stream.weakSort(sortFunction, [bufferMinSize=75], [bufferMaxSize=100])

Returns stream containing values sorted using given function and floating buffer of a given size.

This method is helpful when only a few neighboring items may be in the wrong order. This may happen, for example, when a client is pushing data into the storage via an API with concurrency higher than 1 and the requests reach the server in the wrong order, or when the API has multiple redundant instances that process the incoming requests at different speeds.

This method uses a buffer for streamed items. Every time the buffer reaches bufferMaxSize, it is sorted and bufferMaxSize - bufferMinSize items are output to the stream.

const sortFunction = (a, b) => a.index < b.index ? -1 : 1;
const sortedStream = stream.weakSort(sortFunction, 75, 100);
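
The floating-buffer rule can be modelled on a plain array, which makes it easy to trace by hand. This is a sketch of the rule as described in this section, not Flowage's code: `weakSortArray` is a hypothetical name, and sorting and emitting the remaining buffer at the end of the input is an assumption.

```javascript
// Hypothetical model of the floating buffer, not part of Flowage.
function weakSortArray(items, sortFunction, bufferMinSize = 75, bufferMaxSize = 100) {
    const output = [];
    const buffer = [];
    for (const item of items) {
        buffer.push(item);
        if (buffer.length >= bufferMaxSize) {
            buffer.sort(sortFunction);
            // Emit the first items and keep bufferMinSize items to compare with later arrivals.
            output.push(...buffer.splice(0, bufferMaxSize - bufferMinSize));
        }
    }
    // Assumption: when the input ends, the rest of the buffer is sorted and emitted.
    buffer.sort(sortFunction);
    output.push(...buffer);
    return output;
}

const byIndex = (a, b) => a.index - b.index;
const slightlyShuffled = [2, 1, 3, 5, 4, 6, 8, 7].map(index => ({ index }));
const sorted = weakSortArray(slightlyShuffled, byIndex, 2, 4);
console.log(sorted.map(item => item.index)); // [ 1, 2, 3, 4, 5, 6, 7, 8 ]
```

An item that arrives more than a buffer's worth of positions too late has already been overtaken by emitted items, so it stays out of order. That is the trade-off for not buffering the whole stream.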

onSeries stream.onSeries(async function, [concurrency=1])

Returns a promise that resolves once the given function has finished for the last item of the stream.

Every time the given concurrency is reached, it pauses the stream.

// Store items in MongoDB with concurrency 10.
await stream.onSeries(async (item) => {
    await database.collection('items').insert(item);
}, 10);
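
The pause-on-full behaviour can be sketched with plain stream events. This is an illustration under stated assumptions, not Flowage's implementation: `processWithConcurrency` is a hypothetical helper that takes the concurrency as a plain number and rejects on the first handler or stream error.

```javascript
const { Readable } = require('stream');

// Hypothetical helper, not part of Flowage: runs `handler` for every item
// with at most `concurrency` calls in flight at any time.
function processWithConcurrency(stream, handler, concurrency = 1) {
    return new Promise((resolve, reject) => {
        let running = 0;
        let ended = false;
        const finishIfDone = () => {
            if (ended && running === 0) resolve();
        };

        stream.on('data', (item) => {
            running += 1;
            // Stop the flow of new items while all slots are busy.
            if (running >= concurrency) stream.pause();
            Promise.resolve()
                .then(() => handler(item))
                .then(() => {
                    running -= 1;
                    stream.resume(); // a slot is free again
                    finishIfDone();
                })
                .catch(reject);
        });
        stream.on('end', () => {
            ended = true;
            finishIfDone();
        });
        stream.on('error', reject);
    });
}

// Usage: the timeout stands in for a database write.
processWithConcurrency(Readable.from([1, 2, 3, 4]), async () => {
    await new Promise(resolve => setTimeout(resolve, 10));
}, 2).then(() => console.log('all items processed'));
```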