datastax / reactive-pulsar

Licence: Apache-2.0 license
Reactive Streams adapter for Apache Pulsar Java Client

Projects that are alternatives of or similar to reactive-pulsar

assembler
Functional, type-safe, stateless reactive Java API for efficient implementation of the API Composition Pattern for querying/merging data from multiple datasources/services, with a specific focus on solving the N + 1 query problem
Stars: ✭ 102 (+126.67%)
Mutual labels:  reactive-streams, project-reactor
KotlinReactiveMS
An educational project to learn reactive programming with Spring 5 and Kotlin
Stars: ✭ 33 (-26.67%)
Mutual labels:  reactive-streams, spring-reactive
Missionary
A functional effect and streaming system for clojure and clojurescript.
Stars: ✭ 157 (+248.89%)
Mutual labels:  reactive-streams
pulsar-flume-ng-sink
An Apache Flume Sink implementation to publish data to Apache pulsar
Stars: ✭ 19 (-57.78%)
Mutual labels:  apache-pulsar
Rxterm
Functional reactive terminals in C++ ⚡⌨️
Stars: ✭ 226 (+402.22%)
Mutual labels:  reactive-streams
Awesome Reactive Programming
A repository for sharing all the resources available on Reactive Programming and Reactive Systems
Stars: ✭ 163 (+262.22%)
Mutual labels:  reactive-streams
Smallrye Mutiny
An Intuitive Event-Driven Reactive Programming Library for Java
Stars: ✭ 231 (+413.33%)
Mutual labels:  reactive-streams
Rsocket Rpc Java
Standard RSocket RPC Java Implementation
Stars: ✭ 126 (+180%)
Mutual labels:  reactive-streams
rsocket-rpc-js
Standard RSocket RPC Implementation
Stars: ✭ 29 (-35.56%)
Mutual labels:  reactive-streams
Play Ws
Standalone Play WS, an async HTTP client with fluent API
Stars: ✭ 190 (+322.22%)
Mutual labels:  reactive-streams
pf4j-spring-tutorial
A step by step guide to create an application supporting third-party plugins in Java and Spring boot
Stars: ✭ 68 (+51.11%)
Mutual labels:  spring-reactive
Reactor Addons
Official modules for the Reactor project
Stars: ✭ 175 (+288.89%)
Mutual labels:  reactive-streams
Streamsupport
Backport of Java 8/9 java.util.stream API to Java 6/7 and Android. Moved from https://sourceforge.net/projects/streamsupport/
Stars: ✭ 165 (+266.67%)
Mutual labels:  reactive-streams
Kmq
Kafka-based message queue
Stars: ✭ 239 (+431.11%)
Mutual labels:  reactive-streams
Reactive Ms Example
An educational project to learn reactive programming with Spring 5
Stars: ✭ 157 (+248.89%)
Mutual labels:  reactive-streams
flutter-form-with-validation-BLOC
This form and validation functions are created by using the BLOC pattern with RxDart instead of using StatefulWidget
Stars: ✭ 63 (+40%)
Mutual labels:  reactive-streams
Monix
Asynchronous, Reactive Programming for Scala and Scala.js.
Stars: ✭ 1,819 (+3942.22%)
Mutual labels:  reactive-streams
Pulsar4s
Idiomatic, typesafe, and reactive Scala client for Apache Pulsar
Stars: ✭ 172 (+282.22%)
Mutual labels:  reactive-streams
Rxjavareactivestreams
Adapter between RxJava and ReactiveStreams
Stars: ✭ 227 (+404.44%)
Mutual labels:  reactive-streams
Troilus
Troilus is a Java client library for Cassandra.
Stars: ✭ 17 (-62.22%)
Mutual labels:  reactive-streams

Reactive Pulsar adapter

Reactive Streams adapter for Apache Pulsar Java Client. This uses Project Reactor as the Reactive Streams implementation.

Update: This project is deprecated and has been replaced by the official Reactive client for Apache Pulsar and by Spring Pulsar.

Please migrate to the Reactive client for Apache Pulsar or to Spring Pulsar. Spring Pulsar includes reactive support.

Library status: API is evolving

The API is evolving, and the documentation and examples might not match the released version available in Maven Central. Please keep this in mind when using the library and applying the examples.

Presentations about the library

Getting it

This library requires Java 8 or later to run.

With Gradle:

repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId> 
        <version>0.2.1</version>
    </dependency>
</dependencies>

Spring Boot starter

There's a Spring Boot example at https://github.com/lhotari/reactive-pulsar-showcase . Another Spring Boot example is available at https://github.com/lhotari/reactive-iot-backend-ApacheCon2021 .

Getting it with Gradle:

repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-spring-boot-starter:0.2.1"
    testImplementation "com.github.lhotari:reactive-pulsar-spring-test-support:0.2.1"
}

Getting it with Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-spring-boot-starter</artifactId> 
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-spring-test-support</artifactId>
        <version>0.2.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Usage

Initializing the library

In standalone application

Using an existing PulsarClient instance:

ReactivePulsarClient reactivePulsarClient = ReactivePulsarClient.create(pulsarClient);

In Spring Boot application using reactive-pulsar-spring-boot-starter

Configure the pulsar.client.serviceUrl property in the application properties. Any additional properties under the pulsar.client. prefix will be used to configure the Pulsar client. The Spring Boot starter configures a ReactivePulsarClient bean, which is then available for autowiring.
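As an illustration, a minimal application.properties for a broker running on the local machine might look like the following. The URL value is an assumption (pulsar://localhost:6650 is the usual address of a standalone Pulsar broker); replace it with your own service URL.

```properties
# Assumed local standalone broker; adjust to your environment
pulsar.client.serviceUrl=pulsar://localhost:6650
```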

Sending messages

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);

Sending messages with cached producer

Add the required dependency for the cache implementation. This step isn't required when using reactive-pulsar-spring-boot-starter, because a ReactiveProducerCache instance is made available as a Spring bean in that case. However, it is still necessary to set the cache on the ReactiveMessageSenderFactory.

With Gradle:

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
    implementation "com.github.lhotari:reactive-pulsar-caffeine-producer-cache:0.2.1"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId> 
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-caffeine-producer-cache</artifactId>
        <version>0.2.1</version>
    </dependency>
</dependencies>

CaffeineReactiveProducerCache producerCache = new CaffeineReactiveProducerCache();
ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .cache(producerCache)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);

It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won't have to be created and closed before and after sending a message.
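To make the benefit of reuse concrete, here is a minimal sketch of the caching idea using only the JDK. It is not the library's implementation: ProducerCacheSketch, producerFor and the factory function are hypothetical names, and the sketch leaves out eviction and closing of idle producers, which a real cache would need.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical illustration of producer reuse; not part of the adapter.
 * P stands in for whatever producer type the application uses.
 */
final class ProducerCacheSketch<P> {
    private final ConcurrentHashMap<String, P> producersByTopic = new ConcurrentHashMap<>();
    private final Function<String, P> producerFactory;

    ProducerCacheSketch(Function<String, P> producerFactory) {
        this.producerFactory = producerFactory;
    }

    /** Returns the cached producer for the topic, creating it on first use only. */
    P producerFor(String topic) {
        return producersByTopic.computeIfAbsent(topic, producerFactory);
    }

    /** Number of producers currently held in the cache. */
    int size() {
        return producersByTopic.size();
    }
}
```

With this shape, the (potentially expensive) factory runs once per topic, and every later send to the same topic reuses the same producer.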

The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The maxInflight setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.
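The effect of an in-flight limit can be sketched with plain JDK classes. This is only an analogy under stated assumptions: InflightLimiter is a hypothetical helper that blocks the caller when the limit is reached, whereas the adapter, as described above, limits Reactive Streams subscription requests instead of blocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper, not part of the adapter: caps the number of pending sends. */
final class InflightLimiter {
    private final Semaphore slots;

    InflightLimiter(int maxInflight) {
        this.slots = new Semaphore(maxInflight);
    }

    /** Waits for a free slot, starts the send, and frees the slot when the send completes. */
    <T, R> CompletableFuture<R> submit(T message, Function<T, CompletableFuture<R>> send) {
        slots.acquireUninterruptibly();
        try {
            return send.apply(message).whenComplete((result, error) -> slots.release());
        } catch (RuntimeException e) {
            slots.release(); // the send never started, so hand the slot back
            throw e;
        }
    }

    /** Number of additional sends that could start right now. */
    int availableSlots() {
        return slots.availablePermits();
    }
}
```

A slot is held from the moment a send starts until the broker's reply (or an error) completes the future, so at most maxInflight sends are pending at any time in the local JVM.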

Reading messages

Reading all messages for a topic:

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .build();
    messageReader.readMessages()
            .map(Message::getValue)
            // for demonstration
            .subscribe(System.out::println);

By default, the stream completes when the end of the topic is reached. The end of the topic is detected with the Pulsar Reader's hasMessageAvailableAsync method.
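The end-of-topic check can be pictured as a simple loop. The sketch below is an assumption-laden simplification, not the adapter's code: SimpleReader is a hypothetical stand-in that mirrors two methods of the Pulsar Reader (hasMessageAvailableAsync and readNextAsync), and the loop blocks with join() for brevity where the adapter works reactively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the two Pulsar Reader methods used in this sketch. */
interface SimpleReader<T> {
    CompletableFuture<Boolean> hasMessageAvailableAsync();

    CompletableFuture<T> readNextAsync();
}

final class ReadUntilEnd {
    /** Reads messages until the reader reports that none are available, then stops. */
    static <T> List<T> readAll(SimpleReader<T> reader) {
        List<T> messages = new ArrayList<>();
        // Blocking with join() keeps the sketch short; real code would stay asynchronous.
        while (reader.hasMessageAvailableAsync().join()) {
            messages.add(reader.readNextAsync().join());
        }
        return messages;
    }
}
```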

The ReactiveMessageReader doesn't support partitioned topics. It is possible, however, to read the content of individual partitions. Topic names for individual partitions can be discovered using the PulsarClient's getPartitionsForTopic method. The adapter library doesn't currently wrap that method.
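One possible way to combine the partition lookup with per-partition readers is sketched below. To keep the example self-contained it uses only JDK types: PartitionReaders and forEachPartition are hypothetical names, the lookup is passed in as a function, and the reader factory is whatever the application uses to build a reader for a single topic name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of the adapter. */
final class PartitionReaders {
    /**
     * Looks up the partition topic names and creates one reader per partition.
     *
     * @param partitionLookup resolves a topic to its partition topic names
     * @param readerFactory   builds a reader for a single, non-partitioned topic name
     */
    static <R> CompletableFuture<List<R>> forEachPartition(
            String topic,
            Function<String, CompletableFuture<List<String>>> partitionLookup,
            Function<String, R> readerFactory) {
        return partitionLookup.apply(topic)
                .thenApply(partitionNames -> partitionNames.stream()
                        .map(readerFactory)
                        .collect(Collectors.toList()));
    }
}
```

In an application, pulsarClient::getPartitionsForTopic could be passed as the lookup, and the factory could build a ReactiveMessageReader for each name in the same way as the example above, with .topic(partitionName) in place of .topic(topicName).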

Example: poll for up to 5 new messages and stop polling when a timeout occurs

With .endOfStreamAction(EndOfStreamAction.POLL) the Reader will poll for new messages when the reader reaches the end of the topic.

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .startAtSpec(StartAtSpec.LATEST)
                    .endOfStreamAction(EndOfStreamAction.POLL)
                    .build();
    messageReader.readMessages()
            .take(Duration.ofSeconds(5))
            .take(5)
            // for demonstration
            .subscribe(System.out::println);

Consuming messages

    ReactiveMessageConsumer<String> messageConsumer =
            reactivePulsarClient.messageConsumer(Schema.STRING)
                    .topic(topicName)
                    .consumerConfigurer(consumerBuilder -> consumerBuilder.subscriptionName("sub"))
                    .build();
    messageConsumer.consumeMessages(messageFlux ->
                    messageFlux.map(message ->
                            MessageResult.acknowledge(message.getMessageId(), message.getValue())))
            .take(Duration.ofSeconds(2))
            // for demonstration
            .subscribe(System.out::println);

Consuming messages using a message handler component with auto-acknowledgements

ReactiveMessageHandler reactiveMessageHandler =
    ReactiveMessageHandlerBuilder
        .builder(reactivePulsarClient
            .messageConsumer(Schema.STRING)
            .consumerConfigurer(consumerBuilder ->
                consumerBuilder.subscriptionName("sub")
                    .topic(topicName))
            .build())
        .messageHandler(message -> Mono.fromRunnable(() -> {
            System.out.println(message.getValue());
        }))
        .build()
        .start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessageHandler.stop();

License

Reactive Pulsar adapter library is Open Source Software released under the Apache Software License 2.0.

How to Contribute

The library is Apache 2.0 licensed.

Contributions are welcome. Contributors will be asked to sign a CLA before their contributions are merged, since there's a desire to be able to move the Reactive Pulsar project under Apache in the future. Without CLAs, that process becomes complicated.

Bugs and Feature Requests

If you find a bug or have a feature request or a good idea for the Reactive Pulsar adapter, please open a GitHub issue or ping one of the contributors on Twitter or on Pulsar Slack.

Questions

Please use the [reactive-pulsar] tag on Stack Overflow.
