All Projects → calvinlfer → Akka-Streams-custom-stream-processing-examples

calvinlfer / Akka-Streams-custom-stream-processing-examples

Licence: other
Demos of how to do custom stream processing using the Akka Streams GraphStages API

Programming Languages

scala
5932 projects
groovy
2714 projects

Projects that are alternatives of or similar to Akka-Streams-custom-stream-processing-examples

Akka Stream Contrib
Add-ons to Akka Stream
Stars: ✭ 173 (+1230.77%)
Mutual labels:  akka, stream-processing, akka-streams
telepooz
Functional Telegram Bot API wrapper for Scala on top of akka, circe, cats, and shapeless
Stars: ✭ 26 (+100%)
Mutual labels:  akka, akka-streams
Safe Chat
IRC-style chat demo featuring full-stack F#, Akka.Streams, Akkling, Fable, Elmish, Websockets and .NET Core
Stars: ✭ 157 (+1107.69%)
Mutual labels:  akka, akka-streams
Quark
Quark is a streaming-first Api Gateway using Akka
Stars: ✭ 13 (+0%)
Mutual labels:  akka, akka-streams
Alpakka Kafka
Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
Stars: ✭ 1,295 (+9861.54%)
Mutual labels:  akka, akka-streams
Squbs
Akka Streams & Akka HTTP for Large-Scale Production Deployments
Stars: ✭ 1,365 (+10400%)
Mutual labels:  akka, akka-streams
Otoroshi
Lightweight api management on top of a modern http reverse proxy
Stars: ✭ 177 (+1261.54%)
Mutual labels:  akka, akka-streams
alpakka-samples
Example projects building Reactive Integrations using Alpakka
Stars: ✭ 61 (+369.23%)
Mutual labels:  akka, akka-streams
slicebox
Microservice for safe sharing and easy access to medical images
Stars: ✭ 18 (+38.46%)
Mutual labels:  akka, akka-streams
akka-cookbook
提供清晰、实用的Akka应用指导
Stars: ✭ 30 (+130.77%)
Mutual labels:  akka, akka-streams
Travesty
Diagram- and graph-generating library for Akka Streams
Stars: ✭ 83 (+538.46%)
Mutual labels:  akka, akka-streams
Akka
Examples and explanations of how Akka toolkit works
Stars: ✭ 20 (+53.85%)
Mutual labels:  akka, akka-streams
Toketi Iothubreact
Akka Stream library for Azure IoT Hub
Stars: ✭ 36 (+176.92%)
Mutual labels:  akka, akka-streams
Parquet4s
Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
Stars: ✭ 125 (+861.54%)
Mutual labels:  akka, akka-streams
Scale
Another example of a REST API with Akka HTTP
Stars: ✭ 23 (+76.92%)
Mutual labels:  akka, akka-streams
Es Cqrs Shopping Cart
A resilient and scalable shopping cart system designed using Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS)
Stars: ✭ 19 (+46.15%)
Mutual labels:  akka, akka-streams
typebus
Framework for building distributed microserviceies in scala with akka-streams and kafka
Stars: ✭ 14 (+7.69%)
Mutual labels:  akka, akka-streams
Gearpump
Lightweight real-time big data streaming engine over Akka
Stars: ✭ 745 (+5630.77%)
Mutual labels:  akka, stream-processing
akka-streams-interleaving
Akka Streams example of how to interleave Sources with priorities
Stars: ✭ 28 (+115.38%)
Mutual labels:  akka, akka-streams
khermes
A distributed fake data generator based in Akka.
Stars: ✭ 94 (+623.08%)
Mutual labels:  akka, akka-streams

Custom Stream Processing in Akka Streams with GraphStages

Demos of how to do custom stream processing using the Akka Streams GraphStages API

Dealing with asynchronous channels

You would use this kind of custom stream processing when you have to deal with integrating with external parties like Amazon's SQS, polling an HTTP endpoint (take a look at timers if you are getting throttled), etc.

From the documentation

In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future or a callback from a 3rd party API) one must acquire a AsyncCallback by calling getAsyncCallback() from the stage logic. The method getAsyncCallback takes as a parameter a callback that will be called once the asynchronous event fires. It is important to not call the callback directly, instead, the external API must call the invoke(event) method on the returned AsyncCallback. The execution engine will take care of calling the provided callback in a thread-safe way. The callback can safely access the state of the GraphStageLogic implementation.

The documentation provides an example but here's something slightly more complex.

Problem statement

Create a Random Numbers Source where in order to get a random number, you have to poll an API (we pretend to do this) and the result is in a Future. As an added twist, the API could fail and we want to retry after X seconds.

Solution

The documentation says you need an AsyncCallback. When your asynchronous call gets a result, it needs to call invoke with the result on this AsyncCallback. They also mention that the AsyncCallback takes a function as a parameter. It will call the function with the result of the asynchronous call in a thread safe way. In our code, we will call this the target handler. They also recommend that you set all this up in the preStart hook.

image

Here bufferMessageAndEmulatePull is our target handler. As you can see, when the target handler gets a message, it adds it to the buffer and emulates a pull as if the downstream is calling. Your first reaction is correct in saying that it is unsafe to do this which is why in the handler we will check whether we were truly called using the query API to do so. Doing this actually allows us to write less code since the checks are in a single area.

We have setup our AsyncHandler so that when it is called with the results, it calls our target handler with the results in a thread safe way.

The last thing that is left to do is actually make the asynchronous call and when the call completes to call invoke on our AsyncHandler so we can get the results safely.

image

Let's walk through those functions

image

I said our asynchronous call could fail by throwing an exception (😢) or it produces a result. Here is a synchronous call that produces numbers and we wrap it in a Future to make it asynchronous.

So where are we going to call invoke then?

image

Here's where we listen for completion of the asynchronous call, get the result and call invoke. Remember I also said I wanted to retry after X seconds (2 seconds for now), so we do that here.

With all this in place we are ready to write our OutHandler.

image

As you can see, when we are pulled we have to first check whether we are truly pulled from the downstream or whether we are artificially called from bufferMessageAndEmulatePull. So we use the query isAvailable on the outlet port to check.

  • If we are truly pulled then we take the first element off the buffer and send it downstream using push.

  • If we have run out of elements in the buffer then we make our asynchronous call which involves taking the results and obtaining them in a thread safe way.

You can find the entire picture here

You can find it in action here

The example will keep producing random numbers and handle retries internally in the midst of failures that could occur.

Credits

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].