All Projects → hibiken → Asynq

hibiken / Asynq

Licence: mit
Asynq: simple, reliable, and efficient distributed task queue in Go

Programming Languages

go
31211 projects - #10 most used programming language

Projects that are alternatives of or similar to Asynq

Django Rq
A simple app that provides django integration for RQ (Redis Queue)
Stars: ✭ 1,361 (+45.72%)
Mutual labels:  background-jobs, redis, task-queue
Rq
Simple job queues for Python
Stars: ✭ 8,065 (+763.49%)
Mutual labels:  background-jobs, redis, task-queue
Qutee
PHP Background Jobs (Tasks) Manager
Stars: ✭ 63 (-93.25%)
Mutual labels:  background-jobs, redis
Curlyq
Efficient and reliable background processing for Go
Stars: ✭ 110 (-88.22%)
Mutual labels:  background-jobs, redis
gohive
🐝 A Highly Performant and easy to use goroutine pool for Go
Stars: ✭ 41 (-95.61%)
Mutual labels:  asynchronous-tasks, task-queue
Rqueue
Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework
Stars: ✭ 76 (-91.86%)
Mutual labels:  asynchronous-tasks, redis
Kiq
📮 Robust job queue powered by GenStage and Redis
Stars: ✭ 49 (-94.75%)
Mutual labels:  background-jobs, redis
python-task-queue
Asynchronous serverless task queue with timed leasing of tasks. Threaded implementations for SQS and local filesystem.
Stars: ✭ 24 (-97.43%)
Mutual labels:  asynchronous-tasks, task-queue
Pytask Io
Python Async Task Queue
Stars: ✭ 81 (-91.33%)
Mutual labels:  redis, task-queue
Huey
a little task queue for python
Stars: ✭ 3,761 (+302.68%)
Mutual labels:  redis, task-queue
celery.node
Celery task queue client/worker for nodejs
Stars: ✭ 164 (-82.44%)
Mutual labels:  background-jobs, task-queue
Lmstfy
Implements task queue in Golang which based on Redis storage
Stars: ✭ 373 (-60.06%)
Mutual labels:  redis, task-queue
Django Carrot
A lightweight task queue for Django using RabbitMQ
Stars: ✭ 58 (-93.79%)
Mutual labels:  asynchronous-tasks, task-queue
Flask Redis Queue
Example of how to handle background processes with Flask, Redis Queue, and Docker
Stars: ✭ 163 (-82.55%)
Mutual labels:  redis, task-queue
Taskq
Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends
Stars: ✭ 555 (-40.58%)
Mutual labels:  redis, task-queue
Lowkiq
Ordered background jobs processing
Stars: ✭ 129 (-86.19%)
Mutual labels:  background-jobs, redis
uwsgi tasks
Asynchronous tasks management with UWSGI server
Stars: ✭ 88 (-90.58%)
Mutual labels:  background-jobs, asynchronous-tasks
Flower
Real-time monitor and web admin for Celery distributed task queue
Stars: ✭ 5,036 (+439.19%)
Mutual labels:  redis, task-queue
Node Celery
Celery client for Node.js
Stars: ✭ 648 (-30.62%)
Mutual labels:  background-jobs, redis
Hzdtf.foundation.framework
基础框架系统,支持.NET和.NET Core平台,语言:C#,DB支持MySql和SqlServer,主要功能有抽象持久化、服务层,将业务基本的增删改查抽离复用;提供代码生成器从DB生成实体、持久化、服务以及MVC控制器,每层依赖接口,并需要在客户端将对应实现层用Autofac程序集依赖注入,用AOP提供日志跟踪、事务、模型验证等。对Autofac、Redis、RabbitMQ封装扩展;DB访问提供自动主从访问,Redis客户端分区。特别适合管理系统。
Stars: ✭ 22 (-97.64%)
Mutual labels:  redis

Asynq

Build Status GoDoc Go Report Card License: MIT Gitter chat

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.

Highlevel overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... return an instance
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To Learn more about asynq features and APIs, see our Wiki and godoc.

Web UI

Asynqmon is a web based tool for monitoring and administrating Asynq queues and tasks. Please see the tool's README for details.

Here's a few screenshots of the web UI.

Queues view
Web UI QueuesView

Tasks view
Web UI TasksView

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

Gif

For details on how to use the tool, refer to the tool's README.

Installation

To install asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency Version
Redis v3.0+
Go v1.13+

Contributing

We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from sidekiq and its Web UI
  • RQ : Client APIs are inspired by rq library.
  • Cobra : Asynq CLI is built with cobra

License

Asynq is released under the MIT license. See LICENSE.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].