All Projects → oze4 → workerpoolxt

oze4 / workerpoolxt

Licence: MIT license
Concurrency limiting goroutine pool without upper limit on queue length. Extends github.com/gammazero/workerpool

Programming Languages

go
31211 projects - #10 most used programming language
shell
77523 projects

Projects that are alternatives of or similar to workerpoolxt

wasm-bindgen-rayon
An adapter for enabling Rayon-based concurrency on the Web with WebAssembly.
Stars: ✭ 257 (+1613.33%)
Mutual labels:  concurrency, multithreading
Sobjectizer
An implementation of Actor, Publish-Subscribe, and CSP models in one rather small C++ framework. With performance, quality, and stability proved by years in the production.
Stars: ✭ 172 (+1046.67%)
Mutual labels:  concurrency, multithreading
Important Java Concepts
🚀 Complete Java - A to Z ║ 📚 Notes and Programs of all Important Concepts of Java - OOPS, Data Structures, Algorithms, Design Patterns & Development + Kotlin + Android 🔥
Stars: ✭ 135 (+800%)
Mutual labels:  concurrency, multithreading
Left Right
A lock-free, read-optimized, concurrency primitive.
Stars: ✭ 1,245 (+8200%)
Mutual labels:  concurrency, multithreading
workerpool
A workerpool that can get expanded & shrink dynamically.
Stars: ✭ 55 (+266.67%)
Mutual labels:  concurrency, worker-pool
Go Concurrency
This repos has lots of Go concurrency, goroutine and channel usage and best practice examples
Stars: ✭ 84 (+460%)
Mutual labels:  concurrency, multithreading
Lightio
LightIO is a userland implemented green thread library for ruby
Stars: ✭ 165 (+1000%)
Mutual labels:  concurrency, multithreading
Concurrencpp
Modern concurrency for C++. Tasks, executors, timers and C++20 coroutines to rule them all
Stars: ✭ 340 (+2166.67%)
Mutual labels:  concurrency, multithreading
Yew
Yew is a modern Rust framework for creating multi-threaded front-end web apps with WebAssembly.
Stars: ✭ 18,243 (+121520%)
Mutual labels:  concurrency, multithreading
Pht
A new threading extension for PHP
Stars: ✭ 175 (+1066.67%)
Mutual labels:  concurrency, multithreading
Evenk
A C++ library for concurrent programming
Stars: ✭ 68 (+353.33%)
Mutual labels:  concurrency, multithreading
thread-pool
BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library
Stars: ✭ 1,043 (+6853.33%)
Mutual labels:  concurrency, multithreading
Javamtp
《Java多线程编程实战指南(设计模式篇)》源码
Stars: ✭ 575 (+3733.33%)
Mutual labels:  concurrency, multithreading
So 5 5
SObjectizer: it's all about in-process message dispatching!
Stars: ✭ 87 (+480%)
Mutual labels:  concurrency, multithreading
Hamsters.js
100% Vanilla Javascript Multithreading & Parallel Execution Library
Stars: ✭ 517 (+3346.67%)
Mutual labels:  concurrency, multithreading
Chymyst Core
Declarative concurrency in Scala - The implementation of the chemical machine
Stars: ✭ 142 (+846.67%)
Mutual labels:  concurrency, multithreading
trading sim
📈📆 Backtest trading strategies concurrently using historical chart data from various financial exchanges.
Stars: ✭ 21 (+40%)
Mutual labels:  concurrency, multithreading
Object threadsafe
We make any object thread-safe and std::shared_mutex 10 times faster to achieve the speed of lock-free algorithms on >85% reads
Stars: ✭ 280 (+1766.67%)
Mutual labels:  concurrency, multithreading
Java Concurrency Examples
Java Concurrency/Multithreading Tutorial with Examples for Dummies
Stars: ✭ 173 (+1053.33%)
Mutual labels:  concurrency, multithreading
noroutine
Goroutine analogue for Node.js, spreads I/O-bound routine calls to utilize thread pool (worker_threads) using balancer with event loop utilization. 🌱
Stars: ✭ 86 (+473.33%)
Mutual labels:  concurrency, multithreading

workerpoolxt GitHub

GitHub Workflow Status Coveralls github
Codacy grade

Worker pool library that extends https://github.com/gammazero/workerpool

Note

If you are using context, please use with caution in production!! See here for more info

  • Go Playground which shows the area of concern

  • Wonderful explanation from the author of workerpool:

Once a goroutine is started, there no way to kill it unless there is something inside that goroutine that is looking for a signal, waiting on a context or channel, etc. In the case of workerpool, and workerpoolxt, the pool is providing the goroutine and calling someone else's function in that goroutine. There is no telling if that function will ever return and allow your goroutine to finish or run another task. The only thing that workerpoolxt can do is to run the task function in another goroutine and wait for the task to finish or for a timer to expire or some other signal to give up waiting (such as context being canceled). When this signal to give up happens, the worker goroutine can return and report an error, but the task goroutine is still running somewhere in the background and may or may not ever finish. All you have done is given up waiting for it to do so.

- @gammazero github.com/gammazero

Synopsis


Hello World

  • Obligatory "as simple as it gets" example
package main

import (
    "context"
    "fmt"
    wpxt "github.com/oze4/workerpoolxt"
)

func main() {
    ctx := context.Background()
    numWorkers := 10

    wp := wpxt.New(ctx, numWorkers)

    wp.SubmitXT(wpxt.Job{
        Name: "My first job",
        Task: func(o wpxt.Options) wpxt.Result {
            return wpxt.Result{Data: "Hello, world!"}
        },
    })

    jobResults := wp.StopWaitXT()

    for _, jobresult := range jobResults {
        fmt.Println(jobresult)
    }
}

How we extend workerpool

Results

// ...
// ... pretend we submitted jobs here
// ...

results := wp.StopWaitXT() // -> []wpxt.Result

for _, result := range results {
    // If job failed, `result.Error != nil`
}

Error Handling

  • What if I encounter an error in one of my jobs?
  • How can I handle or check for errors/timeout?

Return Error From Job

// Just set the `Error` field on the `wpxt.Result` you return
wp.SubmitXT(wpxt.Job{
    Name: "How to handle errors",
    Task: func(o wpxt.Options) wpxt.Result {
        // Pretend we got an error doing something
        if theError != nil {
            return wpxt.Result{Error: theError}
        }
    },
})

Check For Errors In Result

// ... pretend we submitted a bunch of jobs
//
// StopWaitXT() returns []wpxt.Result
// Each result has an `Error` field
// Whether a timeout, or an error you set
// Check for it like
if someResultFromSomeJob.Error != nil {
    // ....
}

Context

  • Required default context when creating new workerpoolxt
  • You can override default context per job

Default Context

myctx := context.Background() // Any `context.Context`
numWorkers := 10
wp := wpxt.New(myctx, numWorkers)

Per Job Context

Timeouts

defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Duration(time.Millisecond)

myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()

wp.SubmitXT(wpxt.Job{
    Name: "my ctx job",
    Context: myCtx,
    Task: func(o wpxt.Options) wpxt.Result {
        // Simulate long running task
        time.Sleep(time.Second*10) 
        return wpxt.Result{Data: "I could be anything"}
    },
})
// > `Result.Error` will be `context.DeadlineExceeded`

Retry

  • Optional
  • Seamlessly retry failed jobs
wp.SubmitXT(wpxt.Job{
    // This job is configured to fail immediately, 
    // therefore it will retry 5 times
    // (as long as we have not exceeded our job timeout)
    timeoutctx, _ := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*500))
    Retry: 5,
    // ^^^^^^
    Name: "I will retry 5 times",
    // Set timeout field on job
    Context: timeoutctx,
    Task: func(o wpxt.Options) wpxt.Result {
        return wpxt.Result{Error: errors.New("some_err")}
    },
})

Options

  • Help make jobs flexible

Default Options

myopts := map[string]interface{}{
    "myclient": &http.Client{},
}

wp := wpxt.New(context.Background(), 10)
wp.WithOptions(myopts)

wp.SubmitXT(wpxt.Job{
    Name: "myjob",
    Task: func(o wpxt.Options) wpxt.Result {
        // access options here
        client := o["myclient"]
    },
})

Per Job Options

myhttpclient := &http.Client{}
myk8sclient := kubernetes.Clientset{}

// This Job Only Needs an HTTP Client
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs an HTTP Client",
    Options: map[string]interface{}{
        "http": myhttpclient,
    },
    Task: func(o wpxt.Options) wpxt.Result {
        // access options here
        httpclient := o["http"]
        // ... do work with `httpclient`
    },
})

// This Job Only Needs Kubernetes Clientset
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs Kubernetes Clientset",
    Options: map[string]interface{}{
        "kube": myk8sclient,
    },
    Task: func(o wpxt.Options) wpxt.Result {
        // access options here
        kubernetesclient := o["kube"]
        // ... do work with `kubernetesclient`
    },
})
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].