whitaker-io / Machine
Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and can even contain cycles for flows that need retry or self-healing mechanisms. It supports OpenTelemetry spans and metrics out of the box, and it supports building dynamic pipelines using native Go plugins and HashiCorp- or yaegi-based plugins by using the providers here.
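To make the idea of a flow with a cycle concrete, here is a minimal sketch that uses only the Go standard library. It does not use Machine's API; the `item` type, the `process` step, and `runWithRetry` are hypothetical names invented for illustration. Items whose processing fails are looped back into the queue until they succeed or reach an attempt limit.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical payload that records how often it has been processed.
type item struct {
	ID       int
	Attempts int
}

// process is a hypothetical step: odd IDs fail on their first two attempts.
func process(it *item) error {
	it.Attempts++
	if it.ID%2 == 1 && it.Attempts < 3 {
		return errors.New("transient failure")
	}
	return nil
}

// runWithRetry pushes every item through process. A failed item is sent back
// to the end of the queue (the cycle) until it succeeds or has used up
// maxAttempts, in which case it is dropped.
func runWithRetry(items []item, maxAttempts int) (done, dropped []item) {
	queue := append([]item(nil), items...)
	for len(queue) > 0 {
		it := queue[0]
		queue = queue[1:]
		if err := process(&it); err != nil {
			if it.Attempts < maxAttempts {
				queue = append(queue, it) // loop back for another attempt
			} else {
				dropped = append(dropped, it)
			}
			continue
		}
		done = append(done, it)
	}
	return done, dropped
}

func main() {
	done, dropped := runWithRetry([]item{{ID: 1}, {ID: 2}}, 5)
	fmt.Println(len(done), len(dropped)) // prints "2 0"
}
```

In this sketch the retry path is simply a second edge back into the same queue, which is the shape a cyclic workflow takes regardless of the library used to build it.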
Installation
Add the primary library to your project
go get -u github.com/whitaker-io/machine
Foundry is a tool used to generate new projects quickly
Components is a repository of different vertex implementations
Documentation
Example
Redis Subscription with basic receive -> process -> send Stream
// logStore allows for running a cluster and handles communication
var logStore machine.LogStore
// pool is a redigo Pool for a redis cluster to read the stream from
// see also the Google Pub/Sub, Kafka, and SQS implementations
var pool *redigo.Pool
// publisher is a machine.Publisher used for sending data outside of the Stream
var publisher machine.Publisher
// logger is not declared in this snippet; it is assumed to be defined elsewhere
redisStream := redis.New(pool, logger)
// NewPipe creates a pipe in which you can run multiple streams
// the id is the instance identifier for the cluster
p := NewPipe(uuid.New().String(), nil, logStore, fiber.Config{
	ReadTimeout:      time.Second,
	WriteTimeout:     time.Second,
	BodyLimit:        4 * 1024 * 1024,
	DisableKeepalive: true,
})
// StreamSubscription takes an instance of machine.Subscription
// and a time interval in which to read
// the id here needs to be the same for all the nodes for the clustering to work
builder := p.StreamSubscription("unique_stream_id", redisStream, 5*time.Millisecond,
	&Option{FIFO: boolP(false)},
	&Option{Injectable: boolP(true)},
	&Option{Metrics: boolP(true)},
	&Option{Span: boolP(false)},
	&Option{BufferSize: intP(0)},
).Builder()
builder.Map("unique_id2",
	func(m Data) error {
		var err error
		// ...do some processing
		return err
	},
).
	Publish("unique_id3", publisher)
// Run requires a context, the port to run the fiber.App,
// and the timeout for graceful shutdown
if err := p.Run(context.Background(), ":5000", 10*time.Second); err != nil {
	// Run will return an error in the case that
	// one of the paths is not terminated (i.e. missing a Transmit)
	panic(err)
}
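The example above calls `boolP` and `intP` without defining them. A minimal sketch, assuming they are small helpers that return a pointer to their argument so that an option field can distinguish "unset" (nil) from an explicit value. This is an assumption, since the README does not show their definitions:

```go
package main

import "fmt"

// boolP returns a pointer to a copy of v.
// Assumed helper: its definition is not shown in the example above.
func boolP(v bool) *bool { return &v }

// intP returns a pointer to a copy of v.
// Assumed helper: its definition is not shown in the example above.
func intP(v int) *int { return &v }

func main() {
	fifo := boolP(false)
	size := intP(0)
	fmt.Println(*fifo, *size) // prints "false 0"
}
```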
🤝 Contributing
Contributions, issues and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
Check the contributing guide.
Author
👤 Jonathan Whitaker
- Twitter: @io_whitaker
- Github: @jonathan-whitaker
Show your support
Please ⭐️ this repository if this project helped you!
License
Machine is provided under the MIT License.
The MIT License (MIT)
Copyright (c) 2020 Jonathan Whitaker