vmihailenco / Taskq

Licence: bsd-2-clause
Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Projects that are alternatives of or similar to Taskq

orkid-node
Reliable and modern Redis Streams based task queue for Node.js 🤖
Stars: ✭ 61 (-89.01%)
Mutual labels:  queue, message-queue, task-queue
Redis Smq
A simple high-performance Redis message queue for Node.js.
Stars: ✭ 230 (-58.56%)
Mutual labels:  redis, queue, message-queue
Rsmq
Redis Simple Message Queue
Stars: ✭ 1,556 (+180.36%)
Mutual labels:  redis, queue, message-queue
Yii Queue
Queue extension for Yii 3.0
Stars: ✭ 38 (-93.15%)
Mutual labels:  sqs, redis, queue
Huey
a little task queue for python
Stars: ✭ 3,761 (+577.66%)
Mutual labels:  redis, queue, task-queue
Simpleue
PHP queue worker and consumer - Ready for AWS SQS, Redis, Beanstalkd and others.
Stars: ✭ 124 (-77.66%)
Mutual labels:  sqs, redis, queue
Flower
Real-time monitor and web admin for Celery distributed task queue
Stars: ✭ 5,036 (+807.39%)
Mutual labels:  redis, task-queue
QueueBundle
QueueBundle for Symfony Framework
Stars: ✭ 40 (-92.79%)
Mutual labels:  queue, sqs
jrsmq
A lightweight message queue for Java that requires no dedicated queue server. Just a Redis server.
Stars: ✭ 28 (-94.95%)
Mutual labels:  queue, message-queue
Redisson
Redisson - Redis Java client with features of In-Memory Data Grid. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Publish / Subscribe, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, MyBatis, RPC, local cache ...
Stars: ✭ 17,972 (+3138.2%)
Mutual labels:  redis, queue
theeye-of-sauron
TheEye Dockers and QuickStart
Stars: ✭ 27 (-95.14%)
Mutual labels:  queue, task-queue
psched
Priority-based Task Scheduling for Modern C++
Stars: ✭ 59 (-89.37%)
Mutual labels:  queue, task-queue
Saea
SAEA.Socket is a high-performance IOCP framework TCP based on dotnet standard 2.0; Src contains its application test scenarios, such as websocket,rpc, redis driver, MVC WebAPI, lightweight message server, ultra large file transmission, etc. SAEA.Socket是一个高性能IOCP框架的 TCP,基于dotnet standard 2.0;Src中含有其应用测试场景,例如websocket、rpc、redis驱动、MVC WebAPI、轻量级消息服务器、超大文件传输等
Stars: ✭ 318 (-42.7%)
Mutual labels:  redis, queue
queue
A task queue library for Go.
Stars: ✭ 26 (-95.32%)
Mutual labels:  queue, task-queue
wtsqs
Simplified Node AWS SQS Worker Wrapper
Stars: ✭ 18 (-96.76%)
Mutual labels:  queue, sqs
dynamic-queue
The dynamic queue
Stars: ✭ 17 (-96.94%)
Mutual labels:  queue, message-queue
celery.node
Celery task queue client/worker for nodejs
Stars: ✭ 164 (-70.45%)
Mutual labels:  queue, task-queue
Xxl Mq
A lightweight distributed message queue framework.(分布式消息队列XXL-MQ)
Stars: ✭ 358 (-35.5%)
Mutual labels:  queue, message-queue
Lmstfy
Implements task queue in Golang which based on Redis storage
Stars: ✭ 373 (-32.79%)
Mutual labels:  redis, task-queue
Mail
Library to send e-mails over different transports and protocols (like SMTP and IMAP) using immutable messages and streams. Also includes SMTP server.
Stars: ✭ 399 (-28.11%)
Mutual labels:  redis, queue

Installation

taskq supports the two most recent Go versions and requires a Go version with modules support. Make sure to initialize a Go module first:

go mod init github.com/my/repo

Then install taskq/v3 (note the v3 in the import path; omitting it is a common mistake):

go get github.com/vmihailenco/taskq/v3

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatic scaling of the number of goroutines used to fetch (fetcher) and process (worker) messages.
  • Global rate limiting.
  • Global limit of workers.
  • Call once: messages with the same name are deduplicated.
  • Automatic retries with exponential backoff.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using snappy / s2.

Quickstart

I recommend that you split your app into two parts:

  • An API that accepts requests from customers and adds tasks to the queues.
  • A Worker that fetches tasks from the queues and processes them.

This way you can:

  • Isolate API and worker from each other.
  • Scale API and worker separately.
  • Have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as a backend:

cd example/api_worker
go run worker/worker.go
go run api/api.go

You start by choosing a backend to use - in our case Redis:

package api_worker

import "github.com/vmihailenco/taskq/v3/redisq"

// QueueFactory creates Redis-backed queues.
var QueueFactory = redisq.NewFactory()

Using that factory you create a queue that contains tasks:

var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

Next, you register a task with a handler that does some useful work:

var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})

Then in an API binary you use tasks to add messages/jobs to queues:

ctx := context.Background()
for {
    // call task handler without any args
    err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
    if err != nil {
        log.Fatal(err)
    }
}

And in a worker binary you start processing queues:

err := api_worker.MainQueue.Start(context.Background())
if err != nil {
    log.Fatal(err)
}

API overview

t := myQueue.RegisterTask(&taskq.TaskOptions{
    Name:    "greeting",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

ctx := context.Background()

// Say "Hello World".
err := myQueue.Add(t.WithArgs(ctx, "World"))
if err != nil {
    panic(err)
}

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world" // unique
    _ = myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    _ = myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
    _ = myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
    _ = myQueue.Add(msg)
}

Message deduplication

If a Message has a Name, it is used as a unique identifier: messages with the same name are deduplicated (i.e. not processed again) within a 24-hour period (or possibly longer if the name is not evicted from the local cache after that period). If Name is omitted, no deduplication occurs and every message is processed. A Task's WithMessage and WithArgs both produce messages with no Name, so those messages are not deduplicated. OnceWithArgs sets a name based on a consistent hash of the arguments and a quantised period of time (e.g. 'this hour', 'today') derived from the period passed to OnceWithArgs. This guarantees that the same function will not be called with the same arguments more than once during that period.

Handlers

A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.

There are three permitted types of signature:

  1. A zero-argument function
  2. A function whose arguments are assignable in type from those which are passed in the message
  3. A function which takes a single *Message argument

If a task is registered with a handler that takes a Go context.Context as its first argument, then the handler is invoked with the same Context that was passed to Consumer.Start(ctx). This can be used to signal all tasks being processed to abort:

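var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
    Name: "SomethingLongwinded",
    Handler: func(ctx context.Context) error {
        // A Ticker that is stopped on return avoids leaking the timer.
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                // The context was cancelled: abort immediately.
                return ctx.Err()
            case <-ticker.C:
                fmt.Println("Wee!")
            }
        }
    },
})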

Custom message delay

If the error returned by a handler implements a Delay() time.Duration method, that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

Tracing

taskq supports tracing out of the box using the OpenTelemetry API. To instrument a queue, use the following code:

import "github.com/vmihailenco/taskq/extra/taskqotel"

consumer := queue.Consumer()
consumer.AddHook(&taskqotel.OpenTelemetryHook{})

or using a taskq.Factory:

factory.Range(func(q taskq.Queue) bool {
    consumer := q.Consumer()
    consumer.AddHook(&taskqotel.OpenTelemetryHook{})

    return true
})

We recommend using Uptrace.dev as a tracing backend.
