picadoh / gostreamer

Licence: other
Go example that uses channels to build an execution pipeline

Programming Languages

go
31211 projects - #10 most used programming language

Projects that are alternatives of or similar to gostreamer

Imageselector
Image picker that supports multi-image selection and image preview
Stars: ✭ 62 (-17.33%)
Mutual labels:  lib
Weworkapi php
official lib of wework api
Stars: ✭ 225 (+200%)
Mutual labels:  lib
Mal4J
Java wrapper for the official MyAnimeList API
Stars: ✭ 23 (-69.33%)
Mutual labels:  lib
Yamaha Nodejs
A node module to control your yamaha receiver
Stars: ✭ 103 (+37.33%)
Mutual labels:  lib
Notion Js
🤯 Notion API
Stars: ✭ 136 (+81.33%)
Mutual labels:  lib
Mtproto Core
Telegram API JS (MTProto) client library for browser and nodejs
Stars: ✭ 242 (+222.67%)
Mutual labels:  lib
Xtd forms
Modern c++17 library to create native gui for Microsoft Windows, Apple macOS and Linux.
Stars: ✭ 25 (-66.67%)
Mutual labels:  lib
goroutines
It is an efficient, flexible, and lightweight goroutine pool. It provides an easy way to deal with concurrent tasks with limited resource.
Stars: ✭ 88 (+17.33%)
Mutual labels:  goroutines
React Scoped Css
CSS encapsulation solution for React
Stars: ✭ 214 (+185.33%)
Mutual labels:  lib
impromptu.nvim
Create prompts fast and easy
Stars: ✭ 39 (-48%)
Mutual labels:  lib
Dtkwidget
Deepin Toolkit, widget module for DDE look and feel
Stars: ✭ 112 (+49.33%)
Mutual labels:  lib
Vue Loaders
Vue + loaders.css
Stars: ✭ 127 (+69.33%)
Mutual labels:  lib
BEW-2.5-Strongly-Typed-Languages
💪 Learn and implement the design patterns and best practices that make Go a top choice at high-velocity startups like Lyft, Heroku, Docker, Medium, and more!
Stars: ✭ 14 (-81.33%)
Mutual labels:  goroutines
Appmetrics.js
A small (< 1kb) library for measuring things in your web app and reporting the results to Google Analytics.
Stars: ✭ 1,383 (+1744%)
Mutual labels:  lib
AltiumLibrary
Useful Altium pcb library (3D)
Stars: ✭ 33 (-56%)
Mutual labels:  lib
Easygo
Declarative networking framework based on Kotlin and OkHttp; write network calls the way you would write an HTML page
Stars: ✭ 40 (-46.67%)
Mutual labels:  lib
Stdlib
✨ Standard library for JavaScript and Node.js. ✨
Stars: ✭ 2,749 (+3565.33%)
Mutual labels:  lib
metalsmith-imagemin
Metalsmith plugin to minify images
Stars: ✭ 17 (-77.33%)
Mutual labels:  lib
tgcalls
Voice chats, private incoming and outgoing calls in Telegram for Developers
Stars: ✭ 408 (+444%)
Mutual labels:  lib
rawhttp
Raw HTTP client in Go for complete request control and customization.
Stars: ✭ 100 (+33.33%)
Mutual labels:  lib

Gostreamer lets you compose building blocks into a processing pipeline. A building block processes inputs and sends its outputs to the subsequent building blocks through channels. The level of parallelism of each block can also be controlled.

How does it work?

Configuration

Configuration is a simple map context used to pass dynamic information to the functions. It can be created by loading the properties from a properties file (a file representing each property in key=value format).

cfg, _ := streamer.LoadProperties("samplepipeline.properties")

The samplepipeline.properties file could look as follows:

greeting=hello world
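
To make the key=value format concrete, here is a minimal sketch of how such a file could be parsed into a map using only the standard library. The parseProperties helper is hypothetical and is not Gostreamer's implementation; skipping blank lines and lines starting with '#' is also an assumption.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseProperties is a hypothetical helper, not part of Gostreamer.
// It reads "key=value" lines into a map, skipping blank lines and
// lines that start with '#' (an assumed comment convention).
func parseProperties(text string) (map[string]string, error) {
	props := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(text))
	for lineNo := 1; scanner.Scan(); lineNo++ {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		// Split on the first '=' only, so values may contain '='.
		key, value, found := strings.Cut(line, "=")
		if !found {
			return nil, fmt.Errorf("line %d: missing '=' in %q", lineNo, line)
		}
		props[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return props, scanner.Err()
}

func main() {
	props, err := parseProperties("# sample\ngreeting=hello world\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(props["greeting"])
}
```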

Collectors

A collector is a component responsible for gathering information from a specific source and publishing it into a channel in the form of a keyed message.

Example:

func TextCollector(name string, cfg streamer.Config, out chan streamer.Message) {
	outMessage := streamer.NewMessage()
	outMessage.Put("greeting", cfg.GetString("greeting"))
	out <- outMessage
}

The above function publishes a static "hello world" message keyed with "greeting". With this signature, this function can be used to build a collector, as below:

collector := streamer.NewCollector("collector", cfg, TextCollector)
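
How a collector function gets driven is not described here, so the following is only a sketch of one plausible approach: run the function in its own goroutine and close the output channel when it returns. Message, CollectFunc, runCollector and greetingCollector are hypothetical stand-ins defined for this example and are not Gostreamer types; the config argument is left out for brevity.

```go
package main

import "fmt"

// Message is a stand-in for streamer.Message (assumption for this sketch).
type Message map[string]interface{}

// CollectFunc mirrors the shape of the collector function shown above,
// without the config argument.
type CollectFunc func(name string, out chan Message)

// runCollector starts fn in its own goroutine and returns the channel
// it publishes to. The channel is closed once fn returns, so a
// consumer can simply range over it.
func runCollector(name string, fn CollectFunc) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		fn(name, out)
	}()
	return out
}

// greetingCollector publishes one static message keyed with "greeting".
func greetingCollector(name string, out chan Message) {
	out <- Message{"greeting": "hello world"}
}

func main() {
	for msg := range runCollector("collector", greetingCollector) {
		fmt.Println(msg["greeting"])
	}
}
```

Closing the channel on the producer side is what lets downstream stages know that no more messages will arrive.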

Processors

A processor is responsible for consuming keyed messages from an input channel, doing some processing and possibly publishing more keyed messages into an output channel. The processor delivers each message from the input channel to a function for processing.

Example:

func WordExtractor(name string, cfg streamer.Config, input streamer.Message, out chan streamer.Message) {
	text, _ := input.Get("greeting").(string)

	words := strings.Split(text, " ")

	for _, word := range words {
		outMessage := streamer.NewMessage()
		outMessage.Put("word", word)
		out <- outMessage
	}
}

The above function picks up messages keyed with "greeting", splits the text on the single whitespace delimiter, and publishes each word to the output channel as a separate message keyed by "word". With this signature, this function can be used to build a processor, as below:

extractor := streamer.NewProcessor("extractor", cfg, WordExtractor, <Demux>)

Another processor could be used to print each individual message to the output:

func WordPrinter(name string, cfg streamer.Config, input streamer.Message, out chan streamer.Message) {
	word, _ := input.Get("word").(string)

	// simulate some processing time
	time.Sleep(1 * time.Second)

	log.Println(word)
}

Demux

When creating a processor, one of the arguments is a Demux. The Demux is a special component that enables concurrent work inside a processor. It takes the processor's input channel and demultiplexes it into multiple output channels, each processed by a separate routine. A demux can be created as follows:

demux := streamer.NewIndexedChannelDemux(2, streamer.RandomIndex)

A Demux receives a parallelism hint. Where possible, its routines run in parallel, depending on the parallelism the underlying system can achieve.

The indexed channel demux is a default implementation that creates an array of output channels. The first argument is the parallelism hint, i.e., the number of output channels and routines that will be created to handle the messages picked up from the input. The second argument is a function that maps each input message to a specific output channel.

This function should respect the following signature:

func <name>(fanOut int, input streamer.Message) int

The default streamer.RandomIndex function randomly selects the output channel.

An example of a custom static mapping could be:

func StaticIndex(fanOut int, input streamer.Message) int {
	// the comma-ok form avoids a panic when "word" is missing or not a string
	word, _ := input.Get("word").(string)
	switch word {
	case "hello":
		return 0
	default:
		return 1
	}
}

The above function gets an input message keyed with "word" and routes the word "hello" to the channel at index 0 and every other word to the channel at index 1. Because it can return 1, it should be used with a parallelism hint of at least 2.
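
The demultiplexing idea can be sketched with plain channels and a sync.WaitGroup. This is an illustration of the pattern under stated assumptions, not Gostreamer's actual Demux: Message, IndexFunc, demux and staticIndex are hypothetical names defined only for this example, with IndexFunc following the signature described above.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for streamer.Message (assumption for this sketch).
type Message map[string]interface{}

// IndexFunc has the same shape as the index function signature above.
type IndexFunc func(fanOut int, input Message) int

// demux routes each message from in to one of fanOut channels chosen
// by index, with one worker goroutine per channel. It returns once in
// is closed and every worker has drained its channel.
func demux(in <-chan Message, fanOut int, index IndexFunc, work func(worker int, msg Message)) {
	outs := make([]chan Message, fanOut)
	var wg sync.WaitGroup
	for i := range outs {
		outs[i] = make(chan Message)
		wg.Add(1)
		go func(worker int, ch <-chan Message) {
			defer wg.Done()
			for msg := range ch {
				work(worker, msg)
			}
		}(i, outs[i])
	}
	for msg := range in {
		outs[index(fanOut, msg)] <- msg
	}
	for _, ch := range outs {
		close(ch)
	}
	wg.Wait()
}

// staticIndex sends "hello" to channel 0 and everything else to channel 1.
func staticIndex(fanOut int, input Message) int {
	if word, _ := input["word"].(string); word == "hello" {
		return 0
	}
	return 1
}

func main() {
	in := make(chan Message)
	go func() {
		defer close(in)
		for _, w := range []string{"hello", "world"} {
			in <- Message{"word": w}
		}
	}()
	demux(in, 2, staticIndex, func(worker int, msg Message) {
		fmt.Printf("worker %d got %v\n", worker, msg["word"])
	})
}
```

The order in which the two workers print is not deterministic, since they run concurrently. An index function must return a value in the range 0 to fanOut-1; this sketch does not guard against an out-of-range index.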

Building the topology

The topology can be built by chaining the multiple components together, as in the following example:

// build the components
cfg, _ := streamer.LoadProperties("samplepipeline.properties")
collector := streamer.NewCollector("collector", cfg, TextCollector)
extractor := streamer.NewProcessor("extractor", cfg, WordExtractor, streamer.NewIndexedChannelDemux(1, streamer.RandomIndex))
printer := streamer.NewProcessor("printer", cfg, WordPrinter, streamer.NewIndexedChannelDemux(2, StaticIndex))

// execute pipeline
<-printer.Execute(
	extractor.Execute(
		collector.Execute()))
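
The nested Execute calls follow a common Go pattern in which each stage takes the upstream channel and returns its own output channel, and the final receive blocks until the last stage is done. The sketch below reproduces that pattern with the standard library only; Message, collect, extract and sink are hypothetical names for this example and do not reflect Gostreamer's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a stand-in for streamer.Message (assumption for this sketch).
type Message map[string]interface{}

// collect emits a single greeting message and then closes its output.
func collect(greeting string) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		out <- Message{"greeting": greeting}
	}()
	return out
}

// extract splits each greeting into words and emits one message per word.
func extract(in <-chan Message) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		for msg := range in {
			text, _ := msg["greeting"].(string)
			for _, word := range strings.Fields(text) {
				out <- Message{"word": word}
			}
		}
	}()
	return out
}

// sink passes every word to handle and closes the returned channel
// once its input is exhausted, so callers can wait on it.
func sink(in <-chan Message, handle func(word string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range in {
			word, _ := msg["word"].(string)
			handle(word)
		}
	}()
	return done
}

func main() {
	// Blocks until the whole pipeline has drained.
	<-sink(extract(collect("hello world")), func(word string) {
		fmt.Println(word)
	})
}
```

Each stage closes its output when its input is exhausted, so shutdown propagates from the collector down to the last stage without any extra signalling.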

Sample Pipeline

Please refer to Sample Go Pipeline for a fully running example that uses Gostreamer to read input text from a file or from a socket and process the words, separating hashtags and counting them.

Build

$ go build streamer/*.go

Run tests

$ go test test/*.go