All Projects → badmonkee → coronamq

badmonkee / coronamq

Licence: other
The simplest way to implement a task queue with Java, Vertx and PostgreSQL.

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to coronamq

Hippo
💨A well crafted go packages that help you build robust, reliable, maintainable microservices.
Stars: ✭ 134 (+482.61%)
Mutual labels:  message-queue
Hangfire.topshelf
Best practice for hangfire samples
Stars: ✭ 192 (+734.78%)
Mutual labels:  message-queue
Akka Rabbitmq
RabbitMq client in Scala and Akka actors
Stars: ✭ 228 (+891.3%)
Mutual labels:  message-queue
Gores
👷 Redis-backed library for creating background jobs in Go. Placing jobs in multiple queues, and process them later asynchronously.
Stars: ✭ 137 (+495.65%)
Mutual labels:  message-queue
Liftbridge
Lightweight, fault-tolerant message streams.
Stars: ✭ 2,175 (+9356.52%)
Mutual labels:  message-queue
Phpnats
A PHP client for the NATSio cloud messaging system.
Stars: ✭ 209 (+808.7%)
Mutual labels:  message-queue
Mqperf
Stars: ✭ 122 (+430.43%)
Mutual labels:  message-queue
Kmq
Kafka-based message queue
Stars: ✭ 239 (+939.13%)
Mutual labels:  message-queue
Kombu
Kombu is a messaging library for Python.
Stars: ✭ 2,263 (+9739.13%)
Mutual labels:  message-queue
Nats.c
A C client for NATS
Stars: ✭ 220 (+856.52%)
Mutual labels:  message-queue
Autocser
AutoCSer is a high-performance RPC framework. AutoCSer 是一个以高效率为目标向导的整体开发框架。主要包括 TCP 接口服务框架、TCP 函数服务框架、远程表达式链组件、前后端一体 WEB 视图框架、ORM 内存索引缓存框架、日志流内存数据库缓存组件、消息队列组件、二进制 / JSON / XML 数据序列化 等一系列无缝集成的高性能组件。
Stars: ✭ 140 (+508.7%)
Mutual labels:  message-queue
Message Bus
Go simple async message bus
Stars: ✭ 166 (+621.74%)
Mutual labels:  message-queue
Vernemq
A distributed MQTT message broker based on Erlang/OTP. Built for high quality & Industrial use cases.
Stars: ✭ 2,628 (+11326.09%)
Mutual labels:  message-queue
Rxmq.js
JavaScript pub/sub library based on RxJS
Stars: ✭ 134 (+482.61%)
Mutual labels:  message-queue
Redis Smq
A simple high-performance Redis message queue for Node.js.
Stars: ✭ 230 (+900%)
Mutual labels:  message-queue
Bull
Premium Queue package for handling distributed jobs and messages in NodeJS.
Stars: ✭ 11,748 (+50978.26%)
Mutual labels:  message-queue
Qmq
QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。
Stars: ✭ 2,420 (+10421.74%)
Mutual labels:  message-queue
Advanced Java
😮 Core Interview Questions & Answers For Experienced Java(Backend) Developers | 互联网 Java 工程师进阶知识完全扫盲:涵盖高并发、分布式、高可用、微服务、海量数据处理等领域知识
Stars: ✭ 59,142 (+257039.13%)
Mutual labels:  message-queue
Enqueue Bundle
[READ-ONLY] Message queue bundle for Symfony. RabbitMQ, Amazon SQS, Redis, Service bus, Async events, RPC over MQ and a lot more
Stars: ✭ 233 (+913.04%)
Mutual labels:  message-queue
Sidekiq monitor
Advanced monitoring for Sidekiq
Stars: ✭ 220 (+856.52%)
Mutual labels:  message-queue

The simplest way to implement a task queue with Java, Vertx and PostgreSQL.

Simple

There are only three participants in CoronaMQ you have to reason about:

  1. Worker(s) process tasks that are added to the queue. Workers are bound to a label which describes the unit of work. There can be multiple labels (e.g. PLACE_ORDER, CHECKOUT, etc) and thus workers.
  2. The broker listens to inserts made to the task queue and sends these tasks over the EventBus.
  3. The TaskRepository is interacting with the queue in the database. You can deploy it together with the broker, but you don't have to.

There is also the Dispatcher: A dispatcher can add tasks to the queue by sending a message on the EventBus. Another way to add tasks is by adding them directly into the tasks-table.

Corona MQ Overview

Fast

Thanks to PostgresSQL's NOTIFY/LISTEN and the fastest PostgresSQL driver for Java 1 out there, tasks are instantly pushed to the EventBus. There is no polling. To empty the task queue as fast as possible, workers additionally request tasks when they are deployed and after they've completed a task.

Persistent

The tasks are stored in a PostgreSQL database guaranteeing durability and consistency. Your application might already use a PostgreSQL database in the persistence layer, so you don't have to bring another player to your system architecture. The fewer players, the fewer errors.

Once

Many queues out there guarantee at least once-delivery which means tasks might get handled twice. But what you really want is exactly once delivery. You have one job - and it should be done once. However, in a real world, there are network timeouts, database errors et al. so the best you can get is effectively once delivery and this is what CoronaMQ aims for.

Usage

Maven

CoronaMQ supports the following API-flavors:

vertx-Future

<dependency>
    <groupId>io.github.jklingsporn</groupId>
    <artifactId>coronamq-core</artifactId>
    <version>0.2</version>
</dependency>

Mutiny

<dependency>
    <groupId>io.github.jklingsporn</groupId>
    <artifactId>coronamq-mutiny</artifactId>
    <version>0.2</version>
</dependency>

rxjava3

<dependency>
    <groupId>io.github.jklingsporn</groupId>
    <artifactId>coronamq-rxjava</artifactId>
    <version>0.2</version>
</dependency>

Initial setup

  • To run the examples, you need a running docker daemon.
  • For a more advanced test, you can use the provided docker file.
  • All others need to add the following SQL to their existing database:
CREATE TYPE task_status AS ENUM ('NEW', 'RUNNING','COMPLETED','FAILED');
CREATE TABLE tasks(id UUID,label text, 	payload JSONB, status task_status, update_time timestamp);

CREATE OR REPLACE FUNCTION task_status_notify()
	RETURNS trigger AS
$$
BEGIN
	PERFORM pg_notify('coronamq_task_update', jsonb_build_object('id', NEW.id::text, 'payload',new.payload, 'label',new.label)::text);
	RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER task_status_change
	AFTER INSERT
	ON tasks
	FOR EACH ROW
EXECUTE PROCEDURE task_status_notify();

Code example

The following example uses the Future-API

@Test
public void boostrapExample(Vertx vertx, VertxTestContext testContext){
    //configurable options
    CoronaMqOptions coronaMqOptions = new CoronaMqOptions();
    
    //a simple worker that just completes any task it receives
    SimpleWorker worker = new SimpleWorker(vertx, coronaMqOptions);
    
    //an example configuration with a broker, repository and worker
    Future<BootstrapSpreadStep> spread = CoronaMq.create(vertx,coronaMqOptions)
            .withBroker()
            .withRepository()
            .withWorker(worker)
            .spread(); //spread the wor(l)d
            
    testContext
            .assertComplete(spread)
            //send a new task with "test"-label to the queue
            .compose(s-> s.dispatch("test",new JsonObject().put("someValue","hi")))
            .onComplete(testContext.succeeding(UUID::fromString))
            //complete the work
            .compose(v-> worker.getCurrentWork())
            //shut down all components
            .compose(v->spread.compose(BootstrapSpreadStep::vaccinate))
            .onComplete(testContext.succeedingThenComplete())
    ;
}

More examples can be found in the examples-module.

Awesome

This project is a showcase for various cool features that go beyond a simple "Hello world"-example:

Community has spoken

I originally created the project under the name PoXMQ (Postgres VertX MQ) but wasn't completely satisfied with it. So I made a poll on twitter - what could possibly go wrong? Never feed the trolls

Footnotes 😷

Footnotes

  1. https://www.techempower.com/benchmarks/#section=data-r15&hw=ph&test=db

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].