All Projects → bamlab → nest-rabbit-tasks

bamlab / nest-rabbit-tasks

Licence: MIT license
nest-rabbit-worker is a TaskQueue based upon RabbitMQ for NestJS

Programming Languages

typescript
32286 projects
javascript
184084 projects - #8 most used programming language

Projects that are alternatives of or similar to nest-rabbit-tasks

nestjs-rmq
A custom library for NestJS microservice. It allows you to use RabbitMQ or AMQP.
Stars: ✭ 182 (+527.59%)
Mutual labels:  rabbitmq, amqp, nestjs
mom
Proof of concept for Message-Oriented-Middleware based architecture.
Stars: ✭ 39 (+34.48%)
Mutual labels:  rabbitmq, amqp, nestjs
nabbitmq
Node.js library for interacting with RabbitMQ based on RxJS streams
Stars: ✭ 20 (-31.03%)
Mutual labels:  rabbitmq, amqp, amqp0-9-1
postman
Reverse proxy for async microservice communication
Stars: ✭ 30 (+3.45%)
Mutual labels:  rabbitmq, amqp
Garagemq
AMQP message broker implemented with golang
Stars: ✭ 153 (+427.59%)
Mutual labels:  rabbitmq, amqp
Enqueue Dev
Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
Stars: ✭ 1,977 (+6717.24%)
Mutual labels:  rabbitmq, amqp
Rabtap
RabbitMQ wire tap and swiss army knife
Stars: ✭ 171 (+489.66%)
Mutual labels:  rabbitmq, amqp
Enqueue Bundle
[READ-ONLY] Message queue bundle for Symfony. RabbitMQ, Amazon SQS, Redis, Service bus, Async events, RPC over MQ and a lot more
Stars: ✭ 233 (+703.45%)
Mutual labels:  rabbitmq, amqp
Pika
Pure Python RabbitMQ/AMQP 0-9-1 client library
Stars: ✭ 2,866 (+9782.76%)
Mutual labels:  rabbitmq, amqp0-9-1
ridge
Pure asynchronous PHP implementation of the AMQP 0-9-1 protocol.
Stars: ✭ 49 (+68.97%)
Mutual labels:  amqp, amqp0-9-1
aop
AMQP on Pulsar protocol handler
Stars: ✭ 93 (+220.69%)
Mutual labels:  amqp, amqp0-9-1
rejected
rejected is a consumer framework for RabbitMQ
Stars: ✭ 56 (+93.1%)
Mutual labels:  rabbitmq, amqp
Gen rmq
Elixir AMQP consumer and publisher behaviours
Stars: ✭ 146 (+403.45%)
Mutual labels:  rabbitmq, amqp
Spring Boot Amqp Messaging
This is a simple spring-boot app that shows how to configure easily RabbitMQ with AMQP for producing and consuming messages in default format and JSON.
Stars: ✭ 142 (+389.66%)
Mutual labels:  rabbitmq, amqp
Cony
Simple AMQP wrapper around github.com/streadway/amqp
Stars: ✭ 158 (+444.83%)
Mutual labels:  rabbitmq, amqp
Rabbitmq Dump Queue
Dump messages from a RabbitMQ queue to files, without affecting the queue.
Stars: ✭ 139 (+379.31%)
Mutual labels:  rabbitmq, amqp
Amqpstorm
Thread-safe Python RabbitMQ Client & Management library
Stars: ✭ 130 (+348.28%)
Mutual labels:  rabbitmq, amqp
BUA-FE
本科毕设,搭建一套小而全面的校园外卖系统。主要使用wei-xin-mini + TypeScript + nest.js + typeORM + rabbitmq技术栈。
Stars: ✭ 20 (-31.03%)
Mutual labels:  rabbitmq, nestjs
Amqproxy
An intelligent AMQP proxy, with connection and channel pooling/reusing
Stars: ✭ 115 (+296.55%)
Mutual labels:  rabbitmq, amqp
Bitnami Docker Rabbitmq
Bitnami Docker Image for RabbitMQ
Stars: ✭ 120 (+313.79%)
Mutual labels:  rabbitmq, amqp

nest-rabbit-tasks

nest-rabbit-tasks is a TaskQueue based upon RabbitMQ for NestJS.

NPM

David npm bundle size

CircleCI

Quality Gate Status Maintainability Rating Reliability Rating Security Rating

rabbit

Motivation

By working hard on abstracting NestJS injection and RabbitMQ connection, we wanted to make it easy to do background tasks in NestJS.

nest-bull is a reference in this domain: it use bull and redis to provide the task queue. Moreover nest-bull and bull are the state of art about how TaskQueue complexity can be hidden and abstracted. With them, building a queue is actually fun.

However, complex system may need other technologies than redis: RabbitMQ provides more features, such as exchange (to allow task routing).

We needed it on a project, so here it is made available for you.

Disclaimer

This is beta quality software. We just started it so it has probably a few bugs.

We are actively working on it so it may break without notice / API may change.

⚠️ Use with caution, be ready to pin a version and make some modification on the code ⚠️

Contributors

Thanks goes to these wonderful people (emoji key):

Tycho Tatitscheff
Tycho Tatitscheff

💻 📖 🤔 🚇
MattAgn
MattAgn

💻 📖 🤔

This project follows the all-contributors specification. Contributions of any kind welcome!

Get Started

Declare the module:

import { Module } from '@nestjs/common';
import { NestRabbitTasksModule } from 'nest-rabbit-tasks';

// Import the worker.
import { EmailWorker } from './worker/email.worker';
// And import some services.
import { SMTPService } from './service/smtp.service';

@Module({
  imports: [
    // Note that there is also a `registerAsync` method
    // that allows the parameters to depend on an injected configService.
    // You can have a look at the interface to see what options it takes.
    NestRabbitTasksModule.registerSync({
      // That is not the name of RabbitMQ queue
      // but an unique reference that identify the queue and the worker
      // and allow us to link the conf here and the worker implementation there
      reference: 'worker-email-queue',
      // Queue or exchange (but exchange are not implemented yet ^^).
      entityType: 'queue',
      // AMQP connection/channel wide options.
      amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
      // Module option that dictate the worker behavior
      globalOptions: {
        // By default RabbitMQ create queue if the queue doesn't exist
        // By setting `{ immutableInfrastructure: true }` will throw if the queue does not exist
        // Note that this is not yet implemented yet but will be pretty soon
        immutableInfrastructure: true,
        // How many messages the Worker should take.
        // Increasing it increase the throughput but decrease the consistency
        prefetchSize: 1,
      },
      // Definition of the queue and queue specific options
      // such as `single-active-consumer`
      entity: { queueName: 'worker.queue.1', queueOptions: {} },
      // Worker used to handle the AMQP message
      worker: EmailWorker,
    }),
  ],
  // Here you should import the worker (EmailWorker)
  // and other service you need
  providers: [EmailWorker, SMTPService],
})
export class AppModule {}

// ---

// in main.js
import { NestFactory } from '@nestjs/core';

async function bootstrap() {
  // Start the module like always
  const app = await NestFactory.create(AppModule);
  // And launch it.
  // Having an HTTP port is useful to configure an health-check route
  await app.listen(3000);
}

and create a worker:

import { RabbitWorker, RabbitTasksWorker, RabbitTasksWorker } from 'nest-rabbit-tasks';

// You can specify what the Event looks like.
// By default it is any.
interface Event {
  name: string;
  recipient: string[];
  cc?: string[];
  bcc?: string[];
  content: string;
}

@RabbitTasksWorker({ reference: 'worker-email-queue' })
export class EmailWorker extends RabbitWorker<Event> {
  // Please do dependency injection
  // and inject what need for the worker to work
  public constructor(private readonly smtpService: SMTPService) {}

  // This method is mandatory
  public async handleMessage(data: Event, message: RabbitTasksWorker<Event, void>) {
    if (data.name === 'send-mail') {
      await this.smtpService.sendEmail(data.recipient, data.cc, data.bcc, data.content);
      // Acknowledge the message to remove it from the queue
      message.ack();
    } else {
      log.error('unknown event');
      if (message.getHeader('x-retries') < 3) {
        // Non acknowledge the message but with retry
        // to requeue it and process it later
        message.nack(true);
      } else {
        // Non acknowledge the message without retry
        // to remove it from the queue
        message.nack(false);
      }
    }
  }
}

Advanced usage

Async configuration

Why: your configuration may be dynamic, depends on env variable or API calls.

@Module({
  imports: [
    NestRabbitTasksModule.registerAsync({
      // The reference must be static and unique
      reference: 'toto',
      // Same for the entity type
      entityType: 'queue',
      // The handler is static too
      // (but that is not a mandatory constraint in the code, let me know if you have usages that)
      // (... requires it to be dynamic. I just found it made more sense like this in my use cases.)
      worker: TestWorker,
      // The rest of the options are dynamic
      // (We only provide `useFactory` for now but `useExisting` and `useClass` can be easily implemented)
      useFactory: async (configService: ConfigService) => {
        const queueName = await configService.getQueueName();
        return {
          entity: { queueName },
          amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
          globalOptions: { immutableInfrastructure: true, prefetchSize: 1 },
        };
      },
      // For it to work you have to import a module, eg. a config module
      // that export a service, eg. a config service
      // and inject the configService, so `useFactory` can resolve it
      imports: [ConfigModule],
      inject: [ConfigService],
    }),
  ],
  providers: [TestWorker],
})
export class AppModule {}

Road-map

  • implement the immutableInfrastructure mode

  • connect Rabbit logger to nest Logger and improve debug logs

  • properly check that config is correct and report error if not

  • implement async configuration (registerAsync)

  • prevent Haredo deps to leak

  • implement @OnEvent(rabbitEventName: string) to decorate a method of the Worker that will be call when rabbitEventName is emitted in the queue

  • work on quality (unit tests, E2E tests)

  • improve the doc (registerAsync)

  • implement an Exchange class (so users can publish to exchange using this)

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].