skif48 / nabbitmq

Licence: MIT license
Node.js library for interacting with RabbitMQ based on RxJS streams

NabbitMQ

Node.js library for interacting with RabbitMQ based on RxJS streams

Installation

npm install --save nabbitmq

API Docs

Detailed API docs can be found here. Generated with TypeDoc.

Project status

The project is being actively developed and improved. Suggestions, help and criticism are warmly welcomed.

Description

NabbitMQ is a library that makes it easy for Node.js developers to interact with RabbitMQ. It is built on top of the well-known amqplib package and leverages RxJS streams.

Message queues are naturally streams of events, so using RxJS with them is an efficient way for developers to solve complex problems elegantly.

There are a lot of use cases where we don't need to set up non-standard exchanges and non-trivial bindings to queues. In fact, most of the time all we need is a simple queue that works out of the box, and NabbitMQ is here to help you with that. All you need to do is provide a custom name for the queue; everything else is handled for you.

However, NabbitMQ also allows you to use amqplib's promise-based API directly, so that you can build a more complex solution for your specific needs and still make use of RxJS streams.

Principles and reasons

One of the main reasons for this library to exist is to lower the threshold of entry to the RabbitMQ world, while still allowing us to use any piece of API that RabbitMQ provides.

The other reason is seamless error handling and helping developers to easily build fault-tolerant solutions. For example, NabbitMQ provides you with an automatically set up dead letter queue that listens to your main queue, unless you don't need one.

NabbitMQ has its own set of error classes, which makes it easier for developers to debug and to build solutions that survive even the most obscure corner cases.

In the end, the main principle and goal is to have a solid and reliable solution out of the box when working with RabbitMQ.

Examples

You can find examples under the examples folder. For your convenience, it also contains the required nodemon configs and a Docker container of RabbitMQ with the management plugin.

Quick start

This snippet demonstrates how you can easily spin up a solid RabbitMQ setup and quickly start to consume a stream of events from it. Under the hood, NabbitMQ creates all necessary bindings, exchanges, dead letter queues and provides you with reconnect logic.

import { RabbitMqConnectionFactory, ConsumerFactory, PublisherFactory } from 'nabbitmq';

async function main() {
  // Use a distinct variable name so that the imported class is not shadowed.
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setConfigs({queue: {name: 'super_queue'}});
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: (msg) => {
      console.log(msg);
      // autoAck is false by default, so every message has to be committed explicitly.
      consumer.commitMessage(msg);
    },
    error: console.error,
  });

  // Publish through a separate connection, reusing the names the consumer ended up with.
  const anotherConnection = await connectionFactory.newConnection();
  const publisherFactory = new PublisherFactory(anotherConnection);
  const {exchange, queue} = consumer.getActiveConfigs();
  publisherFactory.setConfigs({exchange: {name: exchange.name}});
  const publisher = await publisherFactory.newPublisher();
  setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), `${queue.name}_rk`), 1000);
}

main().catch(console.error);

Overview

NabbitMQ provides two main abstractions: Publisher and Consumer. Each is represented by a class that implements the RabbitMqPeer interface, and they are meant to be instantiated through the PublisherFactory and ConsumerFactory classes. A third abstraction, RabbitMqConnection, is a class that holds the active connection to the RabbitMQ server in use. It is injected into publishers and consumers via their factories.

Configs that set up the RabbitMQ internal structure of exchanges, queues and bindings are provided to the factories as plain JavaScript/TypeScript objects. The interfaces for these objects are called ConsumerConfigs and PublisherConfigs. Most of their values are optional, and the consumers and publishers fill in standard values themselves. For example, if you provide a queue name like my_queue but no exchange name, the exchange will be called exchange_my_queue; you can rely on that. Likewise, if a dead letter queue has to be set up (which is optional) but no name is provided for it, the consumer falls back to a default name, dlq_my_queue, and the dead letter exchange is named exchange_dlq_my_queue.

However, you might need a less generic RabbitMQ structure with more than one queue and more than one exchange. For that case, instead of providing these configs you can provide a so-called custom setup function. This function accepts a connection object from the underlying amqplib package. Inside it you can do whatever you need, but it must return a promise that resolves with an object containing the amqplib channel instance. When the function sets up a consumer, the object may also carry the consumer's prefetch count; this is not mandatory, and a default prefetch value is used if it is omitted.

Basics

Consumer configs

The only field required to set up a consumer is the name of the queue you want to use. Every other field is optional and will be filled in by the consumer itself. Here is what the consumer configs object looks like when only the queue name my_queue was provided:

const configs = { 
  queue: { 
    name: 'my_queue',
    bindingPattern: 'my_queue_rk', // routing key name: `${your queue name}_rk`
    options: {
      durable: true, // queue persistence is enabled by default
    },
  },
  exchange: { 
    name: 'exchange_my_queue', // exchange name: `exchange_${your queue name}`
    type: 'direct', // direct exchange type by default
    options: {
      durable: true, // exchange persistence is enabled by default
    },
  },
  autoAck: false, // automatic acknowledgement is disabled by default, so you have to commit each message yourself
  prefetch: 100, // consumer prefetch
  reconnectAttempts: -1, // infinite amount of reconnect attempts
  reconnectTimeoutMillis: 1000, // 1 second window between failing reconnect attempts
  deadLetterQueue: { // dead letter queue is built and bound by default
    queue: {
      name: 'dlq_my_queue', // dead letter queue name: `dlq_${your queue name}`
      options: {
        durable: true, // dead letter queue is also persistent by default
      },
    },
    exchange: {
      name: 'exchange_dlq_my_queue', // dead letter queue exchange name: `exchange_${dead letter queue name}`
      type: 'fanout',  // fanout type by default
      options: {
        durable: true, // dead letter exchange persistent by default
      },
    },
  },
};
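
The naming patterns written in the comments of the config object above can be summarised in a few lines. The following is only a sketch of those patterns: deriveDefaultNames and DerivedNames are hypothetical names made up for illustration and are not part of NabbitMQ's API.

```typescript
// Hypothetical helper, NOT part of NabbitMQ: it only mirrors the naming
// patterns stated in the comments of the consumer configs example.
interface DerivedNames {
  routingKey: string;
  exchange: string;
  deadLetterQueue: string;
  deadLetterExchange: string;
}

function deriveDefaultNames(queueName: string): DerivedNames {
  // dead letter queue name: `dlq_${your queue name}`
  const deadLetterQueue = `dlq_${queueName}`;
  return {
    routingKey: `${queueName}_rk`,                     // `${your queue name}_rk`
    exchange: `exchange_${queueName}`,                 // `exchange_${your queue name}`
    deadLetterQueue,
    deadLetterExchange: `exchange_${deadLetterQueue}`, // `exchange_${dead letter queue name}`
  };
}

const names = deriveDefaultNames('my_queue');
console.log(names);
```

A publisher that knows only the queue name could use such a helper to work out the exchange and routing key to publish to, although reading them from consumer.getActiveConfigs(), as the quick start does, avoids duplicating the convention.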

Publisher configs

For a publisher, the only required field is the name of the exchange to publish to; everything else is filled in with default values by the publisher itself. Here is an example of publisher configs with the exchange name my_exchange:

const configs = { 
  exchange: {
    name: 'my_exchange',
    options: {
      durable: true,  // exchange persistence is enabled by default
    },
    type: 'direct', // direct binding type by default
  },
  publisherConfirms: true, // publisher confirmations are enabled by default
  reconnectAttempts: -1, // infinite amount of reconnect attempts
  reconnectTimeoutMillis: 1000, // 1 second window between failing reconnect attempts
};
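
To make the idea of "filled with default values" more concrete, here is a minimal sketch of how a partial publisher config could be completed with the defaults listed above. The types PublisherConfigsSketch and FilledPublisherConfigs and the function fillPublisherDefaults are assumptions made up for illustration; NabbitMQ's real PublisherConfigs interface and its internal merging logic may differ.

```typescript
// Illustrative sketch only: these types and this function are hypothetical
// and do not come from NabbitMQ. The default values are the ones shown in
// the publisher configs example.
type ExchangeType = 'direct' | 'topic' | 'fanout' | 'headers';

interface PublisherConfigsSketch {
  exchange: { name: string; type?: ExchangeType; options?: { durable?: boolean } };
  publisherConfirms?: boolean;
  reconnectAttempts?: number;
  reconnectTimeoutMillis?: number;
}

interface FilledPublisherConfigs {
  exchange: { name: string; type: ExchangeType; options: { durable: boolean } };
  publisherConfirms: boolean;
  reconnectAttempts: number;
  reconnectTimeoutMillis: number;
}

function fillPublisherDefaults(configs: PublisherConfigsSketch): FilledPublisherConfigs {
  return {
    exchange: {
      name: configs.exchange.name,
      type: configs.exchange.type ?? 'direct',
      options: { durable: configs.exchange.options?.durable ?? true },
    },
    // `??` keeps an explicit `false`, which `||` would silently overwrite.
    publisherConfirms: configs.publisherConfirms ?? true,
    reconnectAttempts: configs.reconnectAttempts ?? -1, // -1 means unlimited attempts
    reconnectTimeoutMillis: configs.reconnectTimeoutMillis ?? 1000,
  };
}

const filled = fillPublisherDefaults({ exchange: { name: 'my_exchange' } });
const withoutConfirms = fillPublisherDefaults({
  exchange: { name: 'my_exchange', type: 'topic' },
  publisherConfirms: false,
});
console.log(filled, withoutConfirms);
```

The nullish coalescing operator matters here because several of the options are booleans or numbers for which false or 0 are meaningful user choices.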

Usage

Setting up topic exchange type

import { ConsumerFactory, PublisherFactory, RabbitMqConnectionFactory } from 'nabbitmq';

async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setConfigs({
    queue: {
      name: 'queue',
      bindingPattern: 'route.#',
    },
    exchange: {
      name: 'exchange',
      type: 'topic',
    },
    prefetch: 50,
  });
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: (msg) => {
      console.log(msg);
      consumer.commitMessage(msg);
    },
    error: console.error,
  });

  const anotherConnection = await connectionFactory.newConnection();
  const publisherFactory = new PublisherFactory(anotherConnection);
  publisherFactory.setConfigs({
    exchange: {
      name: 'exchange',
      type: 'topic',
    },
    publisherConfirms: false,
  });
  const publisher = await publisherFactory.newPublisher();
  publisher.actionsStream().subscribe({next: console.log, error: console.error});
  setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), `route.${Math.ceil(Math.random() * 10)}`), 1000);
}

main();
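
The example above binds the queue with the pattern route.# and publishes with routing keys such as route.7. In a topic exchange, routing keys are dot-separated words, * in a binding pattern stands for exactly one word and # stands for zero or more words. The sketch below illustrates these matching rules in plain TypeScript. The function topicMatches is a hypothetical helper written for this explanation; the real matching happens inside the RabbitMQ broker, not in NabbitMQ or amqplib.

```typescript
// Illustrative re-implementation of topic exchange matching rules.
// Hypothetical helper: the broker performs this matching, not the client.
function topicMatches(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split('.');
  const keyWords = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: it is a match only if the key is exhausted too.
    if (pi === patternWords.length) return ki === keyWords.length;

    if (patternWords[pi] === '#') {
      // '#' may absorb zero or more words; try every possible length.
      for (let next = ki; next <= keyWords.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }

    // Any other pattern word needs a key word to compare against.
    if (ki === keyWords.length) return false;

    // '*' matches exactly one arbitrary word, otherwise words must be equal.
    const wordMatches = patternWords[pi] === '*' || patternWords[pi] === keyWords[ki];
    return wordMatches && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('route.#', 'route.7'));   // true
console.log(topicMatches('route.#', 'route.a.b')); // true
console.log(topicMatches('route.*', 'route.a.b')); // false
console.log(topicMatches('route.#', 'other.7'));   // false
```

With the binding pattern route.#, every key the publisher in the example generates is delivered to the queue, whereas a pattern such as route.* would stop matching as soon as a key gained a third word.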

With custom setup function

Let's see how we can achieve the same as in the example above, but instead of config objects we're going to supply a custom setup function. For publishers and consumers there are different type aliases and requirements for these functions.

import { RabbitMqConnectionFactory, ConsumerFactory, PublisherFactory, RabbitMqChannelCancelledError, RabbitMqChannelClosedError, RabbitMqConnectionClosedError, RabbitMqPublisherConfirmationError } from 'nabbitmq';

async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const rabbitMqConnection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(rabbitMqConnection);
  consumerFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createChannel();
    await channel.assertExchange('exchange', 'topic', {});
    const queueMetadata = await channel.assertQueue('queue', {
      durable: true,
    });

    await channel.bindQueue(queueMetadata.queue, 'exchange', 'route.#');
    await channel.prefetch(10);

    return {channel, queue: 'queue', prefetch: 10, autoAck: false};
  });

  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: (msg) => {
      console.log('Received message', msg);
      consumer.commitMessage(msg);
    },
    error: (error) => {
      if (error instanceof RabbitMqConnectionClosedError)
        return void console.error('Connection was closed');

      if (error instanceof RabbitMqChannelClosedError)
        return void console.error('Channel was closed by the server');

      if (error instanceof RabbitMqChannelCancelledError)
        return void console.error('Channel cancellation occurred');
      
      // ... and so on
    },
  });

  const anotherConnection = await connectionFactory.newConnection();
  const publisherFactory = new PublisherFactory(anotherConnection);
  publisherFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createConfirmChannel();
    await channel.assertExchange('exchange', 'topic', {});
    const queueMetadata = await channel.assertQueue('queue', {
      durable: true,
    });

    await channel.bindQueue(queueMetadata.queue, 'exchange', 'route.#');
    return {channel, exchange: 'exchange'};
  });
  const publisher = await publisherFactory.newPublisher();

  publisher.actionsStream().subscribe({
    next: console.log,
    error: (error) => {
      if (error instanceof RabbitMqPublisherConfirmationError)
        return void console.error('Sent message failed to be confirmed');
      
      // ... and so on
    },
  });

  setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), `route.${Math.ceil(Math.random() * 10)}`), 1000);
}

main();

Reconnect

Let's assume that we have a working consumer instance, built either with object based configs or custom setup function. We can build a service, in which this consumer will be injected. Then, active reconnection logic can be implemented in the following way:

import { Message } from 'amqplib';
import { ReplaySubject } from 'rxjs'; // import from the public entry point, not from rxjs/internal
import { Consumer, RabbitMqError } from 'nabbitmq';

export class ConsumerService {
  private stream?: ReplaySubject<Message>;

  constructor(
    private readonly consumer: Consumer,
  ) {
    this.init();
  }

  public init() {
    const stream = this.consumer.startConsuming();
    this.stream = stream;
    stream.subscribe({
      next: (message) => {
        console.log('Received a message', message);
        this.consumer.commitMessage(message);
      },
      error: (error) => {
        if (!(error instanceof RabbitMqError)) {
          // Not a NabbitMQ error, so reconnecting would not help; just report it.
          return void console.error('Unexpected error:', error);
        }

        // reconnect() returns an observable that completes once the connection is reestablished
        this.consumer.reconnect().toPromise()
          .then(() => this.init()) // subscribe to a fresh stream, something like mutual recursion
          .catch((err) => console.error('Failed to reconnect:', err));
      },
    });
  }
}

The same logic can be reproduced for publisher instances.

License

NabbitMQ is MIT Licensed.
