autom8ter / Machine
Licence: apache-2.0
Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles
Stars: â 346
Programming Languages
go
31211 projects - #10 most used programming language
Projects that are alternatives of or similar to Machine
Bree
đĨ The best job scheduler for Node.js and JavaScript with cron, dates, ms, later, and human-friendly support. Works in Node v10+ and browsers, uses workers to spawn sandboxed processes, and supports async/await, retries, throttling, concurrency, and graceful shutdown. Simple, fast, and lightweight. Made for @ForwardEmail and @ladjs.
Stars: â 933 (+169.65%)
Mutual labels: concurrency, cron
Rxgo
Reactive Extensions for the Go language.
Stars: â 3,907 (+1029.19%)
Mutual labels: streaming, concurrency
Streamly
Beautiful Streaming, Concurrent and Reactive Composition (Haskell)
Stars: â 553 (+59.83%)
Mutual labels: concurrency, streaming
Go Concurrency
This repos has lots of Go concurrency, goroutine and channel usage and best practice examples
Stars: â 84 (-75.72%)
Mutual labels: concurrency, goroutine
workerpool
A workerpool that can get expanded & shrink dynamically.
Stars: â 55 (-84.1%)
Mutual labels: concurrency, goroutine
go-workshops
Go language basic workshops for devz
Stars: â 68 (-80.35%)
Mutual labels: concurrency, goroutine
Advanced Http4s
đ Code samples of advanced features of Http4s in combination with some features of Fs2 not often seen.
Stars: â 136 (-60.69%)
Mutual labels: concurrency, streaming
Akka
Build highly concurrent, distributed, and resilient message-driven applications on the JVM
Stars: â 11,938 (+3350.29%)
Mutual labels: concurrency, streaming
noroutine
Goroutine analogue for Node.js, spreads I/O-bound routine calls to utilize thread pool (worker_threads) using balancer with event loop utilization. đą
Stars: â 86 (-75.14%)
Mutual labels: concurrency, goroutine
traffic
Massively real-time traffic streaming application
Stars: â 25 (-92.77%)
Mutual labels: streaming, concurrency
Django Concurrency
Optimistic lock implementation for Django. Prevents users from doing concurrent editing.
Stars: â 327 (-5.49%)
Mutual labels: concurrency
Spscqueue
A bounded single-producer single-consumer wait-free and lock-free queue written in C++11
Stars: â 307 (-11.27%)
Mutual labels: concurrency
Chartjs Plugin Streaming
Chart.js plugin for live streaming data
Stars: â 310 (-10.4%)
Mutual labels: streaming
Umka Lang
Umka: a statically typed embeddable scripting language
Stars: â 308 (-10.98%)
Mutual labels: concurrency
Libconcurrent
Šī¸ Concurrent Programming Library (Coroutine) for C11
Stars: â 335 (-3.18%)
Mutual labels: concurrency
Swoole Src
đ Coroutine-based concurrency library for PHP
Stars: â 17,175 (+4863.87%)
Mutual labels: concurrency
Wipe Modules
đī¸ Easily remove the node_modules folder of non-active projects
Stars: â 304 (-12.14%)
Mutual labels: cron
Machine
import "github.com/autom8ter/machine/v2"
Machine is a zero dependency library for highly concurrent Go applications.
It is inspired by errgroup
.
Group
with extra bells & whistles:
- In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
- Asynchronous worker groups similar to errgroup.Group
- Throttled max active goroutine count
- Asynchronous error handling(see
WithErrorHandler
to override default error handler) - Asynchronous cron jobs-
Cron()
Use Cases
Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.
Highly concurrent and/or asynchronous applications include:
-
gRPC streaming servers
-
websocket servers
-
pubsub servers
-
reverse proxies
-
cron jobs
-
custom database/cache
-
ETL pipelines
-
log sink
-
filesystem walker
-
code generation
// Machine is an interface for highly asynchronous Go applications
type Machine interface {
// Publish synchronously publishes the Message
Publish(ctx context.Context, msg Message)
// Subscribe synchronously subscribes to messages on a given channel, executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
// Glob matching IS supported for subscribing to multiple channels at once.
Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
// Go asynchronously executes the given Func
Go(ctx context.Context, fn Func)
// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
Cron(ctx context.Context, interval time.Duration, fn CronFunc)
// Loop asynchronously executes the given function repeatedly UNTIL the context cancels OR false is returned by the LoopFunc
Loop(ctx context.Context, fn LoopFunc)
// Wait blocks until all active async functions(Loop, Go, Cron) exit
Wait()
// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
Close()
}
Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
m = machine.New()
results []string
mu sync.RWMutex
)
defer m.Close()
m.Go(ctx, func(ctx context.Context) error {
m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
mu.Lock()
results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
mu.Unlock()
return true, nil
})
return nil
})
m.Go(ctx, func(ctx context.Context) error {
m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
mu.Lock()
results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
mu.Unlock()
return true, nil
})
return nil
})
m.Go(ctx, func(ctx context.Context) error {
m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
mu.Lock()
results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
mu.Unlock()
return true, nil
})
return nil
})
m.Go(ctx, func(ctx context.Context) error {
m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
mu.Lock()
results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
mu.Unlock()
return true, nil
})
return nil
})
<-time.After(1 * time.Second)
m.Publish(ctx, machine.Msg{
Channel: "human_resources.chat_room6",
Body: "hello world human resources",
})
m.Publish(ctx, machine.Msg{
Channel: "accounting.chat_room2",
Body: "hello world accounting",
})
m.Publish(ctx, machine.Msg{
Channel: "engineering.chat_room1",
Body: "hello world engineering",
})
m.Wait()
sort.Strings(results)
for _, res := range results {
fmt.Print(res)
}
// Output:
//(accounting.chat_room2) received msg: hello world accounting
//(accounting.chat_room2) received msg: hello world accounting
//(engineering.chat_room1) received msg: hello world engineering
//(engineering.chat_room1) received msg: hello world engineering
//(human_resources.chat_room6) received msg: hello world human resources
//(human_resources.chat_room6) received msg: hello world human resources
Extended Examples
All examples are < 500 lines of code(excluding code generation)
Note that the project description data, including the texts, logos, images, and/or trademarks,
for each open source project belongs to its rightful owner.
If you wish to add or remove any projects, please contact us at [email protected].