mmucklo / Dtcqueuebundle

Licence: mit
Symfony2/3/4/5 Queue Bundle (for background jobs) supporting Mongo (Doctrine ODM), Mysql (and any Doctrine ORM), RabbitMQ, Beanstalkd, Redis, and ... {write your own}

DtcQueueBundle

Allows Symfony developers to create background jobs as easily as: $worker->later()->process(1,2,3)

6.0 Release

See changes

Upgrading from 5.0: see UPGRADING-6.0.md

Supported Queues

  • MongoDB via Doctrine-ODM
  • Mysql / Doctrine 2 supported databases via Doctrine-ORM
  • Beanstalkd via pheanstalk
  • RabbitMQ via php-amqplib
  • Redis support via Predis or PhpRedis, or through SncRedisBundle

Introduction

This bundle provides a way to easily create and manage queued background jobs.

Basic Features:

  • Ease of Use
    • Kickoff background tasks with a line of code or two
    • Easily add background worker services
      • Turn any code into background task with a few lines
  • Atomic operation for jobs
    • For ORM-based queues this is done without relying on transactions.
  • Admin interface
    • Web-based Admin interface with an optional performance graph
  • Command Line Interface
    • Commands to run, manage and debug jobs from console
  • Job Archival
    • ORM and ODM managers have built-in job-archival for finished jobs
  • Logs errors from worker
  • Various safety checks for things such as stalled jobs, exception jobs
    • Allows for resetting stalled and exception jobs via console commands
  • Built in Event Dispatcher

Job-specific Features:

  • Auto-retry on failure, exception
    • If a job exits with a failure code, it can auto-retry
    • Same for Exception if desired
  • Priority
    • Jobs can have levels of priority so that higher priority jobs can get processed first even if they were added to the queue later.
  • Future Jobs (ODM / ORM / Redis)
    • Jobs can be scheduled to run at some time in the future
  • Batch
    • Jobs can be "batched" so that only one job runs, even if multiple are queued of the same type
  • Expires
    • Jobs can have an "expires" time so that they won't run after a certain point
      • (useful if the queue gets backed up and a job is worthless after a certain time)
  • Stalls (ODM / ORM)
    • Jobs that crash the interpreter, or get terminated for some other reason can be detected
      • These can be re-queued to run in the future.
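
The priority and expiry rules listed above can be illustrated with a small, self-contained sketch. This is not the bundle's implementation: the `pickNextJob` function and the array layout are hypothetical, and the sketch assumes the bundle's stated default that priority 1 is the highest.

```php
<?php
// Illustrative only: a hypothetical in-memory queue, not DtcQueueBundle internals.
// Assumes priority 1 is the highest (the default described in this README).

/**
 * Return the next runnable job, or null if none is eligible.
 * Each job is an array with the keys id, priority, whenAt, expiresAt.
 */
function pickNextJob(array $jobs, int $now): ?array
{
    // Keep only jobs that are due and have not expired.
    $eligible = array_filter($jobs, function (array $job) use ($now): bool {
        $due = $job['whenAt'] <= $now;
        $expired = $job['expiresAt'] !== null && $job['expiresAt'] <= $now;
        return $due && !$expired;
    });

    if ($eligible === []) {
        return null;
    }

    // Lower priority number first; ties go to the earlier whenAt.
    usort($eligible, function (array $a, array $b): int {
        return [$a['priority'], $a['whenAt']] <=> [$b['priority'], $b['whenAt']];
    });

    return $eligible[0];
}

$now = 1000;
$jobs = [
    ['id' => 1, 'priority' => 255, 'whenAt' => 900,  'expiresAt' => null], // queued first, low priority
    ['id' => 2, 'priority' => 1,   'whenAt' => 950,  'expiresAt' => null], // queued later, high priority
    ['id' => 3, 'priority' => 1,   'whenAt' => 800,  'expiresAt' => 990],  // already expired
    ['id' => 4, 'priority' => 1,   'whenAt' => 2000, 'expiresAt' => null], // scheduled for the future
];

$next = pickNextJob($jobs, $now);
echo $next['id'], PHP_EOL; // prints 2
```

Job 2 wins even though job 1 was queued earlier, job 3 is skipped because it has expired, and job 4 is skipped because it is not yet due.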

Installation

Symfony 2/3

see /Resources/doc/symfony2-3.md

Symfony 4/5

see /Resources/doc/symfony4-5.md

Troubleshooting

see /Resources/doc/troubleshooting.md

Usage

Create a worker class that will work on the background job.

Example:

  • src/Worker/Fibonacci.php: (symfony 4/5)
  • src/AppBundle/Worker/Fibonacci.php: (symfony 2/3)
<?php
namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker

class Fibonacci
    extends \Dtc\QueueBundle\Model\Worker
{
    private $filename;
    public function __construct() {
        $this->filename = '/tmp/fib-result.txt';
    }

    public function fibonacciFile($n) {
        $fib = $this->fibonacci($n);
        file_put_contents($this->filename, "{$n}: {$fib}");
    }


    public function fibonacci($n)
    {
        if($n == 0)
            return 0; //F0
        elseif ($n == 1)
            return 1; //F1
        else
            return $this->fibonacci($n - 1) + $this->fibonacci($n - 2);
    }

    public function getName() {
        return 'fibonacci';
    }

    public function getFilename()
    {
        return $this->filename;
    }
}

Create a DI service for the job, and tag it as a background worker.

YAML:

Symfony 5, 4 and 3.3, 3.4:

services:
    # for symfony 3 the class name would likely be AppBundle\Worker\Fibonacci
    App\Worker\Fibonacci:
        # public: false is possible if you completely use DependencyInjection for access to the service
        public: true
        tags:
            - { name: "dtc_queue.worker" }

Symfony 2, and 3.0, 3.1, 3.2:

services:
    app.worker.fibonacci:
        class: AppBundle\Worker\Fibonacci
        tags:
            - { name: "dtc_queue.worker" }
XML:
<services>
	<!-- ... -->
	<service id="fibonacci" class="Fibonacci">
	    <tag name="dtc_queue.worker" />
	</service>
	<!-- ... -->
</services>

Create a job

Simple examples:

Command line:
bin/console dtc:queue:create_job <worker> <method> <arg>

bin/console dtc:queue:create_job fibonacci fibonacci 3

bin/console dtc:queue:create_job fibonacci fibonacciFile 8
Code:
// Dependency inject the worker or fetch it from the container
$fibonacci = $container->get('App\Worker\Fibonacci');

// For Symfony 3.3, 3.4
//     $fibonacci = $container->get('AppBundle\Worker\Fibonacci');
//

// For Symfony 2, 3.0, 3.1, 3.2:
//     $fibonacci = $container->get('app.worker.fibonacci');


// Basic Examples
$fibonacci->later()->fibonacci(20);
$fibonacci->later()->fibonacciFile(20);

// Batch Example
$fibonacci->batchLater()->fibonacci(20); // Batch up runs into a single run

// Timed Example
$fibonacci->later(90)->fibonacci(20); // Run 90 seconds later

// Priority
//    Note: whether 1 == High or Low priority is configurable, but by default it is High
$fibonacci->later(0, 1)->fibonacci(20); // As soon as possible, High priority
$fibonacci->later(0, 125)->fibonacci(20); // Medium priority
$fibonacci->later(0, 255)->fibonacci(20); // Low priority

// Advanced Usage Example:
//  (If the job is not processed by $expireTime, then don't execute it ever...)
$expireTime = time() + 3600;
$fibonacci->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all
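
The `new \DateTime("@$expireTime")` form used above is plain PHP rather than anything specific to the bundle: a leading `@` tells `DateTime` to read the string as a Unix timestamp. A minimal sketch that checks this independently of the queue:

```php
<?php
// Plain PHP, no bundle required: build a DateTime one hour from now
// from a Unix timestamp using the "@<timestamp>" notation.
$expireTime = time() + 3600;
$expiresAt = new DateTime("@$expireTime");

// The object represents exactly the instant given by the timestamp.
var_dump($expiresAt->getTimestamp() === $expireTime); // bool(true)

// Timestamps built this way carry a UTC offset, whatever the default timezone is.
echo $expiresAt->getTimezone()->getName(), PHP_EOL; // +00:00
```

Because the value is an absolute instant, the server's default timezone does not change when the job expires.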
Create Jobs - Additional Information

For further instructions on creating jobs, including how to create a job from the command line using json-encoded arguments, see:

/Resources/doc/create-job.md

Running Jobs

It's recommended that you run the following console command in the background:

bin/console dtc:queue:run -d 120
# the -d parameter is the number of seconds during which to keep executing jobs before ending. 
#  For example you could put the above command into cron or a cron-like system to run every 2 minutes
#
# There are a number of other parameters that can be passed to dtc:queue:run. Run this for a full list:
bin/console dtc:queue:run --help

Pruning Jobs

For ODM and ORM based stores, the archive tables and the regular job table (queue) can require periodic pruning.

The regular job table is for waiting and running jobs. If a job throws an exception that can't be caught or the process segfaults, machine crashes, etc. then jobs which never finished can remain in the job queue in the "Running" state.

The archive table is where finished and errored jobs end up after execution; this table can grow indefinitely.

For Mongo in production, it may be prudent to use a capped collection or TTL indexes.

For Mysql you could create an event to delete data periodically.

The bundle also provides several prune commands that do the same job (and can be put into a periodic cron job as well):

bin/console dtc:queue:prune old --older 1m
# (deletes jobs older than one month from the Archive table)

# Clear out stalled jobs from the regular job table:
bin/console dtc:queue:prune stalled

# If you're recording runs...this is recommended:
bin/console dtc:queue:prune stalled_runs

# If you're recording runs...another recommendation
bin/console dtc:queue:prune old_runs --older 1m

# If you're recording timings
bin/console dtc:queue:prune old_job_timings --older 1m

# You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour)
#  if you have too many jobs flowing through the system.
bin/console dtc:queue:prune --help # lists other prune commands
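
As a rough illustration of what an --older value such as 1m, 10d, or 1800s means as a cutoff date, the sketch below converts the suffix into a point in time. The `pruneCutoff` function is hypothetical and handles only the three suffixes mentioned above; the bundle's own parsing may differ and may accept other units.

```php
<?php
// Hypothetical helper, not part of DtcQueueBundle.
// Maps "<number><unit>" to a cutoff: s = seconds, d = days, m = months.
function pruneCutoff(string $older, DateTimeImmutable $now): DateTimeImmutable
{
    if (!preg_match('/^(\d+)([smd])$/', $older, $m)) {
        throw new InvalidArgumentException("Unsupported interval: $older");
    }

    $units = ['s' => 'seconds', 'd' => 'days', 'm' => 'months'];

    // e.g. "-10 days" relative to $now
    return $now->modify(sprintf('-%d %s', $m[1], $units[$m[2]]));
}

$now = new DateTimeImmutable('2020-03-15 12:00:00', new DateTimeZone('UTC'));

echo pruneCutoff('1m', $now)->format('Y-m-d H:i:s'), PHP_EOL;    // 2020-02-15 12:00:00
echo pruneCutoff('10d', $now)->format('Y-m-d H:i:s'), PHP_EOL;   // 2020-03-05 12:00:00
echo pruneCutoff('1800s', $now)->format('Y-m-d H:i:s'), PHP_EOL; // 2020-03-15 11:30:00
```

Anything recorded before the printed cutoff would be a candidate for deletion under that interval.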

Debugging

These commands may help with debugging issues with the queue:

bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only)
bin/console dtc:queue:reset # resets errored and/or stalled jobs

# This is really only good for ORM/ODM based stores.
bin/console dtc:queue:run --id={jobId}

# (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)

Tracking Runs

Each job run can be tracked in a table in an ORM / ODM backed datastore.

Configure this in app/config/config.yml (symfony 2/3) or config/packages/dtc_queue.yaml (symfony 4/5):

dtc_queue:
    manager:
        # run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb)
        #   If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run
        #   to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save
        #
        run: orm # other possible option is "odm" (i.e. mongodb)
    #
    # (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:)

MongoDB DocumentManager

Change the document manager

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    odm:
        document_manager: {something} # default is "default"

Mysql / ORM Setup

As of 4.0, ORM requires the bcmath extension to be enabled.

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    manager:
       job: orm

Change the EntityManager:

dtc_queue:
    orm:
        entity_manager: {something} # default is "default"

NOTE: You may need to add DtcQueueBundle to your mappings section in config.yml if auto_mapping is not enabled:

doctrine:
   #...
   orm:
       #...
       mappings:
           DtcQueueBundle: ~

Note on NON-ORM Setups:

If you plan on using ODM or Redis or another configuration, but you have Doctrine ORM enabled elsewhere, it's recommended that you use the schema_filter configuration parameter so that schema dumps and/or migration diffs don't pick up those tables (see issue #77).

E.g.

doctrine:
   # ...
   dbal:
       # ...
       schema_filter: ~^(?!dtc_)~

(if you already have a schema_filter, you can just add the "dtc_" prefix to it.)

Beanstalk Configuration

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    beanstalkd:
        host: beanstalkd
        tube: some-tube-name # optional
    manager:
        job: beanstalkd

RabbitMQ Configuration

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    manager:
        job: rabbit_mq
    rabbit_mq:
        host: rabbitmq
        port: 5672
        user: guest
        password: guest
        vhost: "/" # optional, defaults to "/"
        ssl: false # optional, defaults to false; toggles use of AMQPSSLConnection
        options: # optional options to pass to AMQPStreamConnection or AMQPSSLConnection
        ssl_options: # optional extra ssl options to pass to AMQPSSLConnection
        queue_args: # optional
            queue: # optional queue name
            passive: false # optional, defaults to false
            durable: true # optional, defaults to true
            exclusive: false # optional, defaults to false
            auto_delete: false # optional, defaults to false
        exchange_args: # optional
            exchange: # optional exchange name
            type: direct # optional, defaults to "direct"
            passive: false # optional, defaults to false
            durable: true # optional, defaults to true
            auto_delete: false # optional, defaults to false

Redis Configuration

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    manager:
        job: redis
    redis:
        # choose one of the below snc_redis, predis, or phpredis
        snc_redis:
           type: predis
           alias: default
        predis:
            # choose one of dsn or connection_parameters
            dsn: redis://localhost
            connection_parameters:
                scheme: tcp
                host: localhost
                port: 6379
                path: ~
                database: ~
                password: ~
                async: false
                persistent: false
                timeout: 5.0
                read_write_timeout: ~
                alias: ~
                weight: ~
                iterable_multibulk: false
                throw_errors: true
        phpredis:
            # at a minimum, fill in host and port if needed
            host: localhost
            port: 6379
            timeout: 0
            retry_interval: ~
            read_timeout: 0
            auth: ~

Custom Jobs and Managers

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    class_job: Some\Job\ClassName # optional
    manager:
        job: some_name # optional
    # (create your own manager service and name or alias it:
    #   dtc_queue.manager.job.<some_name> and put
    #   <some_name> in the manager: job field above)

Rename the Database or Table Name

  1. Extend the following:
Dtc\QueueBundle\Document\Job
Dtc\QueueBundle\Document\JobArchive

or

Dtc\QueueBundle\Entity\Job
Dtc\QueueBundle\Entity\JobArchive

(Depending on whether you're using MongoDB or an ORM)

  2. Change the parameters on the class appropriately
<?php
namespace App\Entity; // Or whatever

use Dtc\QueueBundle\Entity\Job as BaseJob;
use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Entity
 * @ORM\Table(name="job_some_other_name", indexes={@ORM\Index(name="job_crc_hash_idx", columns={"crcHash","status"}),
 *                  @ORM\Index(name="job_priority_idx", columns={"priority","whenAt"}),
 *                  @ORM\Index(name="job_when_idx", columns={"whenAt"}),
 *                  @ORM\Index(name="job_status_idx", columns={"status","whenAt"})})
 */
class Job extends BaseJob {
}

// ... similarly for Entity\JobArchive if necessary
<?php
namespace App\Document;

use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM;
use Dtc\QueueBundle\Document\Job as BaseJob;

/**
 * @ODM\Document(db="my_db", collection="my_job_collection")
 */
class Job extends BaseJob
{
}

// ... similarly for Document\JobArchive if necessary
  3. Add the new class(es) to config.yml
# config.yml
# ...
dtc_queue:
    class_job: App\Entity\Job
    class_job_archive: App\Entity\JobArchive

Job Event Subscriber

In a long-running script it's useful to listen to job events, for example to clear the Doctrine manager or to send an email about the status of a job. To add a job event subscriber, create a new service with the tag dtc_queue.event_subscriber:

services:
    voices.queue.listener.clear_manager:
        class: ClearManagerSubscriber
        arguments:
            - '@service_container'
        tags:
            - { name: dtc_queue.event_subscriber, connection: default }

ClearManagerSubscriber.php

<?php
use Dtc\QueueBundle\EventDispatcher\Event;
use Dtc\QueueBundle\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

class ClearManagerSubscriber
    implements EventSubscriberInterface
{
    private $container;
    public function __construct(ContainerInterface $container) {
        $this->container = $container;
    }

    public function onPostJob(Event $event)
    {
        $managerIds = [
            'doctrine.odm.mongodb.document_manager',
            'doctrine.orm.default_entity_manager',
            'doctrine.orm.content_entity_manager'
        ];

        foreach ($managerIds as $id) {
            $manager = $this->container->get($id);
            $manager->clear();
        }
    }

    public static function getSubscribedEvents()
    {
        return array(
            Event::POST_JOB => 'onPostJob',
        );
    }
}

Running as upstart service:

  1. Create the following file in /etc/init/. PHP is terrible at memory management and garbage collection: to deal with out-of-memory issues, run 20 jobs at a time (or some other manageable number) before the process exits and respawns.
# /etc/init/queue.conf

author "David Tee"
description "Queue worker service, run 20 jobs at a time, process timeout of 3600"

respawn
start on startup

script
        /{path to}/console dtc:queue:run --max-count 20 -v -t 3600 >> /var/logs/queue.log 2>&1
end script
  2. Reload config: sudo initctl reload-configuration
  3. Start the script: sudo start queue

Admin

NOTE: ORM And ODM (MongoDB) require mmucklo/grid-bundle in order to view the jobs/runs admin page.

You can register admin routes to see queue status. In your routing.yml file, add the following:

dtc_queue:
    resource: '@DtcQueueBundle/Resources/config/routing.yml'

Testing

You can run the unit tests by typing bin/phpunit in the source folder. If you want to run integration tests with MongoDB, you need to set up a MongoDB server on localhost and run:

bin/phpunit Tests/Document/JobManagerTest.php

If you want to run the Beanstalkd integration tests, you need to run a local, empty instance of beanstalkd for testing:

sudo service beanstalkd restart; BEANSTALKD_HOST=localhost bin/phpunit Tests/BeanStalkd/JobManagerTest.php

Full Configuration

See /Resources/doc/full-configuration.md

License

This bundle is under the MIT license.

Credit

Originally written by @dtee. Enhanced and maintained by @mmucklo.