vladimirvivien / Automi

Licence: apache-2.0
A stream processing API for Go (alpha)

automi

A data stream processing API for Go (alpha)


Automi is an API for processing streams of data using idiomatic Go. With Automi, programs process streams of data chunks by composing stages of operations that are applied to each element of the stream.

Concept

[Figure: Automi streaming concepts]


The Automi API expresses a stream with four primitives:

  • An emitter: an in-memory, network, or file resource that can emit elements for streaming
  • The stream: represents a conduit within which data elements are streamed
  • Stream operations: code which can be attached to the stream to process streamed elements
  • A collector: an in-memory, network, or file resource that can collect streamed data.

Automi streams use Go channels internally to route data. This means Automi streams inherit channel features such as buffering, back-pressure, and concurrency safety.
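
To make the back-pressure point concrete, here is a small sketch of general Go channel behavior. It is not taken from Automi's internals, and trySend is a hypothetical helper. Once a buffered channel is full, a sender cannot add more until a consumer receives an element, so a slow consumer naturally slows the producer down.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the
// channel had room for the value.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2) // buffer holds at most two pending elements

	fmt.Println(trySend(ch, 1)) // true: buffer has room
	fmt.Println(trySend(ch, 2)) // true: buffer is now full
	fmt.Println(trySend(ch, 3)) // false: a blocking send would wait here

	<-ch                        // the consumer takes one element
	fmt.Println(trySend(ch, 3)) // true: there is room again
}
```

A regular blocking send (ch <- v) would simply pause at the third send until the consumer catches up, which is the behavior a channel-based pipeline relies on.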

Using Automi

Now, let us explore some examples to see how easy it is to use Automi to stream and process data.

See all examples in the ./example directory.

Example: streaming from a slice into stdout

This first example shows how to compose and express stream operations with Automi. Rune values are emitted from a slice and streamed individually. The Filter operator method removes unwanted rune values (keeping only code points 65 through 90, the uppercase ASCII letters), Map converts each remaining rune to a string, and the Sort operator method sorts the remaining items. Lastly, a collector writes the result to an io.Writer, here os.Stdout.

func main() {
	strm := stream.New([]rune("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))

	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item) 
	}).Batch().Sort() 
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See the full source code.

How it works
  1. Create the stream with an emitter source. Automi supports several types of sources including channels, io.Reader, slices, etc. (see list of emitters below). Each element in the slice will be streamed individually.
strm := stream.New([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw`))
  2. Apply user-provided or built-in stream operations as shown below:
strm.Filter(func(item rune) bool {
    return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
    return string(item)
}).Batch().Sort()
  3. Collect the result. In this example, the result is collected into an io.Writer which further streams the data into standard output:
strm.Into(collectors.Writer(os.Stdout))
  4. Lastly, open the stream once it is properly composed:
if err := <-strm.Open(); err != nil {
    fmt.Println(err)
    return
}

Example: streaming from an io.Reader into collector function

The next example shows how to use Automi to stream data from an io.Reader that emits buffered data from an in-memory source in 50-byte chunks. The data is processed with the Map and Filter operator methods, and the result is sent to a user-provided collector function which prints it.

func main() {
	data := `"request", "/i/a", "00:11:51:AA", "accepted"
"response", "/i/a/", "00:11:51:AA", "served"
"response", "/i/a", "00:BB:22:DD", "served"...`

 	reader := strings.NewReader(data)
    
	// create stream from a buffered io.Reader emitter,
	// emitting 50-byte chunks.
	stream := stream.New(emitters.Reader(reader).BufferSize(50))
	stream.Map(func(chunk []byte) string {
		str := string(chunk)
		return str
	})
	stream.Filter(func(e string) bool {
		return (strings.Contains(e, `"response"`))
	})
	stream.Into(collectors.Func(func(data interface{}) error {
		e := data.(string)
		fmt.Println(e)
		return nil
	}))

	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See complete example here.

Example: streaming using CSV files

The following example streams data from a CSV source file. Each row is mapped to a custom type, filtered, then mapped to a slice of strings which is then collected into another CSV file.

type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
    // creates a stream using a CSV emitter
    // emits each row as []string
    stream := stream.New(emitters.CSV("./data.txt"))

    // Map each CSV row, []string, to type scientist
    stream.Map(func(cs []string) scientist {
        yr, _ := strconv.Atoi(cs[3])
        return scientist{
            FirstName: cs[1],
            LastName:  cs[0],
            Title:     cs[2],
            BornYear:  yr,
        }
    })
    stream.Filter(func(cs scientist) bool {
        return (cs.BornYear > 1930)
    })
    stream.Map(func(cs scientist) []string {
        return []string{cs.FirstName, cs.LastName, cs.Title}
    })
    stream.Into(collectors.CSV("./result.txt"))

    // open the stream
    if err := <-stream.Open(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    fmt.Println("wrote result to file result.txt")
}

See complete example here.

Example: streaming HTTP requests and responses

The following example shows how to use Automi to stream and process data using HTTP requests and responses. The HTTP server program below streams data from the request body, encodes it using base64, and streams the result into the HTTP response:

func main() {

	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
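			resp.Header().Add("Content-Type", "text/html")
			// No explicit WriteHeader here: the first write sends 200 OK implicitly,
			// so an error status can still be sent if the stream fails before writing.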

			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
				log.Printf("Stream error: %s", err)
			}
		},
	)

	log.Println("Server listening on :4040")
	log.Fatal(http.ListenAndServe(":4040", nil))
}

See complete example here.

Example: streaming gRPC service payload

The following example shows how to use Automi to stream data items from a gRPC streaming service. The gRPC client below sets up an Automi emitter to emit time values that are streamed from a gRPC time service:

// setup an Automi emitter function to stream from the gRPC service
func emitStreamFrom(client pb.TimeServiceClient) <-chan []byte {
	source := make(chan []byte)
	timeStream, err := client.GetTimeStream(context.Background(), &pb.TimeRequest{Interval: 3000})
	...
	go func(stream pb.TimeService_GetTimeStreamClient, srcCh chan []byte) {
		defer close(srcCh)
		for {
			t, err := stream.Recv()
			if err != nil {
				// stop on any error, including io.EOF at the end of the stream
				return
			}
			srcCh <- t.Value
		}
	}(timeStream, source)

	return source
}

func main() {
	...
	client := pb.NewTimeServiceClient(conn)
	// create automi stream
	stream := stream.New(emitStreamFrom(client))
	stream.Map(func(item []byte) time.Time {
		secs := int64(binary.BigEndian.Uint64(item))
		return time.Unix(secs, 0)
	})
	stream.Into(collectors.Func(func(item interface{}) error {
		t := item.(time.Time) // named t to avoid shadowing the time package
		fmt.Println(t)
		return nil
	}))

	// open the stream
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

}

See complete example here.

More Examples

Examples - View a long list of examples that cover all aspects of using Automi.

Automi components

Automi comes with a set of built-in components to get you started with stream processing, including the following:

Emitters

  • Channel
  • CSV
  • io.Reader
  • bufio.Scanner
  • Slice

Operators

  • Stream.Filter
  • Stream.Map
  • Stream.FlatMap
  • Stream.Reduce
  • Stream.GroupByKey
  • Stream.GroupByName
  • Stream.GroupByPos
  • Stream.Sort
  • Stream.SortByKey
  • Stream.SortByName
  • Stream.SortByPos
  • Stream.SortWith
  • Stream.Sum
  • Stream.SumByKey
  • Stream.SumByName
  • Stream.SumByPos
  • Stream.SumAllKeys

Collectors

  • CSV
  • Func
  • Null
  • Slice
  • Writer

Licence

Apache 2.0
