All Projects → rxmqjs → Rxmq.js

rxmqjs / Rxmq.js

JavaScript pub/sub library based on RxJS

Programming Languages

javascript
184084 projects - #8 most used programming language

Projects that are alternatives of or similar to Rxmq.js

go-nats-examples
Single repository for go-nats example code. This includes all documentation examples and any common message pattern examples.
Stars: ✭ 99 (-26.12%)
Mutual labels:  message-bus, message-queue
OpenSleigh
OpenSleigh is a Saga management library for .NET Core.
Stars: ✭ 198 (+47.76%)
Mutual labels:  message-bus, message-queue
grails-rabbitmq-native
A Grails plugin that provides convenient RabbitMQ functionality using the native Java library for RabbitMQ.
Stars: ✭ 27 (-79.85%)
Mutual labels:  message-bus, message-queue
Phpnats
A PHP client for the NATSio cloud messaging system.
Stars: ✭ 209 (+55.97%)
Mutual labels:  message-queue, message-bus
Storage Based Queue
Javascript queue library with persistent storage based queue mechanism for the browsers environments. Specially designed for offline.
Stars: ✭ 33 (-75.37%)
Mutual labels:  message-queue, browser
Nats.c
A C client for NATS
Stars: ✭ 220 (+64.18%)
Mutual labels:  message-queue, message-bus
telephone-ts
Telephone-ts: The "Event Emitter-less" TypeScript Event Architecture.
Stars: ✭ 22 (-83.58%)
Mutual labels:  message-bus, message-queue
Enqueue Dev
Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
Stars: ✭ 1,977 (+1375.37%)
Mutual labels:  message-queue, message-bus
Plumber
A swiss army knife CLI tool for interacting with Kafka, RabbitMQ and other messaging systems.
Stars: ✭ 514 (+283.58%)
Mutual labels:  message-queue, message-bus
Benthos
Fancy stream processing made operationally mundane
Stars: ✭ 3,705 (+2664.93%)
Mutual labels:  message-queue, message-bus
Qmq
QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。
Stars: ✭ 2,420 (+1705.97%)
Mutual labels:  message-queue, message-bus
Mq
MQ is a simple distributed in-memory message broker
Stars: ✭ 114 (-14.93%)
Mutual labels:  message-queue, message-bus
Hangfire.topshelf
Best practice for hangfire samples
Stars: ✭ 192 (+43.28%)
Mutual labels:  message-queue, message-bus
Enqueue Bundle
[READ-ONLY] Message queue bundle for Symfony. RabbitMQ, Amazon SQS, Redis, Service bus, Async events, RPC over MQ and a lot more
Stars: ✭ 233 (+73.88%)
Mutual labels:  message-queue, message-bus
Message Bus
Go simple async message bus
Stars: ✭ 166 (+23.88%)
Mutual labels:  message-queue, message-bus
psr-container-messenger
Message bus and queue for Mezzio with Symfony Messenger + Enqueue
Stars: ✭ 24 (-82.09%)
Mutual labels:  message-bus, message-queue
ng-radio
RxJS-based message bus service for Angular2
Stars: ✭ 12 (-91.04%)
Mutual labels:  rxjs, message-bus
Nats Server
High-Performance server for NATS.io, the cloud and edge native messaging system.
Stars: ✭ 10,223 (+7529.1%)
Mutual labels:  message-queue, message-bus
Rebus
🚌 Simple and lean service bus implementation for .NET
Stars: ✭ 1,733 (+1193.28%)
Mutual labels:  message-queue, message-bus
Console Badge
🎨 Create simple badges in the browser console
Stars: ✭ 130 (-2.99%)
Mutual labels:  browser

Rxmq.js

Build Status npm MIT

JavaScript pub/sub library based on RxJS

What is it?

Rxmq.js is an in-memory message bus based on reactive extensions - inspired by postal.js - written in JavaScript using ES6 and Babel. Rxmq.js runs equally good in the browser and on the server using node.js. It provides a 'broker' that allows for creation of more sophisticated pub/sub implementations than what you usually find in event-style based libraries. On top of that, all used objects are parts of reactive extensions which allows doing a lot of cool things with them out of the box.

Quick start

If you want to subscribe to an observable, you tell Rxmq what channel and topic to subscribe to and a set of functions to be invoked (taken from Rx.Observable.subscribe):

import Rxmq from 'rxmq';

const subscription = Rxmq.channel('posts')
  .observe('post.add')
  .subscribe(
    // following methods are same as for Rx.Observable.subscribe
    data => {
      // handle new data ...
    },
    error => {
      // handle error ...
    }
  );

The publisher might do something similar to this:

Rxmq.channel('posts')
  .subject('post.add')
  .next({
    title: 'Woo-hoo, first post!',
    text: 'My lengthy post here',
  });

Note, that if you are not using ES6 modules (e.g. with babel), you will need to require Rxmq in the following way:

var Rxmq = require('rxmq').default;

Channels? Topics?

A channel is a logical partition of topics, more specifically - a set of topics. As well explained by postal.js readme section on channels, conceptually, it's like a dedicated highway for a specific set of communication. In case of Rxmq.js each topic is represented by a slightly tweaked Rx.Subject (specifically - it never triggers complete(), so you can keep sending your data all the time). Using channel- and topic-oriented messaging instead of traditional JavaScript approaches like callbacks or promises enables you to separate components (or modules) communication by context.

It's possible to get a more concise API if you want to hang onto a Channel - which can be really convenient while working with a specific channel (e.g. inside of a specific component):

const channel = Rxmq.channel('posts');
const subject = channel.subject('post.add');

const subscription = subject.subscribe(data => {
  /*do stuff with data */
});

subject.next({
  title: 'Woo-hoo, first post!',
  text: 'My lengthy post here',
});

How's Rxmq.js Different From {Insert Eventing Library Here}?

Some of those are shamelessly taken from postal.js list :)

  • Rxmq is not an event emitter - it's not meant to be mixed into an instance. Instead, it's a stand alone 'broker' – a message bus.
  • Rxmq uses a slightly modified Rx.Subject (it will never be completed or stopped by error) to pass messages. This means you use all the cool features of Rx.Observable and Rx.Observer while working on your messaging.
  • Most 'event aggregator' libs are single channel - which can lead to event name collision, and reduce the performance of matching an event to the correct subscribers. Rxmq is multi-channel.
  • Rxmq built-in topic logic supports hierarchical wildcard topic bindings - supporting the same logic as topic bindings in the AMQP spec. And if you don't like that approach, you can easily provide your own bindings resolver.

More on How to Use It

Here are four examples of using Rxmq.

// This gets you a handle to the default Rxmq channel
// You can get a named channel instead like this:
// const channel = Rxmq.channel('DoctorWho');
const channel = Rxmq.channel();

// subscribe to 'name.change' topics
const subscription = channel.observe('name.change').subscribe(data => {
  $('#example1').html('Name: ' + data.name);
});

// And someone publishes a name change:
channel.subject('name.change').next({ name: 'Dr. Who' });

// To dispose, just trigger the unsubscribe() method:
subscription.unsubscribe();

Subscribing to a wildcard topic using *

The * symbol represents 'one word' in a topic (i.e - the text between two periods of a topic). By subscribing to '*.changed', the binding will match name.changed & location.changed but not changed.companion.

const chgSubscription = channel.observe('*.changed').subscribe(data => {
  $('<li>' + data.type + ' changed: ' + data.value + '</li>').appendTo(
    '#example2'
  );
});
channel.subject('name.changed').next({ type: 'Name', value: 'John Smith' });
channel
  .subject('location.changed')
  .next({ type: 'Location', value: 'Early 20th Century England' });
chgSubscription.unsubscribe();

Subscribing to a wildcard topic using #

The # symbol represents 0-n number of characters/words in a topic string. By subscribing to 'DrWho.#.Changed', the binding will match DrWho.NinthDoctor.Companion.Changed & DrWho.Location.Changed but not Changed.

const starSubscription = channel.observe('DrWho.#.Changed').subscribe(data => {
  $('<li>' + data.type + ' Changed: ' + data.value + '</li>').appendTo(
    '#example3'
  );
});
channel
  .subject('DrWho.NinthDoctor.Companion.Changed')
  .next({ type: 'Companion Name', value: 'Rose' });
channel
  .subject('DrWho.TenthDoctor.Companion.Changed')
  .next({ type: 'Companion Name', value: 'Martha' });
channel
  .subject('DrWho.Eleventh.Companion.Changed')
  .next({ type: 'Companion Name', value: 'Amy' });
channel
  .subject('DrWho.Location.Changed')
  .next({ type: 'Location', value: 'The Library' });
channel
  .subject('TheMaster.DrumBeat.Changed')
  .next({ type: 'DrumBeat', value: "This won't trigger any subscriptions" });
channel.subject('Changed').next({
  type: 'Useless',
  value: "This won't trigger any subscriptions either",
});
starSubscription.unsubscribe();

Using Rx.Observable methods with a subscription

import { distinctUntilKeyChanged } from 'rxjs/operators';

const dupChannel = Rxmq.channel('Blink');
const dupSubscription = dupChannel
  .observe('WeepingAngel.#')
  .pipe(distinctUntilKeyChanged('name'))
  .subscribe(data => {
    $('<li>' + data.value + '</li>').appendTo('#example4');
  });
// demonstrating multiple channels per topic being used
// You can do it this way if you like, but the example above has nicer syntax (and *much* less overhead)
dupChannel.subject('WeepingAngel.DontBlink').next({ value: "Don't Blink" });
dupChannel.subject('WeepingAngel.DontBlink').next({ value: "Don't Blink" });
dupChannel
  .subject('WeepingAngel.DontEvenBlink')
  .next({ value: "Don't Even Blink" });
dupChannel
  .subject('WeepingAngel.DontBlink')
  .next({ value: "Don't Close Your Eyes" });
dupChannel.subject('WeepingAngel.DontBlink').next({ value: "Don't Blink" });
dupChannel.subject('WeepingAngel.DontBlink').next({ value: "Don't Blink" });
dupSubscription.unsubscribe();

Using request-response pattern

To make a request, you can do the following:

const channel = rxmq.channel('user');

channel
  .request({ topic: 'last.login', data: { userId: 8675309 } })
  .timeout(2000)
  .subscribe(
    data =>
      console.log(
        `Last login for userId: ${data.userId} occurred on ${data.time}`
      ),
    err => console.error('Uh oh! Error:', err),
    () => console.log('done!')
  );

It's also possible to make a request with custom reply subject, like so:

const channel = rxmq.channel('user');

channel
  .request({
    topic: 'posts.all',
    data: { userId: 8675309 },
    Subject: Rx.Subject,
  })
  .subscribe(
    post => console.log(`Got post: ${post.id}`),
    err => console.error('Uh oh! Error:', err),
    () => console.log('done!')
  );

To handle requests:

// SUCCESS REPLY
const subscription = channel
  .observe('last.login')
  .subscribe(({ data, replySubject }) => {
    const result = getLoginInfo(data.userId);
    // `replySubject` is just a Rx.AsyncSubject
    replySubject.next({ time: result.time, userId: data.userId });
    replySubject.complete();
  });

// ERROR REPLY
const subscription = channel
  .observe('last.login')
  .subscribe(({ data, replySubject }) => {
    const result = getLoginInfo(data.userId);
    // `replySubject` is just a Rx.AsyncSubject
    replySubject.error(new Error('No such user'));
    replySubject.complete();
  });

Make sure to always call .complete() after you're done with dispatching your data.

Connecting external Rx.Observable to Rxmq topic

const topic = channel.subject('ajax');
const ajax = Rx.Observable.fromPromise($.ajax({ url: 'http://...' }).promise());
ajax.multicast(topic).connect();

Available plugins

  • rxmq.aliases - a plugin that provides bus- and channel-level convenience aliases.
  • rxmq.middleware - a plugin that adds support for topic-based middleware.

I still need help!

Feel free to ask any questions you might have by opening an issue.

Build, Dependencies, etc.

  • Rxmq depends only on RxJS.

Can I contribute?

Sure thing! While project is still in its early days, I hope the API is relatively stable. Pull requests are welcome, but please make sure to include tests for your additions.

License

MIT

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].