All Projects → AntonyVorontsov → RabbitMQ.Client.Core.DependencyInjection

AntonyVorontsov / RabbitMQ.Client.Core.DependencyInjection

Licence: MIT license
.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.

Programming Languages

C#
18002 projects
shell
77523 projects

Projects that are alternatives of or similar to RabbitMQ.Client.Core.DependencyInjection

docker-rabbitmq-node
🐳 A playground for Docker with RabbitMQ and Node.
Stars: ✭ 32 (-68.93%)
Mutual labels:  rabbitmq
docker-workshop-with-react-aspnetcore-redis-rabbitmq-mssql
An Asp.Net Core Docker workshop project that includes react ui, redis, mssql, rabbitmq and azure pipelines
Stars: ✭ 53 (-48.54%)
Mutual labels:  rabbitmq
b-rabbit
A thread safe library that aims to provide a simple API for interfacing with RabbitMQ. Built on top of rabbitpy, the library make it very easy to use the RabbitMQ message broker with just few lines of code. It implements all messaging pattern used by message brokers
Stars: ✭ 15 (-85.44%)
Mutual labels:  rabbitmq
opentelemetry-ext-js
js extensions for the open-telemetry project
Stars: ✭ 122 (+18.45%)
Mutual labels:  rabbitmq
nabbitmq
Node.js library for interacting with RabbitMQ based on RxJS streams
Stars: ✭ 20 (-80.58%)
Mutual labels:  rabbitmq
http-proxy-amqp
an amqp connection pool implementation
Stars: ✭ 17 (-83.5%)
Mutual labels:  rabbitmq
LearningPoint
A repository for learning different technologies, frameworks, features......
Stars: ✭ 66 (-35.92%)
Mutual labels:  rabbitmq
rabbitmq-stream-dotnet-client
RabbitMQ client for the stream protocol
Stars: ✭ 58 (-43.69%)
Mutual labels:  rabbitmq
rabbitmq-management-agent
RabbitMQ Management Agent
Stars: ✭ 16 (-84.47%)
Mutual labels:  rabbitmq
dockerX
Examples of amazing Docker/Docker-Compose/Docker Swarm technologies
Stars: ✭ 17 (-83.5%)
Mutual labels:  rabbitmq
PHP-interview-myway
记录PHP面试包括计算机网路,操作系统,PHP,redis,数据库MySQL(从2019年12月开始)面试了好未来,跟谁学,头条,极客保险,百度文库,百度知道等等公司。持续更新中
Stars: ✭ 380 (+268.93%)
Mutual labels:  rabbitmq
alicemq
RabbitMQ Visualizer
Stars: ✭ 102 (-0.97%)
Mutual labels:  rabbitmq
seckill parent
基于springboot+springcloud的高并发和商品秒杀项目,通过redis,rabbitmq等技术实现秒杀的高并发。
Stars: ✭ 59 (-42.72%)
Mutual labels:  rabbitmq
docker-symfony
Docker Symfony (PHP-FPM - NGINX - MySQL - MailHog - Redis - RabbitMQ)
Stars: ✭ 32 (-68.93%)
Mutual labels:  rabbitmq
docker
collection of docker / docker-compose files, dind, gitlab, jenkins, mongo, mysql, oracle, rabbitmq, redis, sonarqube
Stars: ✭ 25 (-75.73%)
Mutual labels:  rabbitmq
aioamqp consumer
consumer/producer/rpc library built over aioamqp
Stars: ✭ 36 (-65.05%)
Mutual labels:  rabbitmq
rb-spider
基于 RabbitMQ 中间件的爬虫的 Ruby 实现 [Developing]
Stars: ✭ 13 (-87.38%)
Mutual labels:  rabbitmq
amqpextra
Golang AMQP on steroids. Reliable connection. Publisher. Consumer.
Stars: ✭ 59 (-42.72%)
Mutual labels:  rabbitmq
cqrs-event-sourcing-example
Example of a list-making Web API using CQRS, Event Sourcing and DDD.
Stars: ✭ 28 (-72.82%)
Mutual labels:  rabbitmq
go-amqp-reconnect
auto reconnecting example for github.com/streadway/amqp Connection & Channel
Stars: ✭ 79 (-23.3%)
Mutual labels:  rabbitmq

RabbitMQ.Client.Core.DependencyInjection


Codacy Badge

This repository contains the library that provides functionality for wrapping RabbitMQ.Client code and adding it in your application via the dependency injection mechanism. That wrapper provides easy, managed message queue consume and publish operations. The library targets netstandard2.1.

(!)Current version of documentation is out of date for 5.0.0. Stay updated.(!)

Usage

This section contains only an example of a basic usage of the library. You can find the detailed documentation in the docs directory where all functionality fully covered.

Producer

To produce messages in the RabbitMQ queue you have to go through the routine of configuring a RabbitMQ connection and exchanges. In your Startup file you can do it simply calling couple methods in a fluent-Api way.

public static IConfiguration Configuration { get; set; }

public void ConfigureServices(IServiceCollection services)
{
    var rabbitMqSection = Configuration.GetSection("RabbitMq");
    var exchangeSection = Configuration.GetSection("RabbitMqExchange");

    services.AddRabbitMqClient(rabbitMqSection)
        .AddProductionExchange("exchange.name", exchangeSection);
}

By calling AddRabbitMqClient you add IQueueService that provides functionality of sending messages to queues. AddProductionExchange configures exchange to queues bindings (presented in json configuration) that allow messages to route properly. Example of appsettings.json is two sections below. You can also configure everything manually. For more information, see rabbit-configuration and exchange-configuration documentation files.

Now you can inject an instance of IQueueService inside anything you want.

[Route("api/[controller]")]
public class HomeController : Controller
{
    readonly IQueueService _queueService;
    public HomeController(IQueueService queueService)
    {
        _queueService = queueService;
    }
}

Now you can send messages using Send or SendAsync methods.

var messageObject = new
{
    Id = 1,
    Name = "RandomName"
};

await _queueService.SendAsync(
    @object: messageObject,
    exchangeName: "exchange.name",
    routingKey: "routing.key");

You can also send delayed messages.

await _queueService.SendAsync(
    @object: messageObject,
    exchangeName: "exchange.name",
    routingKey: "routing.key",
    millisecondsDelay: 10);

The mechanism of sending delayed messages described in the documentation. Dive into it for more detailed information.

Consumer

After making the message production possible let's make the consumption possible too! Imagine that a consumer is a simple console application.

class Program
{
    const string ExchangeName = "exchange.name";
    public static IConfiguration Configuration { get; set; }

    static void Main()
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
        Configuration = builder.Build();

        var serviceCollection = new ServiceCollection();
        ConfigureServices(serviceCollection);

        var serviceProvider = serviceCollection.BuildServiceProvider();
        var queueService = serviceProvider.GetRequiredService<IQueueService>();
        queueService.StartConsuming();
    }

    static void ConfigureServices(IServiceCollection services)
    {
        var rabbitMqSection = Configuration.GetSection("RabbitMq");
        var exchangeSection = Configuration.GetSection("RabbitMqExchange");

        services.AddRabbitMqClient(rabbitMqSection)
            .AddConsumptionExchange("exchange.name", exchangeSection)
            .AddMessageHandlerSingleton<CustomMessageHandler>("routing.key");
    }
}

You have to configure everything almost the same way as you have already done previously with the producer. The main differences are that you need to declare (configure) a consumption exchange calling AddConsumptionExchange instead of a production exchange. For detailed information about difference in exchange declarations you may want to see the documentation. The other important part is adding custom message handlers by implementing the IMessageHandler interface and calling AddMessageHandlerSingleton<T> or AddMessageHandlerTransient<T> methods. IMessageHandler is a simple subscriber, which receives messages from a queue by selected routing key. You are allowed to set multiple message handlers for one routing key (e.g. one is writing it in a database, and the other does the business logic).

You can also use pattern matching while adding message handlers where * (star) can substitute for exactly one word and # (hash) can substitute for zero or more words. You are also allowed to specify the exact exchange which will be "listened" by the selected message handler with the given routing key (or a pattern).

services.AddRabbitMqClient(rabbitMqSection)
    .AddConsumptionExchange("exchange.name", exchangeSection)
    .AddMessageHandlerSingleton<CustomMessageHandler>("*.*.key")
    .AddMessageHandlerSingleton<AnotherCustomMessageHandler>("#", "exchange.name");

The very last step is to start "listening" (consuming) by simply calling StartConsuming method of IQueueService. After that you will start getting messages, and you can handle them in any way you want.

A message handler example.

public class CustomMessageHandler : IMessageHandler
{
    readonly ILogger<CustomMessageHandler> _logger;
    public CustomMessageHandler(ILogger<CustomMessageHandler> logger)
    {
        _logger = logger;
    }

    public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
    {
        // Do whatever you want with the message.
        _logger.LogInformation("Hello world");
    }
}

If you want to use an async magic then implement your custom IAsyncMessageHandler.

public class CustomAsyncMessageHandler : IAsyncMessageHandler
{
    readonly ILogger<CustomAsyncMessageHandler> _logger;
    public CustomAsyncMessageHandler(ILogger<CustomAsyncMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute)
    {
	   // await something.
    }
}

If you want to send messages from the Handle method inside your message handler then use INonCyclicMessageHandler. You can't inject IQueueService inside any message handlers, otherwise you will get a cyclic dependency exception. But INonCyclicMessageHandler allows you to avoid this as it accepts an instance of IQueueService as a parameter of the method Handle.

public class MyNonCyclicMessageHandler : INonCyclicMessageHandler
{
    // Inject anything you want except IQueueService.
    readonly ILogger<MyNonCyclicMessageHandler> _logger;
    public MyNonCyclicMessageHandler(ILogger<MyNonCyclicMessageHandler> logger)
    {
        _logger = logger;
    }

    public void Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
    {
        // Send anything you want using IQueueService instance.
        var anotherMessage = new MyMessage { Foo = "Bar" };
        queueService.Send(anotherMessage, "exchange.name", "routing.key");
    }
}

INonCyclicMessageHandler has its asynchronous analogue IAsyncNonCyclicMessageHandler.

public class MyAsyncNonCyclicMessageHandler : IAsyncNonCyclicMessageHandler
{
    // Inject anything you want except IQueueService.
    readonly ILogger<MyAsyncNonCyclicMessageHandler> _logger;
    public MyAsyncNonCyclicMessageHandler(ILogger<MyAsyncNonCyclicMessageHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(BasicDeliverEventArgs eventArgs, string matchingRoute, IQueueService queueService)
    {
        // Do async stuff.
        var anotherMessage = new MyMessage { Foo = "Bar" };
        await queueService.SendAsync(anotherMessage, "exchange.name", "routing.key");
    }
}

You can register any of those message handlers the same way you registered IMessageHandler before. There are similar extension methods for each type of message handlers. For more information, see the message-consuming documentation file.

You can also find example projects in the repository inside the examples directory.

Configuration

In both cases for producing and consuming messages the configuration file is the same. appsettings.json consists of those sections: (1) settings to connect to the RabbitMQ server and (2) sections that configure exchanges and queue bindings. You can have multiple exchanges and one configuration section per each exchange. Exchange sections define how to bind queues and exchanges with each other using specified routing keys (or patterns). You allowed to bind a queue to an exchange with more than one routing key, but if there are no routing keys in the queue section, then that queue will be bound to the exchange with its name.

{
  "RabbitMq": {
    "HostName": "127.0.0.1",
    "Port": "5672",
    "UserName": "guest",
    "Password": "guest"
  },
  "RabbitMqExchange": {
    "Type": "direct",
    "Durable": true,
    "AutoDelete": false,
    "DeadLetterExchange": "default.dlx.exchange",
    "RequeueFailedMessages": true,
    "Queues": [
	  {
        "Name": "myqueue",
        "RoutingKeys": [ "routing.key" ]
      }
    ]
  }
}

For more information about appsettings.json and manual configuration features, see rabbit-configuration and exchange-configuration documentation files.

Batch message handlers

There are also a feature that you can use in case of necessity of handling messages in batches. First of all you have to create a class that inherits a BaseBatchMessageHandler class. You have to set up values for QueueName and PrefetchCount properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.

public class CustomBatchMessageHandler : BaseBatchMessageHandler
{
    readonly ILogger<CustomBatchMessageHandler> _logger;

    public CustomBatchMessageHandler(
        IRabbitMqConnectionFactory rabbitMqConnectionFactory,
        IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
        ILogger<CustomBatchMessageHandler> logger)
        : base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)
    {
        _logger = logger;
    }

    public override ushort PrefetchCount { get; set; } = 50;

    public override string QueueName { get; set; } = "queue.name";

    public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(500);

    public override Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Handling a batch of messages.");
        foreach (var message in messages)
        {
            _logger.LogInformation(message.GetMessage());
        }
        return Task.CompletedTask;
    }
}

After all you have to register that batch message handler via DI.

services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));

The message handler will create a separate connection and use it for reading messages. When the message collection is full to the size of PrefetchCount they are passed to the HandleMessage method. You can also set a MessageHandlingPeriod property value and the method HandleMessage will be executed repeatedly so messages in unfilled batches could be processed too. For more information, see the message-consuming documentation file.

Advanced usage and nuances

RabbitMQ client implemented in this library (class which implements IQueueService) opens two connections to the RabbitMQ server. One connection is used for message production, and the other one is for message consumption. This behavior covered in the advanced usage documentation file, dive into it deeply if you want to control the client behavior tighter.

There is also an example project that demonstrates an advanced usage of the RabbitMQ client.

Changelog

All notable changes covered in the changelog file.

License

This library licensed under the MIT license.

Feel free to contribute!

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].