pellse / assembler

Licence: Apache-2.0 license
Functional, type-safe, stateless reactive Java API for efficient implementation of the API Composition Pattern for querying/merging data from multiple datasources/services, with a specific focus on solving the N + 1 query problem

Programming Languages

java
68154 projects - #9 most used programming language
kotlin
9241 projects

Projects that are alternatives of or similar to assembler

Reactive Ms Example
An educational project to learn reactive programming with Spring 5
Stars: ✭ 157 (+53.92%)
Mutual labels:  reactive, reactive-streams, reactive-programming, reactor
Scalecube Services
v2.0 - ScaleCube Services provides a low latency Reactive Microservices library for serverless service registry and discovery based on gossip protocol and without single point-of-failure or bottlenecks.
Stars: ✭ 23 (-77.45%)
Mutual labels:  flux, reactive, reactive-streams, reactor
reacted
Actor based reactive java framework for microservices in local and distributed environment
Stars: ✭ 17 (-83.33%)
Mutual labels:  reactive, reactive-streams, event-sourcing, reactive-programming
Reactor Netty
TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
Stars: ✭ 1,743 (+1608.82%)
Mutual labels:  flux, reactive, reactive-streams, reactor
Reactor Addons
Official modules for the Reactor project
Stars: ✭ 175 (+71.57%)
Mutual labels:  flux, reactive, reactive-streams, reactor
Spring 5 Examples
This repository contains spring-boot 2 / spring framework 5 project examples, using the reactive programming model / paradigm and Kotlin
Stars: ✭ 87 (-14.71%)
Mutual labels:  flux, reactive-programming, functional-reactive-programming, reactor
KotlinReactiveMS
An educational project to learn reactive programming with Spring 5 and Kotlin
Stars: ✭ 33 (-67.65%)
Mutual labels:  reactive, reactive-streams, reactive-programming, reactor
Pos
Sample Application DDD, Reactive Microservices, CQRS Event Sourcing Powered by DERMAYON LIBRARY
Stars: ✭ 207 (+102.94%)
Mutual labels:  reactive, cqrs, event-sourcing
Watermill
Building event-driven applications the easy way in Go.
Stars: ✭ 3,504 (+3335.29%)
Mutual labels:  reactive, cqrs, event-sourcing
Vueflux
♻️ Unidirectional State Management Architecture for Swift - Inspired by Vuex and Flux
Stars: ✭ 315 (+208.82%)
Mutual labels:  flux, reactive, reactive-programming
Rsocket Rpc Java
Standard RSocket RPC Java Implementation
Stars: ✭ 126 (+23.53%)
Mutual labels:  reactive, reactive-streams, reactive-programming
Reactor Core
Non-Blocking Reactive Foundation for the JVM
Stars: ✭ 3,891 (+3714.71%)
Mutual labels:  flux, reactive, reactive-streams
Rx.Http
A reactive way to make HTTP Request in .NET Core 🚀
Stars: ✭ 62 (-39.22%)
Mutual labels:  reactive, reactive-streams, reactive-programming
Hibernate Reactive
A reactive API for Hibernate ORM, supporting non-blocking database drivers and a reactive style of interaction with the database.
Stars: ✭ 167 (+63.73%)
Mutual labels:  reactive, jpa, hibernate
Fpgo
Monad, Functional Programming features for Golang
Stars: ✭ 165 (+61.76%)
Mutual labels:  reactive, reactive-programming, functional-reactive-programming
Awesome Reactive Programming
A repository for sharing all the resources available on Reactive Programming and Reactive Systems
Stars: ✭ 163 (+59.8%)
Mutual labels:  reactive, reactive-streams, reactive-programming
netifi-quickstart-java
Project to assist you in getting started using Netifi.
Stars: ✭ 23 (-77.45%)
Mutual labels:  reactive, reactive-streams, reactive-programming
kotlin-kafka-and-kafka-streams-examples
Kafka with KafkaReactor and Kafka Streams Examples in Kotlin
Stars: ✭ 33 (-67.65%)
Mutual labels:  reactive-streams, reactive-programming, reactor
Rocket.jl
Functional reactive programming extensions library for Julia
Stars: ✭ 69 (-32.35%)
Mutual labels:  reactive, reactive-programming, functional-reactive-programming
Goes
Go Event Sourcing made easy
Stars: ✭ 144 (+41.18%)
Mutual labels:  flux, cqrs, event-sourcing

Assembler

Functional, type-safe, stateless reactive Java API for efficient implementation of the API Composition Pattern for querying/merging data from multiple datasources/services, with a specific focus on solving the N + 1 query problem.

Native Reactive Streams Support (since version 0.3.1)

A new implementation, reactive-assembler-core, was added to natively support the Reactive Streams specification. It internally leverages Project Reactor, which allows the Assembler library (through the reactive-assembler-core module) to participate in end-to-end reactive streams chains (e.g. from a REST endpoint to an RSocket-based microservice to the database) and keep all reactive streams properties as defined by the Reactive Manifesto (Responsive, Resilient, Elastic, Message Driven with back-pressure, non-blocking, etc.).

Use Cases

One interesting use case is building a materialized view in a microservice architecture that supports Event Sourcing and Command Query Responsibility Segregation (CQRS). In this context, if we have an incoming stream of events where each event needs to be enriched with external data before being stored, it is convenient to batch those events instead of hitting the external services for every single event.

Usage Example for Native Reactive Support

Assume the following data model of a very simplified online store, and an API to access the different services:

public record Customer(Long customerId, String name) {}
public record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
public record OrderItem(Long id, Long customerId, String orderDescription, Double price) {}
public record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

Flux<Customer> customers(); // call to an RSocket microservice (no query filters for brevity)
Publisher<BillingInfo> getBillingInfo(List<Long> customerIds); // connects to a relational database (R2DBC)
Publisher<OrderItem> getAllOrders(List<Long> customerIds); // connects to MongoDB (Reactive Streams Driver)

If customers() returns 50 customers, making one additional call per customerId to retrieve each customer's BillingInfo would result in 50 additional network calls, which is the N + 1 queries issue. Instead, we can make a single additional call that retrieves the BillingInfo for every Customer returned by customers() at once, and do the same for OrderItem. Since we are working with 3 different and independent datasources, joining data from Customer, BillingInfo and OrderItem into Transaction (using customerId as a correlation id between all those entities) has to be done at the application level, which is what this library was implemented for.
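
To make the application-level join concrete, here is a minimal sketch in plain, synchronous Java that does not use the Assembler library at all. The class BatchedJoinSketch and its assemble() method are hypothetical names chosen for illustration, and the sketch assumes at most one BillingInfo per customer. It only shows the idea: collect all correlation ids, query each datasource once, then join in memory by customerId.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustration only: these names are hypothetical and not part of the Assembler API.
class BatchedJoinSketch {

    record Customer(Long customerId, String name) {}
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
    record OrderItem(Long id, Long customerId, String orderDescription, Double price) {}
    record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

    static List<Transaction> assemble(
            List<Customer> customers,
            Function<List<Long>, List<BillingInfo>> billingInfoQuery,
            Function<List<Long>, List<OrderItem>> ordersQuery) {

        // Gather every correlation id first, so each datasource is queried exactly once.
        List<Long> ids = customers.stream().map(Customer::customerId).toList();

        // One-to-one: index billing info by customerId (assumes no duplicates per customer).
        Map<Long, BillingInfo> billingById = billingInfoQuery.apply(ids).stream()
                .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));

        // One-to-many: group order items by customerId.
        Map<Long, List<OrderItem>> ordersById = ordersQuery.apply(ids).stream()
                .collect(Collectors.groupingBy(OrderItem::customerId));

        // Join in memory; a customer without orders gets an empty list.
        return customers.stream()
                .map(c -> new Transaction(
                        c,
                        billingById.get(c.customerId()),
                        ordersById.getOrDefault(c.customerId(), List.of())))
                .toList();
    }

    public static void main(String[] args) {
        int[] queryCount = {0}; // counts calls made to the fake datasources

        List<Customer> customers = List.of(new Customer(1L, "Ann"), new Customer(2L, "Bob"));

        List<Transaction> transactions = assemble(
                customers,
                ids -> {
                    queryCount[0]++;
                    return ids.stream().map(id -> new BillingInfo(id + 100, id, "card-" + id)).toList();
                },
                ids -> {
                    queryCount[0]++;
                    return List.of(new OrderItem(1000L, 1L, "book", 12.5),
                                   new OrderItem(1001L, 1L, "pen", 1.5));
                });

        System.out.println(transactions.size() + " transactions, " + queryCount[0] + " batched queries");
    }
}
```

However many customers are passed in, the two query functions are each invoked once, which is the property the N + 1 discussion above is after.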

When using reactive-assembler-core, here is how we would aggregate multiple reactive datasources and implement the API Composition Pattern:

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import reactor.core.publisher.Flux;
    
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(this::getBillingInfo)),
        rule(OrderItem::customerId, oneToMany(this::getAllOrders)),
        Transaction::new)
    .build();

Flux<Transaction> transactionFlux = assembler.assemble(customers());

When dealing with an infinite stream of data, the example above will result in resource exhaustion, since the Assembler needs to completely drain the upstream from customers() to gather all the correlation ids (customerId). The solution is to split the stream into multiple smaller streams and batch the processing of those individual smaller streams. Most reactive libraries already support that concept. Below is an example using Project Reactor:

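import static java.time.Duration.ofSeconds;

// Emit a window every 100 customers or every 5 seconds, whichever comes first
Flux<Transaction> transactionFlux = customers()
    .windowTimeout(100, ofSeconds(5))
    .flatMapSequential(assembler::assemble);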
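
The batching idea itself does not depend on Reactor. As a rough sketch in plain Java, the hypothetical helper nextWindow() below pulls a bounded number of elements from an unbounded source, so only one batch is held in memory at a time. It covers the size limit only; the time limit of windowTimeout() is not modelled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Illustration only: a size-bounded batch over an unbounded source, without Reactor.
class WindowingSketch {

    // Pulls at most `size` elements from the source; never drains the whole source.
    static <T> List<T> nextWindow(Iterator<T> source, int size) {
        List<T> window = new ArrayList<>(size);
        while (window.size() < size && source.hasNext()) {
            window.add(source.next());
        }
        return window;
    }

    public static void main(String[] args) {
        // Unbounded source of customer ids: 1, 2, 3, ...
        Iterator<Long> customerIds = Stream.iterate(1L, id -> id + 1).iterator();

        // Process three batches of four ids; each batch could be one batched downstream query.
        for (int i = 0; i < 3; i++) {
            List<Long> window = nextWindow(customerIds, 4);
            System.out.println("batch " + i + " -> " + window);
        }
    }
}
```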

Asynchronous Caching (since version 0.4.1)

In addition to providing helper functions to define mapping semantics (e.g. oneToOne(), oneToMany()), the Assembler also provides a caching/memoization mechanism for the downstreams through the cached() wrapper method.

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import reactor.core.publisher.Flux;
    
var assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo))),
        rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders))),
        Transaction::new)
    .build();
    
var transactionFlux = customers()
    .window(3)
    .flatMapSequential(assembler::assemble);

This can be useful for aggregating dynamic data with static data or data we know doesn't change often (or on a predefined schedule e.g. data that is refreshed by a batch job once a day).

The cached() method internally uses the correlation ids from the upstream as cache keys (the customer ids in the above example), and caches each individual value (or entity) of the downstream. Concretely, take the line rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo))) from the example above, where window(3) generates windows of 3 Customer, e.g.:

[C1, C2, C3], [C1, C4, C7], [C4, C5, C6], [C2, C4, C6]

At that specific moment in time, the execution would look like this:

  1. getBillingInfo([1, 2, 3]) = [B1, B2, B3]
  2. merge(getBillingInfo([4, 7]), fromCache([1])) = [B4, B7, B1]
  3. merge(getBillingInfo([5, 6]), fromCache([4])) = [B5, B6, B4]
  4. fromCache([2, 4, 6]) = [B2, B4, B6]
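
The trace above can be reproduced with a small synchronous simulation. This is not how cached() is implemented; CachedLookupSketch, fakeBillingInfo() and fetchCalls are hypothetical names, and strings such as "B1" stand in for BillingInfo entities. The sketch only shows the "fetch the missing ids, serve the rest from the cache" behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustration only: a synchronous stand-in for the caching behaviour described above.
class CachedLookupSketch {

    private final Map<Long, String> cache = new HashMap<>();
    private final Function<List<Long>, Map<Long, String>> fetch;

    // Records the ids passed to each real fetch, to make the trace observable.
    final List<List<Long>> fetchCalls = new ArrayList<>();

    CachedLookupSketch(Function<List<Long>, Map<Long, String>> fetch) {
        this.fetch = fetch;
    }

    Map<Long, String> getAll(List<Long> ids) {
        // Only ids that are not cached yet are sent to the underlying datasource.
        List<Long> missing = ids.stream().filter(id -> !cache.containsKey(id)).toList();
        if (!missing.isEmpty()) {
            fetchCalls.add(missing);
            cache.putAll(fetch.apply(missing));
        }
        // Merge fetched and cached values, in the order of the requested ids.
        Map<Long, String> result = new LinkedHashMap<>();
        for (Long id : ids) {
            if (cache.containsKey(id)) {
                result.put(id, cache.get(id));
            }
        }
        return result;
    }

    // Fake datasource: billing info for customer n is the string "Bn".
    static Map<Long, String> fakeBillingInfo(List<Long> ids) {
        return ids.stream().collect(Collectors.toMap(id -> id, id -> "B" + id));
    }

    public static void main(String[] args) {
        CachedLookupSketch lookup = new CachedLookupSketch(CachedLookupSketch::fakeBillingInfo);
        List<List<Long>> windows = List.of(
                List.of(1L, 2L, 3L), List.of(1L, 4L, 7L), List.of(4L, 5L, 6L), List.of(2L, 4L, 6L));

        for (List<Long> window : windows) {
            System.out.println(window + " -> " + lookup.getAll(window).values());
        }
        System.out.println("ids actually fetched: " + lookup.fetchCalls);
    }
}
```

In this simulation the fourth window triggers no fetch at all, matching step 4 of the trace.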

Pluggable Asynchronous Caching Strategy

Overloaded versions of the cached() method are also defined to allow plugging your own cache implementation. We can pass an additional parameter of type CacheFactory to customize the caching mechanism:

public interface CacheFactory<ID, R> {
    Cache<ID, R> create(Function<Iterable<? extends ID>, Mono<Map<ID, Collection<R>>>> fetchFunction);
}
    
public interface Cache<ID, R> {
    Mono<Map<ID, Collection<R>>> getAll(Iterable<ID> ids);
}

If no CacheFactory parameter is passed to cached(), the default implementation will internally return a Cache based on ConcurrentHashMap.

Below is an example of a few different ways we can explicitly customize the caching mechanism:

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.Cache.cache;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import reactor.core.publisher.Flux;
  
var assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, new HashMap<>()))),
        rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders, cache(HashMap::new)))),
        Transaction::new)
    .build();

Overloaded versions of cached() and cache() are provided to wrap any implementation of java.util.Map since it doesn't natively implement Mono<Map<ID, Collection<R>>> getAll(Iterable<ID> ids).
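
To illustrate what wrapping a java.util.Map behind a getAll(ids) contract can look like, here is a rough sketch that uses CompletableFuture in place of Mono so that it runs with the JDK alone. AsyncCache, mapCache() and fetchBilling() are hypothetical names, and this is not the library's implementation, only one plausible shape for such an adapter.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustration only: CompletableFuture stands in for Mono; all names are hypothetical.
class MapBackedCacheSketch {

    interface AsyncCache<ID, R> {
        CompletableFuture<Map<ID, Collection<R>>> getAll(Iterable<ID> ids);
    }

    // Adapts a plain Map into an AsyncCache that calls fetchFunction only for cache misses.
    static <ID, R> AsyncCache<ID, R> mapCache(
            Map<ID, Collection<R>> store,
            Function<Iterable<? extends ID>, CompletableFuture<Map<ID, Collection<R>>>> fetchFunction) {
        return ids -> {
            Map<ID, Collection<R>> hits = new HashMap<>();
            List<ID> misses = new ArrayList<>();
            for (ID id : ids) {
                Collection<R> cached = store.get(id);
                if (cached != null) {
                    hits.put(id, cached);
                } else {
                    misses.add(id);
                }
            }
            if (misses.isEmpty()) {
                return CompletableFuture.completedFuture(hits);
            }
            // Fetch the misses, remember them in the store, and merge them with the hits.
            return fetchFunction.apply(misses).thenApply(fetched -> {
                store.putAll(fetched);
                hits.putAll(fetched);
                return hits;
            });
        };
    }

    // Fake datasource: returns a single value "Bn" for each customer id n.
    static CompletableFuture<Map<Long, Collection<String>>> fetchBilling(Iterable<? extends Long> ids) {
        Map<Long, Collection<String>> result = new HashMap<>();
        for (Long id : ids) {
            result.put(id, List.of("B" + id));
        }
        return CompletableFuture.completedFuture(result);
    }

    public static void main(String[] args) {
        // A concurrent map is the safer choice if the cache is shared across threads.
        AsyncCache<Long, String> cache =
                mapCache(new ConcurrentHashMap<>(), MapBackedCacheSketch::fetchBilling);

        System.out.println(cache.getAll(List.of(1L, 2L)).join());
        System.out.println(cache.getAll(List.of(2L, 3L)).join()); // only id 3 is fetched here
    }
}
```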

Third Party Asynchronous Cache Provider Integration

Here is a list of add-on modules that can be used to integrate third party asynchronous caching libraries (more will be added in the future):

Assembler add-on module: reactive-assembler-cache-caffeine
Third party cache library: Caffeine

Below is an example of using a CacheFactory implementation for the Caffeine library through the caffeineCache() helper method from the caffeine add-on module:

import com.github.benmanes.caffeine.cache.Cache;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache;

import static java.time.Duration.ofMinutes;
import static com.github.benmanes.caffeine.cache.Caffeine.newBuilder;

var cacheBuilder = newBuilder()
    .recordStats()
    .expireAfterWrite(ofMinutes(10))
    .maximumSize(1000);

var assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, caffeineCache()))),
        rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders, caffeineCache(cacheBuilder)))),
        Transaction::new)
    .build();

Integration with non-reactive sources

A utility function toPublisher() is also provided to wrap non-reactive sources, useful when e.g. calling 3rd party synchronous APIs:

import static io.github.pellse.reactive.assembler.QueryUtils.toPublisher;

List<BillingInfo> getBillingInfo(List<Long> customerIds); // non-reactive source
List<OrderItem> getAllOrders(List<Long> customerIds); // non-reactive source

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(toPublisher(this::getBillingInfo))),
        rule(OrderItem::customerId, oneToMany(toPublisher(this::getAllOrders))),
        Transaction::new)
    .build();

Kotlin Support

Module: reactive-assembler-kotlin-extension

import io.github.pellse.reactive.assembler.kotlin.assembler
import io.github.pellse.reactive.assembler.kotlin.cached
import io.github.pellse.reactive.assembler.Cache.cache
import io.github.pellse.reactive.assembler.RuleMapper.oneToMany
import io.github.pellse.reactive.assembler.RuleMapper.oneToOne
import io.github.pellse.reactive.assembler.Mapper.rule
import io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache

// Example 1:
val assembler = assembler<Transaction>()
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(::getBillingInfo.cached())),
        rule(OrderItem::customerId, oneToMany(::getAllOrders.cached(::hashMapOf))),
        ::Transaction
    ).build()
            
// Example 2:
val assembler = assembler<Transaction>()
    .withIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(::getBillingInfo.cached(cache()))),
        rule(OrderItem::customerId, oneToMany(::getAllOrders.cached(caffeineCache()))),
        ::Transaction
    ).build()

Other Supported Technologies

Java 8 Stream (synchronous and parallel) is supported for cases where full reactive/non-blocking support is not needed.

The implementations below are still available, but it is strongly recommended to switch to reactive-assembler-core as the new reactive support can easily integrate with any external reactive libraries:

  1. CompletableFuture
  2. Flux
  3. RxJava
  4. Akka Stream
  5. Eclipse MicroProfile Reactive Stream Operators

You only need to include in your project's build file (Maven, Gradle) the library that corresponds to the type of reactive (or non-reactive) support needed (Java 8 Stream, CompletableFuture, Flux, RxJava, Akka Stream, Eclipse MicroProfile Reactive Stream Operators).

All modules above have dependencies on the following modules:

  1. assembler-core
  2. assembler-util

What's Next?

See the list of issues for improvements planned in the near future.
