
scramjetorg / Scramjet

Licence: mit
Simple yet powerful live data computation framework




Version 4


What does it do?

Scramjet is a fast, simple, functional reactive stream programming framework written on top of Node.js object streams. You write code by chaining functions that transform the streamed data, including the well-known map, filter and reduce, and it is fully compatible with ES2017 async/await. Thanks to some built-in optimizations, Scramjet is much faster and simpler than similar frameworks when using asynchronous operations.

The main advantage of Scramjet is running asynchronous operations on your data streams. First of all, it lets you perform transformations both synchronously and asynchronously through the same API, so you can "map" your stream from any source and call any number of APIs consecutively. And if you're after some heavy maths, you can run your stream multi-threaded!

The benchmarks are published in the scramjet-benchmark repo.

Example

How about a full API-to-API migration: reading a long list of items from one API, checking them one after another and pushing them to another API? With control over the number of simultaneous requests? And outputting a log of the conversion? Easy!

const fetch = require("node-fetch");
const { StringStream } = require("scramjet");

// fetch a URL and resolve to the response body, either as text or as parsed JSON
const getText = async (url, options = {}) => (await fetch(url, options)).text();
const getJSON = async (url, options = {}) => (await fetch(url, options)).json();

StringStream.from(                                 // fetch your API to a scramjet stream
    () => getText("https://api.example.org/v1/shows/list")
)
    .setOptions({maxParallel: 4})                  // set your options (here: up to 4 operations in parallel)
    .lines()                                       // split the stream by line
    .parse(line => {                               // parse strings to data
        const [id, title, url] = line.split(",");
        return { id, title, url };
    })
    .map(async myShow => getJSON(                  // use asynchronous mapping (for example send requests)
        `http://api.local/set/${myShow.id}`,
        { method: "POST", body: JSON.stringify(myShow) }
    ))
    .stringify(resp => `+ Updated "${JSON.stringify(resp)}"`)
    .catch(err => `! Error occurred: ${err.message}`)  // handle errors
    .append("\n")
    .pipe(process.stdout)                          // use any stream
;
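The maxParallel option set above is what limits how many asynchronous operations run at once. To show that idea in isolation, here is a minimal plain-JavaScript sketch. It is not Scramjet's implementation. `mapWithLimit` is a hypothetical helper that works on a whole array rather than a stream: it runs an async function with at most `maxParallel` calls in flight and keeps the results in input order.

```javascript
// Conceptual sketch of a maxParallel-style limit (not Scramjet's code):
// run `fn` over `items` with at most `maxParallel` calls in flight,
// returning the results in the same order as the input.
async function mapWithLimit(items, fn, maxParallel) {
    const results = new Array(items.length);
    let next = 0; // index of the next item no worker has claimed yet

    // Each worker claims one index at a time. The claim is synchronous,
    // so two workers can never pick the same item.
    async function worker() {
        while (next < items.length) {
            const i = next++;
            results[i] = await fn(items[i], i);
        }
    }

    const workerCount = Math.max(1, Math.min(maxParallel, items.length));
    await Promise.all(Array.from({ length: workerCount }, () => worker()));
    return results;
}

// Usage: slower items finish later, but results keep the input order.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
mapWithLimit([30, 10, 20], async (ms) => { await delay(ms); return ms * 2; }, 2)
    .then((out) => console.log(out)); // [ 60, 20, 40 ]
```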

For a very basic guide to running the above example, starting from nothing more than access to a command line, see Scramjet from Scratch.

Usage

Scramjet uses functional programming to run transformations on your data streams, in a fashion very similar to the well-known event-stream node module. First, create a stream from a source:

Use DataStream.from(someThing) to create a new stream from an Array, Generator, AsyncGenerator, Iterator or Readable stream. See the DataStream.from docs for more information; here's a sample:

const fs = require("fs");
const { StringStream } = require("scramjet");

StringStream
    .from(fs.createReadStream("./log.txt", "utf8"))  // get from any readable stream
    .lines()                                 // split the stream by line
    .use("./your-file")                      // use some transforms from another file
;
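Accepting that many source types means each one has to be brought into a common form before chunks can flow. Below is a minimal sketch of such a normalization, assuming an async iterable as the common form. `toAsyncIterable` is a hypothetical name, and this is not Scramjet's code.

```javascript
const { Readable } = require("stream");

// Sketch (hypothetical helper, not Scramjet's code): normalize an Array,
// Generator, AsyncGenerator, Iterator or Readable stream into one async iterable.
async function* toAsyncIterable(source) {
    if (source != null && typeof source[Symbol.asyncIterator] === "function") {
        yield* source;            // async generators and Readable streams
    } else if (source != null && typeof source[Symbol.iterator] === "function") {
        yield* source;            // arrays, generators and other iterables
    } else if (source != null && typeof source.next === "function") {
        // a bare iterator object that is not itself iterable
        for (let step = source.next(); !step.done; step = source.next()) {
            yield step.value;
        }
    } else {
        throw new TypeError("unsupported stream source");
    }
}

// Usage: the same loop consumes every kind of source.
(async () => {
    for await (const chunk of toAsyncIterable(Readable.from(["a", "b"]))) {
        console.log(chunk);
    }
})();
```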

Use DataStream.pipeline(readable, transforms) to create a pipeline of transform streams and/or stream modules. Any number of consecutive arguments will get piped one into another.

const fs = require("fs");
const zlib = require("zlib");
const { StringStream } = require("scramjet");

StringStream
    .pipeline(                              // process a number of streams
        fs.createReadStream("./log.txt.gz"),
        zlib.createGunzip()                 // all errors here will get forwarded
    )
    .lines()                                // split the stream by line
    .use("./your-file")                     // use some transforms from another file
;
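Node.js itself has a similar building block in `stream.pipeline`. It pipes any number of streams into one another and reports an error from any stage to a single callback. The sketch below uses only Node's built-in `stream` and `zlib` modules and keeps the gzip data in memory, so it runs without files. `gunzipToString` is a hypothetical helper, not part of Scramjet.

```javascript
const { pipeline, Readable, Writable } = require("stream");
const zlib = require("zlib");

// Pipe a gzip-compressed Buffer through a gunzip stage into a collecting sink.
// pipeline() calls the final callback once: with the first error raised by
// any stage, or with no error when every stage has finished.
function gunzipToString(compressed) {
    return new Promise((resolve, reject) => {
        const chunks = [];
        pipeline(
            Readable.from([compressed]),              // source: a single Buffer chunk
            zlib.createGunzip(),                      // transform: decompress
            new Writable({                            // sink: collect output chunks
                write(chunk, _encoding, done) {
                    chunks.push(chunk);
                    done();
                },
            }),
            (err) => (err ? reject(err) : resolve(Buffer.concat(chunks).toString("utf8")))
        );
    });
}

// Usage
gunzipToString(zlib.gzipSync("first line\nsecond line\n")).then(console.log);
```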

Some methods, like from, use and flatMap, accept ES2015 generators and ES2018 async generators:

const fetch = require("node-fetch");
const { StringStream } = require("scramjet");

StringStream
    .from(
        async function* () {                       // construct a stream from an async generator
            yield "houses\n";                      // yield - push a stream chunk
                                                   // yield* - push a whole stream
            yield* (await fetch("https://example.org/categories")).body;
        },
        {maxParallel: 4}                           // set your options
    )
    .lines()                                       // split the stream by line
    .flatMap(async function* (category) {
        const req = await fetch(`https://example.org/posts/${category}/`);
        yield* await req.json();                   // yield* - push a whole array
    })
    .catch(err => `! Error occurred: ${err.message}`)
    .toStringStream()
    .append("\n")
    .pipe(process.stdout)   // pipe to any output
;

Most transformations are done by passing a transform function. You can write your function in three ways:

  1. Synchronous

Example: a simple stream transform that outputs a stream of objects with the same id property and the length of the value string.

const { DataStream } = require("scramjet");

DataStream
   .from(items)                                  // items: an array of {id, value} objects
   .map(
       (item) => ({id: item.id, length: item.value.length})
   )
  2. Asynchronous using ES2017 async/await

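Example: a simple stream that uses the Fetch API to get the contents of every URL in the stream and parse them as JSON

const fetch = require("node-fetch");
const { StringStream } = require("scramjet");

StringStream
    .from(urls)                                   // urls: an array of URL strings
    .map(
        async (url) => (await fetch(url)).text()  // resolve to the response body text
    )
    .JSONParse()                                  // parse each string as JSON
  3. Asynchronous using Promises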

Example: a simple stream that fetches a URL mentioned in the incoming object

   const request = require("request");   // the npm "request" HTTP client

   datastream.map(                       // datastream: a scramjet DataStream of {url} objects
       (item) => new Promise((resolve, reject) => {
           request(item.url, (err, res, data) => {
               if (err)
                   reject(err); // will emit an "error" event on the stream
               else
                   resolve(data);
           });
       })
   )

The transform function behaves as if you passed it to the then method of a Promise resolved with the chunk from the input stream: a returned value is passed on, a returned Promise is waited for, and a thrown error or rejection signals an error.
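The rule above can be checked with plain Promises. In this sketch, `applyTransform` is a hypothetical helper and not a Scramjet function. It feeds a chunk through `Promise.resolve(chunk).then(fn)`, so a plain return value, a returned Promise and a thrown error are handled exactly as `then` handles them.

```javascript
// Sketch of the rule above (applyTransform is a hypothetical helper, not a
// Scramjet function): the chunk goes through Promise.resolve(chunk).then(fn).
const applyTransform = (chunk, fn) => Promise.resolve(chunk).then(fn);

const double = (x) => x * 2;                                   // returns a value
const doubleLater = (x) =>                                     // returns a Promise
    new Promise((resolve) => setTimeout(() => resolve(x * 2), 10));
const failing = () => { throw new Error("bad chunk"); };       // throws

applyTransform(21, double).then(console.log);                  // 42
applyTransform(21, doubleLater).then(console.log);             // 42, after ~10 ms
applyTransform(21, failing).catch((err) => console.log(err.message)); // bad chunk
```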

  4. Streams with multi-threading

To distribute your code among the processor cores, use the distribute method:

   datastream.distribute(
       16, // number of threads
       (stream) => {
           // multi-threaded code goes here.
           // it MUST return a valid stream back to the main thread.
       }
   )
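Scramjet's distribute takes care of starting the workers and moving stream chunks between them. The sketch below is a rough illustration of splitting work across cores with no library at all. It swaps in Node's built-in `worker_threads` module and does not describe how distribute works internally. `squareInWorkers` is a hypothetical helper that processes a whole array rather than a stream.

```javascript
const { Worker } = require("worker_threads");

// Code each worker thread runs: square the numbers it was given, post them back.
const workerSource = `
const { parentPort, workerData } = require("worker_threads");
parentPort.postMessage(workerData.map((n) => n * n));
`;

// Split `items` into up to `threads` slices, square each slice in its own
// worker thread, then join the partial results in the original order.
function squareInWorkers(items, threads) {
    const sliceSize = Math.ceil(items.length / threads);
    const jobs = [];
    for (let start = 0; start < items.length; start += sliceSize) {
        const slice = items.slice(start, start + sliceSize);
        jobs.push(new Promise((resolve, reject) => {
            const worker = new Worker(workerSource, { eval: true, workerData: slice });
            worker.once("message", resolve);
            worker.once("error", reject);
        }));
    }
    return Promise.all(jobs).then((parts) => parts.flat());
}

squareInWorkers([1, 2, 3, 4, 5], 2).then(console.log); // [ 1, 4, 9, 16, 25 ]
```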

Writing modules

Scramjet lets you write simple modules that are resolved the same way as Node's require. A module is a simple JavaScript file whose default export (module.exports) is a function that takes a stream followed by any number of arguments.

Here's an example:

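// hypothetical per-chunk transform; replace it with your own logic
const transform = (chunk, arg) => `${arg}${chunk}`;

module.exports = (stream, arg1) => {
    const mapper = (chunk) => transform(chunk, arg1);  // calls the helper, not itself
    return stream.map(mapper);
};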

Then it can be used with the DataStream.use method like this:

myStream.use("./path/to/my-module", "arg1");

If these modules are published, you can also simply use myStream.use("published-module").
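Following the module shape described above, a call like `stream.use(module, ...args)` boils down to "resolve the module, then call its export with the stream and the extra arguments". This is a minimal sketch of that idea only, not Scramjet's implementation. `use` and `ArrayStream` here are hypothetical, and the array-backed stand-in exists only so the snippet runs on its own.

```javascript
// Sketch of what stream.use(module, ...args) boils down to (not Scramjet's
// implementation): resolve the module like require() and call its export with
// the stream followed by the extra arguments. A relative path given here would
// resolve against this file, not against the caller's file.
function use(stream, moduleOrPath, ...args) {
    const transform = typeof moduleOrPath === "function" ? moduleOrPath : require(moduleOrPath);
    return transform(stream, ...args);
}

// Minimal array-backed stand-in with a map() method, only so this runs alone.
class ArrayStream {
    constructor(items) { this.items = items; }
    map(fn) { return new ArrayStream(this.items.map((item) => fn(item))); }
}

// A module in the described shape, defined inline instead of in its own file.
const prefixModule = (stream, prefix) => stream.map((chunk) => `${prefix}${chunk}`);

console.log(use(new ArrayStream(["a", "b"]), prefixModule, "> ").items); // [ '> a', '> b' ]
```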

For more universal modules, you can use the helper methods createTransformModule and createReadModule that scramjet exports. See more about this in the blog post Scramjet Modules.

TypeScript support

Scramjet aims to be fully documented and to expose TypeScript declarations. The first version to include definitions in the .d.ts folder is 4.15.0. More TypeScript support will be added in upcoming versions, so feel free to report issues on GitHub.

Detailed docs

Here's the list of the exposed classes and methods; please review the specific documentation for details:

Note that:

  • Most of the methods take a Function argument that operates on the stream items.
  • The Function, unless it's stated otherwise, will receive an argument with the next chunk.
  • If you want to perform your operations asynchronously, return a Promise, otherwise just return the right value.

CLI

Check out scramjet-cli, a command line interface for simplified scramjet usage:

$ sjr -i http://datasource.org/file.csv ./transform-module-1 ./transform-module-2 | gzip > logs.gz

Quick reference of some methods

:DataStream

DataStream is the primary stream type for Scramjet. Once you parse your stream and pipe it into a DataStream, you can perform calculations on the data objects streamed through your flow.

Use as:

const { DataStream } = require('scramjet');

// run inside an async function; findInFiles and sendToAPI stand for your own async functions
await (DataStream.from(aStream) // create a DataStream
    .map(findInFiles)           // read some data asynchronously
    .map(sendToAPI)             // send the data somewhere
    .run());                    // wait until end

Detailed :DataStream docs here

Most popular methods:

:StringStream

A stream of string objects for further transformation on top of DataStream.

Example:

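const fs = require('fs');
const fetch = require('node-fetch');
const { StringStream } = require('scramjet');

StringStream.from(async () => (await fetch('https://example.com/data/article.txt')).text())
    .lines()
    .append("\r\n")
    .pipe(fs.createWriteStream('./path/to/file.txt'))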

Detailed :StringStream docs here

Most popular methods:

:BufferStream

A helper stream for easily splitting or parsing buffers.

Useful for working on built-in Node.js streams from files, parsing binary formats etc.

A simple use case would be:

 const fs = require('fs');
 const { BufferStream } = require('scramjet');

 fs.createReadStream('pixels.rgba')
     .pipe(new BufferStream)         // pipe a buffer stream into scramjet
     .breakup(4)                     // split into 4 byte fragments
     .parse(buffer => [
         buffer.readUInt8(0),           // the output is a stream of R,G,B and Alpha
         buffer.readUInt8(1),           // values from 0-255 in an array
         buffer.readUInt8(2),           // (unsigned reads; readInt8 would give -128..127)
         buffer.readUInt8(3)
     ]);
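Chunks read from a file can end in the middle of a pixel, so a fixed-size split has to carry leftover bytes into the next chunk. This is a minimal sketch of that idea for `breakup(4)`. `breakupBuffers` is a hypothetical name and not BufferStream's code. Emitting a short trailing fragment at the end is this sketch's own choice. Unsigned reads (`readUInt8`) give channel values from 0 to 255.

```javascript
// Sketch of fixed-size splitting like breakup(4) (not BufferStream's code;
// breakupBuffers is a hypothetical name). Leftover bytes that do not fill a
// whole fragment are carried over and prefixed to the next chunk.
function* breakupBuffers(chunks, size) {
    let rest = Buffer.alloc(0);
    for (const chunk of chunks) {
        const buf = Buffer.concat([rest, chunk]);
        let offset = 0;
        while (buf.length - offset >= size) {
            yield buf.subarray(offset, offset + size);
            offset += size;
        }
        rest = buf.subarray(offset);
    }
    if (rest.length > 0) yield rest; // this sketch emits a short trailing fragment
}

// Decode one 4-byte RGBA fragment into unsigned channel values (0-255).
const toRGBA = (px) => [px.readUInt8(0), px.readUInt8(1), px.readUInt8(2), px.readUInt8(3)];

// The first pixel is split across two incoming chunks.
const chunks = [Buffer.from([255, 0, 0]), Buffer.from([128, 0, 255, 0, 64])];
console.log([...breakupBuffers(chunks, 4)].map(toRGBA)); // [ [ 255, 0, 0, 128 ], [ 0, 255, 0, 64 ] ]
```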

Detailed :BufferStream docs here

Most popular methods:

:MultiStream

An object consisting of multiple streams that can be refined or muxed.

The idea behind a MultiStream is being able to mux and demux streams when needed.

Usage:

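const { MultiStream } = require('scramjet');

// streams: an array of scramjet streams; myFilter: your own filter function
new MultiStream([...streams])
 .mux();

new MultiStream(function*(){ yield* streams; })
 .map(stream => stream.filter(myFilter))
 .mux();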
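Muxing means merging several streams into one, taking items from whichever source produces them first. Below is a minimal sketch of that idea over async iterables, which include Node Readable streams and async generators. `mux` here is a hypothetical helper and not MultiStream's implementation.

```javascript
// Sketch of muxing (mux is a hypothetical helper, not MultiStream's code):
// merge several async iterables into one, yielding each item as soon as any
// source produces it. Order within a single source is preserved.
async function* mux(...sources) {
    const iterators = sources.map((source) => source[Symbol.asyncIterator]());
    const pull = (i) => iterators[i].next().then((result) => ({ i, result }));
    // One pending next() per source that has not finished yet.
    const pending = new Map(iterators.map((_, i) => [i, pull(i)]));
    while (pending.size > 0) {
        const { i, result } = await Promise.race(pending.values());
        if (result.done) {
            pending.delete(i);       // that source is exhausted
        } else {
            pending.set(i, pull(i)); // ask that source for its next item
            yield result.value;
        }
    }
}

// Usage: two sources producing at different speeds, merged into one sequence.
const delayed = async function* (items, ms) {
    for (const item of items) {
        await new Promise((resolve) => setTimeout(resolve, ms));
        yield item;
    }
};
(async () => {
    for await (const item of mux(delayed(["a1", "a2"], 15), delayed(["b1", "b2"], 10))) {
        console.log(item);
    }
})();
```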

Detailed :MultiStream docs here

Most popular methods:

:NumberStream

A simple scramjet stream that by default contains numbers or other objects with a valueOf method. The stream provides simple methods like sum and average. It derives from DataStream, so it still fully supports map, reduce, etc.
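The description above relies on JavaScript's number conversion, which falls back to `valueOf()` for objects. This is a minimal plain-JavaScript sketch of a sum and average over such items. `sumAndAverage` and the `Price` class are hypothetical and are not NumberStream's methods or implementation.

```javascript
// Sketch (sumAndAverage is a hypothetical helper, not NumberStream's code):
// sum and average over an iterable or async iterable of numbers or objects
// with a valueOf() method.
async function sumAndAverage(source) {
    let sum = 0;
    let count = 0;
    for await (const item of source) {
        sum += Number(item); // for plain objects, Number() uses valueOf()
        count += 1;
    }
    return { sum, average: count === 0 ? NaN : sum / count };
}

// An object that behaves like a number through valueOf().
class Price {
    constructor(cents) { this.cents = cents; }
    valueOf() { return this.cents / 100; }
}

sumAndAverage([1, 2, 3, new Price(600)]).then(console.log); // { sum: 12, average: 3 }
```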

Detailed :NumberStream docs here

Most popular methods:

:WindowStream

A stream for moving window calculation with some simple methods.

In essence, it's a stream of Arrays, each containing a list of items (a window). It's best used when created by the DataStream.window method.
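A moving-window calculation of the kind described above can be sketched in plain JavaScript: keep the last `size` items and emit a copy each time a new item arrives. `slidingWindow` is a hypothetical helper, not WindowStream's code. The assumption that only full windows are emitted is this sketch's own.

```javascript
// Sketch of a moving window (slidingWindow is a hypothetical helper, not
// WindowStream's code): each output is an array of the last `size` items.
// This sketch only emits full windows.
function* slidingWindow(items, size) {
    const win = [];
    for (const item of items) {
        win.push(item);
        if (win.length > size) win.shift();     // drop the oldest item
        if (win.length === size) yield [...win]; // emit a copy of the window
    }
}

const mean = (values) => values.reduce((sum, v) => sum + v, 0) / values.length;

// Moving average over the last 3 readings.
console.log([...slidingWindow([1, 2, 3, 4, 5], 3)].map(mean)); // [ 2, 3, 4 ]
```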

Detailed :WindowStream docs here

Most popular methods:

:StreamWorker

StreamWorker class - intended for internal use

This class provides control over the subprocesses, including:

  • spawning
  • communicating
  • delivering streams

Detailed :StreamWorker docs here

Most popular methods:

Scramjet core

Don't like dependencies? Scramjet packs just a couple of them, but if second-level dependencies really annoy you, please try scramjet-core.

It contains only the most vital methods, but the library is dependency-free.

License and contributions

As of version 2.0 Scramjet is MIT Licensed.


Help wanted

The project needs your help! There's lots of work to do: transforming and muxing, joining and splitting, browserifying, modularizing, documenting and filing issues.

If you want to help and be part of the Scramjet team, please reach out to us on Slack or email us at [email protected].
