
rabbitmq / rabbitmq-stream-dotnet-client

Licence: other
RabbitMQ client for the stream protocol

Programming Languages

C#
18002 projects
powershell
5483 projects

Projects that are alternatives of or similar to rabbitmq-stream-dotnet-client

eshopzero
.Net Microservice Application
Stars: ✭ 27 (-53.45%)
Mutual labels:  rabbitmq, rabbitmq-client
net-amqp-rabbitmq
Perl bindings to the librabbitmq-c AMQP library.
Stars: ✭ 23 (-60.34%)
Mutual labels:  rabbitmq, rabbitmq-client
longears
The RabbitMQ client for R
Stars: ✭ 32 (-44.83%)
Mutual labels:  rabbitmq, rabbitmq-client
node-carotte-amqp
An amqplib wrapper for microservices
Stars: ✭ 27 (-53.45%)
Mutual labels:  rabbitmq, rabbitmq-client
Enqueue Dev
Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
Stars: ✭ 1,977 (+3308.62%)
Mutual labels:  rabbitmq, rabbitmq-client
Flink Learning
Flink learning blog. http://www.54tianzhisheng.cn/ Covers Flink fundamentals, concepts, principles, hands-on practice, performance tuning, and source-code analysis. Includes learning examples for Flink Connector, Metrics, Library, DataStream API, Table API & SQL, plus large production Flink project cases (PV/UV, log storage, real-time deduplication of tens of billions of records, monitoring and alerting). Support for my column "Big Data Real-Time Computing Engine Flink in Practice and Performance Optimization" is welcome.
Stars: ✭ 11,378 (+19517.24%)
Mutual labels:  streaming, rabbitmq
nabbitmq
Node.js library for interacting with RabbitMQ based on RxJS streams
Stars: ✭ 20 (-65.52%)
Mutual labels:  rabbitmq, rabbitmq-client
OtakuWorld
Anime Watcher, Manga Reader, and Novel Reader as three separate apps, same UI
Stars: ✭ 123 (+112.07%)
Mutual labels:  streaming
Geluid
Made with Electron. Streams audio from your soundcard to a browser in an easy way
Stars: ✭ 29 (-50%)
Mutual labels:  streaming
CSV2RDF
Streaming, transforming, SPARQL-based CSV to RDF converter. Apache license.
Stars: ✭ 48 (-17.24%)
Mutual labels:  streaming
app
studio link - app - mirror repo only -> issues now https://gitlab.com/studio.link/app
Stars: ✭ 56 (-3.45%)
Mutual labels:  streaming
bourne
🚤 Better streaming for Ecto.
Stars: ✭ 71 (+22.41%)
Mutual labels:  streaming
go-amqp-reconnect
auto reconnecting example for github.com/streadway/amqp Connection & Channel
Stars: ✭ 79 (+36.21%)
Mutual labels:  rabbitmq
rb-spider
A Ruby implementation of a web crawler based on the RabbitMQ middleware [Developing]
Stars: ✭ 13 (-77.59%)
Mutual labels:  rabbitmq
docker
collection of docker / docker-compose files, dind, gitlab, jenkins, mongo, mysql, oracle, rabbitmq, redis, sonarqube
Stars: ✭ 25 (-56.9%)
Mutual labels:  rabbitmq
platform-sdk-js
Webtor.io platform SDK for online torrent streaming
Stars: ✭ 38 (-34.48%)
Mutual labels:  streaming
jsonld-streaming-serializer.js
A fast and lightweight streaming JSON-LD serializer for JavaScript
Stars: ✭ 20 (-65.52%)
Mutual labels:  streaming
b-rabbit
A thread-safe library that aims to provide a simple API for interfacing with RabbitMQ. Built on top of rabbitpy, the library makes it very easy to use the RabbitMQ message broker with just a few lines of code. It implements all messaging patterns used by message brokers
Stars: ✭ 15 (-74.14%)
Mutual labels:  rabbitmq
seckill parent
A high-concurrency product flash-sale (seckill) project based on Spring Boot and Spring Cloud, using Redis, RabbitMQ, and other technologies to handle the high concurrency of flash sales.
Stars: ✭ 59 (+1.72%)
Mutual labels:  rabbitmq
jsmpeg-stream-go
MPEG1 streaming demo with jsmpeg implemented by Go
Stars: ✭ 14 (-75.86%)
Mutual labels:  streaming

RabbitMQ client for the stream protocol




Overview

Dotnet client for RabbitMQ Stream Queues

Installing via NuGet

The client is distributed via NuGet.
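For example, using the dotnet CLI (the NuGet package id is RabbitMQ.Stream.Client):

dotnet add package RabbitMQ.Stream.Client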

Getting started

A quick getting-started example:

 var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};
// Connect to the broker 
var system = await StreamSystem.Create(config);

const string stream = "my_first_stream";

// Create the stream. It is important to set a retention policy;
// in this case, 200000 bytes.
await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
});
var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Here you can receive the messages confirmation
        // it means the message is stored on the server
        ConfirmHandler = conf =>
        {
            Console.WriteLine($"message: {conf.PublishingId} - confirmed");        
        }
    });

// Publish the messages and set the publishingId that
// should be sequential
for (ulong i = 0; i < 100; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
    await producer.Send(i, message);
}

// not mandatory. Just to show the confirmation
Thread.Sleep(TimeSpan.FromSeconds(1));

// Create a consumer
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = Guid.NewGuid().ToString(),
        Stream = stream,
        // Consume the stream from the beginning 
        // See also other OffsetSpec 
        OffsetSpec = new OffsetTypeFirst(),
        // Receive the messages
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine($"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())} - consumed");
            await Task.CompletedTask;
        }
    });
Console.WriteLine("Press [enter] to stop");
Console.ReadLine();

await producer.Close();
await consumer.Close();
await system.DeleteStream(stream);
await system.Close();

Usage


Connect

var config = new StreamSystemConfig
{
    UserName = "myuser",
    Password = "mypassword",
    VirtualHost = "myhost",
    Endpoints = new List<EndPoint> {new IPEndPoint(IPAddress.Parse("<<brokerip>>"), 5552)},
};

Multi Host

var config = new StreamSystemConfig
{
    UserName = "myuser",
    Password = "mypassword",
    VirtualHost = "myhost",
    Endpoints = new List<EndPoint>
    {
        new IPEndPoint(IPAddress.Parse("<<brokerip1>>"), 5552),
        new IPEndPoint(IPAddress.Parse("<<brokerip2>>"), 5552),
        new IPEndPoint(IPAddress.Parse("<<brokerip3>>"), 5552)
    },
};

TLS

var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/",
    Ssl = new SslOption()
    {
        Enabled = true
    }
};

Load Balancer

var lbAddressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("<<loadBalancerIP>>"), 5552));
var config = new StreamSystemConfig
{
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/",
    AddressResolver = lbAddressResolver,
    Endpoints = new List<EndPoint> {lbAddressResolver.EndPoint},
};

Manage Streams

await system.CreateStream(new StreamSpec(stream));

system.CreateStream is idempotent: trying to re-create a stream with the same name and same properties (e.g. maximum size) will not throw an exception.
In other words, you can be sure the stream has been created once system.CreateStream returns.
Note it is not possible to create a stream with the same name as an existing stream but with different properties. Such a request will result in an exception.
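A minimal sketch of that behaviour (the stream name and size here are illustrative):

// Safe to run at every application start-up: re-declaring the same stream
// with the same properties is a no-op.
await system.CreateStream(new StreamSpec("invoices") { MaxLengthBytes = 200000 });
await system.CreateStream(new StreamSpec("invoices") { MaxLengthBytes = 200000 });

// Re-declaring "invoices" with different properties (e.g. another MaxLengthBytes)
// is rejected by the broker and surfaces as an exception.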

It is possible to set up the retention policy when creating the stream, based on size or time:

await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 200000,
    MaxAge = TimeSpan.FromHours(8),
});

Setting a retention policy is highly recommended.

RabbitMQ does not store the whole stream in a single file, but splits it into segment files. This is also how the stream is truncated: when a stream reaches its maximum size, an entire segment file is deleted. For this reason MaxLengthBytes (the maximum size of the entire stream) is usually significantly higher than MaxSegmentSizeBytes (the maximum size of a single segment file). RabbitMQ enforces the retention policy when the current segment has reached its maximum size and is closed in favor of a new one.

await system.CreateStream(new StreamSpec(stream)
{
    MaxLengthBytes = 20_000,
    MaxSegmentSizeBytes = 1000
});

Producer

A Producer instance is created from the System.

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Stream = "my_stream",
    });

Treat a Producer instance as a long-lived object; do not create one just to send a single message.

| Parameter | Description | Default |
| --- | --- | --- |
| Stream | The stream to publish to. | No default, mandatory setting. |
| Reference | The logical name of the producer. | null (no deduplication) |
| ClientProvidedName | Set the TCP client name. | dotnet-stream-producer |
| ConfirmHandler | Handler with confirmed messages. | It is an event |
| ConnectionClosedHandler | Event raised when the client is disconnected. | It is an event |
| MaxInFlight | Max number of messages before send. | 1000 |
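A sketch combining several of these settings (the names and values are illustrative; the handler shapes follow the examples used elsewhere in this document):

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Stream = "my_stream",
        Reference = "my_producer",
        ClientProvidedName = "invoice-service-producer",
        MaxInFlight = 1000,
        ConfirmHandler = conf =>
        {
            // Called when the broker confirms the message is stored.
            Console.WriteLine($"confirmed: {conf.PublishingId}");
        },
        ConnectionClosedHandler = reason =>
        {
            Console.WriteLine($"producer connection closed: {reason}");
            return Task.CompletedTask;
        }
    });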

Publish Messages

Standard publish

ulong publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes("hello"));
await producer.Send(publishingId, message);

publishingId must be incremented for each send.

Sub Entries Batching

A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.

var subEntryMessages = new List<Message>();
for (var i = 1; i <= 500; i++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"SubBatchMessage_{i}"));
    subEntryMessages.Add(message);
}
ulong publishingId = 1;
await producer.Send(publishingId, subEntryMessages, CompressionType.Gzip);
subEntryMessages.Clear();

Not all compression algorithms are implemented by default, to avoid too many dependencies. See the table:

| Compression | Description | Provided by client |
| --- | --- | --- |
| CompressionType.None | No compression | yes |
| CompressionType.GZip | GZip | yes |
| CompressionType.Lz4 | Lz4 | no |
| CompressionType.Snappy | Snappy | no |
| CompressionType.Zstd | Zstd | no |

You can add the missing codecs with the StreamCompressionCodecs.RegisterCodec API. See Examples/CompressCodecs for the Lz4, Snappy, and Zstd implementations.

Deduplication

See here for more details. Set a producer reference to enable deduplication:

var producer = await system.CreateProducer(
    new ProducerConfig
    {
        Reference = "my_producer",
        Stream = "my_stream",
    });

then:

ulong publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes("my deduplicated message"));
await producer.Send(publishingId, message);
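A minimal sketch of the effect (the loop and message text are illustrative): with a Reference set, the broker tracks the highest publishingId per producer reference, so re-sending an already-confirmed publishingId does not create a duplicate.

for (ulong id = 0; id < 10; id++)
{
    var message = new Message(Encoding.UTF8.GetBytes($"my deduplicated message {id}"));
    await producer.Send(id, message);
}
// Running the same loop again with the same producer Reference does not
// append duplicates: publishing ids 0..9 were already confirmed.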

Consume Messages

Define a consumer:

var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        MessageHandler = async (consumer, ctx, message) =>
        {
            Console.WriteLine(
                $"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())}");
            await Task.CompletedTask;
        }
});

Offset Types

There are five offset types. They are set through the ConsumerConfig.OffsetSpec property passed to the consumer; in this example we use OffsetTypeFirst:

var consumerOffsetTypeFirst = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer_offset_first",
        Stream = stream,
        OffsetSpec = new OffsetTypeFirst(),
        MessageHandler = async (consumer, ctx, message) =>
        {
 
            await Task.CompletedTask;
        }
    });

The five types are:

  • First: it takes messages from the first message of the stream.
var offsetTypeFirst = new OffsetTypeFirst();
  • Last: it takes messages from the last chunk of the stream, i.e. it doesn’t start from the last message, but the last “group” of messages.
var offsetTypeLast = new OffsetTypeLast();
  • Next: it takes messages published after the consumer connection.
var offsetTypeNext = new OffsetTypeNext();
  • Offset: it takes messages starting from the message with id equal to the passed value. If the value is less than the first offset of the stream, it starts from the first (i.e. if you pass 0 but the stream starts from 10, it starts from 10). If the message with that id hasn't been published yet, it waits until this publishingId is reached.
ulong iWantToStartFromPubId = 10;
var offsetTypeOffset = new OffsetTypeOffset(iWantToStartFromPubId);
  • Timestamp: it takes messages starting from the first message with a timestamp greater than the one passed.
var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;
var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo);

Track Offset

The server can store the last delivered offset for a given consumer reference using StoreOffset:

var messagesConsumed = 0;
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        MessageHandler = async (consumer, ctx, message) =>
        {
            if (++messagesConsumed % 1000 == 0)
            {
                await consumer.StoreOffset(ctx.Offset);
            }
        }
    });

Note: avoid storing the offset for every single message; it can reduce performance.

It is possible to retrieve the offset with QueryOffset:

var trackedOffset = await system.QueryOffset("my_consumer", stream);
var consumer = await system.CreateConsumer(
    new ConsumerConfig
    {
        Reference = "my_consumer",
        Stream = stream,
        OffsetSpec = new OffsetTypeOffset(trackedOffset),
        // ...
    });

Note: if no offset has been stored for the consumer's reference on the stream, QueryOffset throws an OffsetNotFoundException.
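A minimal sketch of handling that case (the fallback to OffsetTypeFirst is just one possible application choice):

ulong trackedOffset = 0;
var hasTrackedOffset = true;
try
{
    trackedOffset = await system.QueryOffset("my_consumer", stream);
}
catch (OffsetNotFoundException)
{
    // Nothing stored yet for this reference on this stream.
    hasTrackedOffset = false;
}

// Use new OffsetTypeOffset(trackedOffset) as the ConsumerConfig.OffsetSpec when
// hasTrackedOffset is true, otherwise fall back to new OffsetTypeFirst().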

Handle Close

Producers/Consumers raise an event when the client is disconnected:

new ProducerConfig/ConsumerConfig
{
    ConnectionClosedHandler = s =>
    {
        Console.WriteLine($"Connection Closed: {s}");
        return Task.CompletedTask;
    },
}

Handle Metadata Update

Stream metadata update is raised when the stream topology changes or the stream is deleted. You can use MetadataHandler to handle it:

new ProducerConfig/ConsumerConfig
{
    MetadataHandler = update =>
    {
        // ...
    },
}

Reliable

  • Reliable Producer
  • Reliable Consumer (not ready yet)

Reliable Producer

Reliable Producer is a smart layer built on top of the standard Producer.
The idea is to let the user decide which to use: the standard or the reliable producer.

The main features are:

  • Provide publishingID automatically
  • Auto-Reconnect in case of disconnection
  • Trace sent and received messages
  • Invalidate messages
  • Handle the metadata Update

Provide publishingID automatically

Reliable Producer retrieves the last publishingID for the given producer name. Zero (0) is the default value when no publishingID is stored yet.

Auto-Reconnect

Reliable Producer restores the TCP connection if the Producer is disconnected for some reason. During the reconnection it keeps storing outgoing messages in a local list. The user will get the confirmed or unconfirmed messages back.

Trace sent and received messages

Reliable Producer keeps each sent message in memory and removes it when the message is confirmed or times out. The ConfirmationHandler receives the messages back with their status. confirmation.Status can have different values, but in general ConfirmationStatus.Confirmed means the message is stored on the server; any other status means there was a problem with the message(s).

ConfirmationHandler = confirmation =>
{                    
    if (confirmation.Status == ConfirmationStatus.Confirmed)
    {

        // OK
    }
    else
    {
        // Some problem
    }
}

Invalidate messages

If the client doesn't receive a confirmation within 2 seconds, Reliable Producer removes the message from the internal message cache. The user will receive ConfirmationStatus.TimeoutError in the ConfirmationHandler.
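For example, the timeout case can be handled explicitly in the ConfirmationHandler shown above (what to do with a timed-out message, e.g. re-sending it, is an application-level decision and only sketched here as a comment):

ConfirmationHandler = confirmation =>
{
    if (confirmation.Status == ConfirmationStatus.TimeoutError)
    {
        // Not confirmed within the timeout window: the message is no longer
        // tracked by Reliable Producer. Decide here whether to re-send, log, or alert.
    }
}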

Handle the metadata Update

If the stream topology changes (e.g. the stream is deleted, or a follower is added/removed), the client receives a MetadataUpdate event. Reliable Producer detects the event and tries to reconnect the producer if the stream still exists; otherwise it closes the producer.

Send API

Reliable Producer implements two Send(..) overloads:

  • Send(Message message) // standard
  • Send(List<Message> messages, CompressionType compressionType) //sub-batching with compression
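A short usage sketch, assuming a ReliableProducer instance p created as shown in the next sections, and that both overloads are awaitable like the standard Producer.Send:

// Single message: no publishingId argument, Reliable Producer assigns it.
await p.Send(new Message(Encoding.UTF8.GetBytes("hello")));

// Sub-entry batch with compression.
var batch = new List<Message>
{
    new Message(Encoding.UTF8.GetBytes("hello 1")),
    new Message(Encoding.UTF8.GetBytes("hello 2"))
};
await p.Send(batch, CompressionType.Gzip);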

Reconnection Strategy

By default Reliable Producer uses a BackOffReconnectStrategy to reconnect the client. You can customize the behaviour by implementing the IReconnectStrategy interface:

void WhenDisconnected(out bool reconnect);
void WhenConnected();

With the reconnect out parameter you can decide whether to reconnect the producer.
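A minimal sketch of a custom strategy against the interface above (the attempt counter and limit are illustrative, not part of the client):

class MyReconnectStrategy : IReconnectStrategy
{
    private int attempts;
    private const int MaxAttempts = 5;

    public void WhenDisconnected(out bool reconnect)
    {
        // Ask the client to reconnect until the limit is reached.
        attempts++;
        reconnect = attempts <= MaxAttempts;
    }

    public void WhenConnected()
    {
        // Connection restored: reset the counter.
        attempts = 0;
    }
}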

You can use it:

var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
    // ...
    ReconnectStrategy = new MyReconnectStrategy()
});

Examples

See the directory Examples/Reliable

Build from source

Build:

make build

Test:

make test

Run test in docker:

make run-test-in-docker

Project Status

The client is a work in progress. The API(s) could change prior to version 1.0.0.

Release Process

  • Ensure builds are green: link
  • Tag the main branch using your GPG key:
    git tag -a -s -u GPG_KEY_ID -m 'rabbitmq-stream-dotnet-client v1.0.0-beta.6' 'v1.0.0-beta.6' && git push && git push --tags
    
  • Ensure the build for the tag passes: link
  • Check for the new version on NuGet: link
  • Create the new release on GitHub: link
  • Announce the new release on the mailing list: link