Assembler
Functional, type-safe, stateless reactive Java API for efficient implementation of the API Composition Pattern for querying/merging data from multiple datasources/services, with a specific focus on solving the N + 1 query problem.
Native Reactive Streams Support (since version 0.3.1)
A new implementation, reactive-assembler-core, was added to natively support the Reactive Streams specification. This new implementation internally leverages Project Reactor, which allows the Assembler library (through the reactive-assembler-core module) to participate in end-to-end reactive stream chains (e.g. from a REST endpoint to an RSocket-based microservice to the database) and preserve all the properties promoted by the Reactive Manifesto: responsive, resilient, elastic, message-driven with back-pressure, non-blocking, etc.
Use Cases
One interesting use case is building a materialized view in a microservice architecture supporting Event Sourcing and Command Query Responsibility Segregation (CQRS). In this context, if we have an incoming stream of events where each event needs to be enriched with some sort of external data before being stored, it is convenient to batch those events instead of hitting the external services for every single event.
Usage Example for Native Reactive Support
Assuming the following data model of a very simplified online store, and the following APIs to access the different services:
public record Customer(Long customerId, String name) {}
public record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
public record OrderItem(Long id, Long customerId, String orderDescription, Double price) {}
public record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}
Flux<Customer> customers(); // call to a RSocket microservice (no query filters for brevity)
Publisher<BillingInfo> billingInfo(List<Long> customerIds); // Connects to relational database (R2DBC)
Publisher<OrderItem> allOrders(List<Long> customerIds); // Connects to MongoDB (Reactive Streams Driver)
If customers() returns 50 customers, instead of having to make one additional call per customerId to retrieve each customer's associated BillingInfo (which would result in 50 additional network calls, thus the N + 1 queries issue), we can make only 1 additional call to retrieve all BillingInfo for all Customer returned by customers(), and likewise for OrderItem. Since we are working with 3 different and independent datasources, joining data from Customer, BillingInfo and OrderItem into Transaction (using customerId as a correlation id between all those entities) has to be done at the application level, which is what this library was implemented for.
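To make the correlation concrete, here is a minimal plain-Java sketch (synchronous, in-memory, and entirely hypothetical — the library does this reactively) of the application-level join described above: one batched call per datasource, then a merge keyed on customerId:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

record Customer(Long customerId, String name) {}
record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
record OrderItem(Long id, Long customerId, String orderDescription, Double price) {}
record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

class AssemblySketch {
    // Join the results of the batched calls in memory, using customerId as correlation id.
    static List<Transaction> assemble(List<Customer> customers,
                                      List<BillingInfo> billingInfos,
                                      List<OrderItem> orderItems) {
        Map<Long, BillingInfo> billingByCustomerId = billingInfos.stream()
                .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));
        Map<Long, List<OrderItem>> ordersByCustomerId = orderItems.stream()
                .collect(Collectors.groupingBy(OrderItem::customerId));
        return customers.stream()
                .map(c -> new Transaction(c,
                        billingByCustomerId.get(c.customerId()),
                        ordersByCustomerId.getOrDefault(c.customerId(), List.of())))
                .toList();
    }
}
```

Here the two lookup maps would each come from a single batched call (billingInfo(customerIds), allOrders(customerIds)), so 50 customers cost 2 extra calls instead of 100.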
When using reactive-assembler-core, here is how we would aggregate multiple reactive datasources and implement the API Composition Pattern:
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import reactor.core.publisher.Flux;
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(this::getBillingInfo)),
rule(OrderItem::customerId, oneToMany(this::getAllOrders)),
Transaction::new)
.build();
Flux<Transaction> transactionFlux = assembler.assemble(customers());
In a scenario where we deal with an infinite stream of data, the example above will result in resource exhaustion, since the Assembler needs to completely drain the upstream from customers() to gather all the correlation ids (customerId). The solution is to split the stream into multiple smaller streams and batch the processing of those individual smaller streams. Most reactive libraries already support this concept; below is an example using Project Reactor:
Flux<Transaction> transactionFlux = customers()
.windowTimeout(100, ofSeconds(5))
.flatMapSequential(assembler::assemble);
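Stripped of Reactor, the windowing idea can be pictured with plain Java (a simplified synchronous sketch, not the library's code — batchedFetch stands in for any id-batched service call): each window triggers exactly one backend call, so the source is never drained in full before any work happens:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class WindowingSketch {
    // Process ids in windows of `windowSize`, issuing one batched backend call per window.
    static <R> List<R> processInWindows(List<Long> ids, int windowSize,
                                        Function<List<Long>, List<R>> batchedFetch) {
        List<R> results = new ArrayList<>();
        for (int i = 0; i < ids.size(); i += windowSize) {
            List<Long> window = ids.subList(i, Math.min(i + windowSize, ids.size()));
            results.addAll(batchedFetch.apply(window)); // 1 call per window, not per id
        }
        return results;
    }
}
```

windowTimeout(100, ofSeconds(5)) does the same thing reactively, closing a window after 100 items or 5 seconds, whichever comes first (ofSeconds is statically imported from java.time.Duration).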
Asynchronous Caching (since version 0.4.1)
In addition to providing helper functions to define mapping semantics (e.g. oneToOne(), oneToMany()), the Assembler also provides a caching/memoization mechanism for the downstreams through the cached() wrapper method.
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import reactor.core.publisher.Flux;
var assembler = assemblerOf(Transaction.class)
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo))),
rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders))),
Transaction::new)
.build();
var transactionFlux = customers()
.window(3)
.flatMapSequential(assembler::assemble);
This can be useful for aggregating dynamic data with static data or data we know doesn't change often (or on a predefined schedule e.g. data that is refreshed by a batch job once a day).
The cached() method internally uses the list of correlation ids from the upstream as cache keys (the list of customer ids in the above example), with each individual value (or entity) of the downstream being cached. Concretely, if we take the line rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo))) from the example above, window(3) would generate windows of 3 Customer entities, e.g.:
[C1, C2, C3], [C1, C4, C7], [C4, C5, C6], [C2, C4, C6]
At that specific moment in time, the execution would look like this:
- getBillingInfo([1, 2, 3]) = [B1, B2, B3]
- merge(getBillingInfo([4, 7]), fromCache([1])) = [B4, B7, B1]
- merge(getBillingInfo([5, 6]), fromCache([4])) = [B5, B6, B4]
- fromCache([2, 4, 6]) = [B2, B4, B6]
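The merge-with-cache behavior traced above can be sketched in plain Java (a simplified, synchronous illustration of the idea, not the library's implementation — the real cache is asynchronous and Mono-based): each call only hits the backend for the ids missing from the cache and serves the rest from cache:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CachedFetchSketch {
    private final Map<Long, String> cache = new HashMap<>();
    private final Function<List<Long>, Map<Long, String>> fetch;
    final List<Long> idsSentToBackend = new ArrayList<>(); // recorded for illustration only

    CachedFetchSketch(Function<List<Long>, Map<Long, String>> fetch) {
        this.fetch = fetch;
    }

    // Fake backend that maps id n to "Bn", mirroring the trace above.
    static CachedFetchSketch withFakeBackend() {
        return new CachedFetchSketch(ids -> {
            Map<Long, String> m = new HashMap<>();
            ids.forEach(id -> m.put(id, "B" + id));
            return m;
        });
    }

    // merge(fetch(missing ids), fromCache(already seen ids))
    Map<Long, String> getAll(List<Long> ids) {
        List<Long> missing = ids.stream().filter(id -> !cache.containsKey(id)).toList();
        if (!missing.isEmpty()) {
            idsSentToBackend.addAll(missing);
            cache.putAll(fetch.apply(missing)); // one batched call for the misses only
        }
        Map<Long, String> result = new LinkedHashMap<>();
        ids.forEach(id -> result.put(id, cache.get(id)));
        return result;
    }
}
```

Replaying the four windows above through this sketch, each of the ids 1 to 7 reaches the backend exactly once, and the fourth window is served entirely from cache.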
Pluggable Asynchronous Caching Strategy
Overloaded versions of the cached() method are also defined to allow plugging in your own cache implementation. We can pass an additional parameter of type CacheFactory to customize the caching mechanism:
public interface CacheFactory<ID, R> {
Cache<ID, R> create(Function<Iterable<? extends ID>, Mono<Map<ID, Collection<R>>>> fetchFunction);
}
public interface Cache<ID, R> {
Mono<Map<ID, Collection<R>>> getAll(Iterable<ID> ids);
}
If no CacheFactory parameter is passed to cached(), the default implementation internally returns a Cache based on ConcurrentHashMap.
Below is an example of a few different ways we can explicitly customize the caching mechanism:
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.Cache.cache;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import reactor.core.publisher.Flux;
var assembler = assemblerOf(Transaction.class)
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, new HashMap<>()))),
rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders, cache(HashMap::new)))),
Transaction::new)
.build();
Overloaded versions of cached() and cache() are provided to wrap any implementation of java.util.Map, since java.util.Map doesn't natively implement Mono<Map<ID, Collection<R>>> getAll(Iterable<ID> ids).
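The adapter idea can be pictured with a JDK-only sketch (the real Cache.getAll returns Mono<Map<ID, Collection<R>>>; this hypothetical synchronous version returns the map directly):

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

class MapCacheSketch {
    // Bulk-lookup view over a plain java.util.Map, mirroring Cache.getAll(ids);
    // ids absent from the backing map are simply left out of the result.
    static <ID, R> Map<ID, Collection<R>> getAll(Map<ID, Collection<R>> delegate,
                                                 Iterable<ID> ids) {
        Map<ID, Collection<R>> result = new LinkedHashMap<>();
        for (ID id : ids) {
            Collection<R> value = delegate.get(id);
            if (value != null) {
                result.put(id, value);
            }
        }
        return result;
    }
}
```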
Third Party Asynchronous Cache Provider Integration
Here is a list of add-on modules that can be used to integrate third party asynchronous caching libraries (more will be added in the future):
Assembler add-on module | Third party cache library |
---|---|
caffeine add-on module | Caffeine |
Below is an example of using a CacheFactory implementation for the Caffeine library, through the caffeineCache() helper method from the caffeine add-on module:
import com.github.benmanes.caffeine.cache.Cache;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Mapper.rule;
import static io.github.pellse.reactive.assembler.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache;
import static java.time.Duration.ofMinutes;
import static com.github.benmanes.caffeine.cache.Caffeine.newBuilder;
var cacheBuilder = newBuilder()
.recordStats()
.expireAfterWrite(ofMinutes(10))
.maximumSize(1000);
var assembler = assemblerOf(Transaction.class)
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, caffeineCache()))),
rule(OrderItem::customerId, oneToMany(cached(this::getAllOrders, caffeineCache(cacheBuilder)))),
Transaction::new)
.build();
Integration with non-reactive sources
A utility function toPublisher() is also provided to wrap non-reactive sources, which is useful e.g. when calling 3rd party synchronous APIs:
import static io.github.pellse.reactive.assembler.QueryUtils.toPublisher;
List<BillingInfo> getBillingInfo(List<Long> customerIds); // non-reactive source
List<OrderItem> getAllOrders(List<Long> customerIds); // non-reactive source
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(toPublisher(this::getBillingInfo))),
rule(OrderItem::customerId, oneToMany(toPublisher(this::getAllOrders))),
Transaction::new)
.build();
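As an illustration of what such a wrapper does, here is a hedged sketch using the JDK's java.util.concurrent.Flow API (the library itself targets org.reactivestreams.Publisher; this simplified version also ignores downstream demand):

```java
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

class SyncSourceSketch {
    // Wrap a blocking, list-returning call in a Flow.Publisher that emits
    // all items on the first request() and then completes.
    static <T> Flow.Publisher<T> toPublisher(Supplier<List<T>> blockingCall) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                blockingCall.get().forEach(subscriber::onNext); // demand ignored for brevity
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```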
Kotlin Support
The reactive-assembler-kotlin-extension module provides extension functions for using the library more idiomatically from Kotlin:
import io.github.pellse.reactive.assembler.kotlin.assembler
import io.github.pellse.reactive.assembler.kotlin.cached
import io.github.pellse.reactive.assembler.Cache.cache
import io.github.pellse.reactive.assembler.RuleMapper.oneToMany
import io.github.pellse.reactive.assembler.RuleMapper.oneToOne
import io.github.pellse.reactive.assembler.Mapper.rule
import io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache
// Example 1:
val assembler = assembler<Transaction>()
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(::getBillingInfo.cached())),
rule(OrderItem::customerId, oneToMany(::getAllOrders.cached(::hashMapOf))),
::Transaction
).build()
// Example 2:
val assembler = assembler<Transaction>()
.withIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(::getBillingInfo.cached(cache()))),
rule(OrderItem::customerId, oneToMany(::getAllOrders.cached(caffeineCache()))),
::Transaction
).build()
Other Supported Technologies
Java 8 Stream (synchronous and parallel) for cases where full reactive/non-blocking support is not needed:
The implementations below are still available, but it is strongly recommended to switch to reactive-assembler-core, as the new reactive support can easily integrate with any external reactive library:
You only need to include in your project's build file (Maven, Gradle) the library that corresponds to the type of reactive (or non-reactive) support needed (Java 8 Stream, CompletableFuture, Flux, RxJava, Akka Stream, Eclipse MicroProfile Reactive Stream Operators).
All modules above have dependencies on the following modules:
What's Next?
See the list of issues for planned improvements in the near future.