Custom Stream Processing in Akka Streams with GraphStages
Demos of how to do custom stream processing using the Akka Streams GraphStages API
Dealing with asynchronous channels
You would use this kind of custom stream processing when integrating with external parties such as Amazon's SQS, polling an HTTP endpoint (take a look at timers if you are getting throttled), and so on.
From the documentation
In order to receive asynchronous events that are not arriving as stream elements (for example a completion of a future or a callback from a 3rd party API) one must acquire a AsyncCallback by calling getAsyncCallback() from the stage logic. The method getAsyncCallback takes as a parameter a callback that will be called once the asynchronous event fires. It is important to not call the callback directly, instead, the external API must call the invoke(event) method on the returned AsyncCallback. The execution engine will take care of calling the provided callback in a thread-safe way. The callback can safely access the state of the GraphStageLogic implementation.
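To make the pattern concrete, here is a minimal sketch, assuming Akka 2.6 and Scala 2.13. SingleFutureSource, its makeFuture parameter and the demo object are hypothetical names, not part of Akka or of the original demo. The callback is acquired with getAsyncCallback in preStart, and the Future's completion goes through invoke instead of calling the handler directly:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

// Hypothetical stage: emits the result of a single Future, then completes.
final class SingleFutureSource[T](makeFuture: () => Future[T])
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("SingleFutureSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Acquired in preStart; invoking it runs onResult inside the stage.
      private var callback: AsyncCallback[Try[T]] = _

      // Runs on the stage's own thread, so it may call push/completeStage/failStage.
      private def onResult(result: Try[T]): Unit = result match {
        case Success(value) =>
          push(out, value)
          completeStage()
        case Failure(ex) =>
          failStage(ex)
      }

      override def preStart(): Unit =
        callback = getAsyncCallback[Try[T]](onResult)

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          // The Future completes on another thread, so it calls invoke
          // rather than calling onResult directly.
          makeFuture().onComplete(result => callback.invoke(result))(materializer.executionContext)
      })
    }
}

object SingleFutureSourceDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("single-future-demo")
    try {
      val source = Source.fromGraph(new SingleFutureSource(() => Future(42)))
      Await.result(source.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling SingleFutureSourceDemo.run() should return Seq(42). Akka already ships Source.future for this exact case; the point of the sketch is only the AsyncCallback plumbing.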
The documentation provides an example, but here's something slightly more complex.
Problem statement
Create a Source of random numbers where, in order to get a random number, you have to poll an API (we only pretend to do this) that returns the result in a Future. As an added twist, the API could fail, in which case we want to retry after X seconds.
Solution
The documentation says you need an AsyncCallback. When your asynchronous call gets a result, it needs to call invoke with that result on this AsyncCallback. They also mention that getAsyncCallback takes a function as a parameter: the AsyncCallback will call that function with the result of the asynchronous call in a thread-safe way. In our code, we will call this function the target handler. They also recommend that you set all of this up in the preStart hook.
Here, bufferMessageAndEmulatePull is our target handler. When it receives a message, it adds the message to the buffer and emulates a pull, as if downstream had requested an element. If your first reaction is that this is unsafe, you are right: pushing without downstream demand is not allowed, which is why the pull handler uses the query API to check whether it was truly called by downstream. Doing it this way actually lets us write less code, since all the checks live in a single place.
We have set up our AsyncCallback so that, when it is invoked with the results, it calls our target handler with them in a thread-safe way.
The last thing left to do is to actually make the asynchronous call and, when the call completes, call invoke on our AsyncCallback so that we get the results safely.
Let's walk through those functions. I said our asynchronous call could fail by throwing an exception; the call is wrapped in a Future to make it asynchronous.
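A minimal sketch of such a call, assuming a hypothetical FakeRandomNumberApi object (it is not the original code, and the 0.3 default failure rate is an arbitrary choice). The blocking function may throw, and running it inside a Future turns that exception into a failed Future:

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical stand-in for the random-number API we pretend to poll.
object FakeRandomNumberApi {
  // Synchronous call that throws some of the time, like a flaky remote API.
  def fetchBlocking(failureRate: Double): Int = {
    val rnd = ThreadLocalRandom.current()
    if (rnd.nextDouble() < failureRate) throw new RuntimeException("simulated API failure")
    rnd.nextInt(100) // 0 to 99
  }

  // Running the call inside a Future makes it asynchronous; an exception thrown
  // in the body completes the Future with a Failure instead of escaping.
  def fetch(failureRate: Double = 0.3)(implicit ec: ExecutionContext): Future[Int] =
    Future(fetchBlocking(failureRate))
}

object FakeRandomNumberApiDemo {
  def alwaysSucceeds(): Int =
    Await.result(FakeRandomNumberApi.fetch(failureRate = 0.0), 1.second)

  def alwaysFails(): Boolean =
    Await.ready(FakeRandomNumberApi.fetch(failureRate = 1.0), 1.second).value.exists(_.isFailure)
}
```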
So where are we going to call invoke, then?
Here's where we listen for completion of the asynchronous call, get the result and call invoke. Remember I also said I wanted to retry after X seconds (2 seconds for now), so we do that here.
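The completion listener with a retry could look roughly like the sketch below. It is plain Scala with no Akka dependency: deliver stands in for the AsyncCallback's invoke, and the delay is scheduled with a java.util.concurrent.ScheduledExecutorService, which is an assumption of this sketch rather than necessarily what the original uses (Akka's own scheduler would work too). RetryingCall and the demo are hypothetical, and the demo uses 50 milliseconds instead of 2 seconds to keep it quick:

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object RetryingCall {
  // Listen for completion: hand a success to `deliver` (in the stage this would be
  // asyncCallback.invoke), or schedule the whole call again after `retryDelay`.
  def callWithRetry[T](
      call: () => Future[T],
      deliver: T => Unit,
      retryDelay: FiniteDuration,
      scheduler: ScheduledExecutorService
  )(implicit ec: ExecutionContext): Unit =
    call().onComplete {
      case Success(value) =>
        deliver(value)
      case Failure(_) =>
        val retry = new Runnable {
          def run(): Unit = callWithRetry(call, deliver, retryDelay, scheduler)
        }
        scheduler.schedule(retry, retryDelay.toMillis, TimeUnit.MILLISECONDS)
    }
}

object RetryingCallDemo {
  // Returns the delivered value and the number of attempts it took.
  def run(): (Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val attempts = new AtomicInteger(0)
    val delivered = Promise[Int]()
    // Hypothetical flaky call: fails on the first two attempts, then returns 7.
    val flaky: () => Future[Int] = () =>
      Future {
        if (attempts.incrementAndGet() <= 2) throw new RuntimeException("simulated failure")
        7
      }
    try {
      RetryingCall.callWithRetry[Int](flaky, v => delivered.success(v), 50.millis, scheduler)
      (Await.result(delivered.future, 5.seconds), attempts.get())
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

RetryingCallDemo.run() should return (7, 3): two failed attempts, then a delivered 7. Because invoke on an AsyncCallback is safe to call from any thread, calling it from the scheduler's thread would also be fine inside a stage.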
With all this in place, we are ready to write our OutHandler.
When onPull is called, we first have to check whether downstream has truly pulled or whether we were called artificially from bufferMessageAndEmulatePull. To do that, we use the isAvailable query on the outlet port.
- If we are truly pulled and the buffer has elements, we take the first element off the buffer and send it downstream using push.
- If we are truly pulled but have run out of elements in the buffer, we make our asynchronous call; its result comes back to us in a thread-safe way through the AsyncCallback.
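To see why routing everything through onPull keeps the checks in one place, here is a plain-Scala model of the logic above. It involves no Akka at all, and PullLogicModel and its members are hypothetical: the downstreamWaiting flag stands in for isAvailable(out), appending to pushed stands in for push(out, ...), and requestsMade counts asynchronous calls started:

```scala
import scala.collection.mutable

// Plain-Scala model of the stage's pull logic; all names are hypothetical.
final class PullLogicModel {
  private val buffer = mutable.Queue.empty[Int]
  var downstreamWaiting: Boolean = false                                 // stands in for isAvailable(out)
  val pushed: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty[Int] // what push(out, _) would emit
  var requestsMade: Int = 0                                              // asynchronous calls started

  // Target handler: buffer the result, then pretend downstream pulled.
  def bufferMessageAndEmulatePull(n: Int): Unit = {
    buffer.enqueue(n)
    onPull()
  }

  // Reached both from real pulls and from the emulated pull above.
  def onPull(): Unit =
    if (!downstreamWaiting) {
      () // emulated pull without demand: the element stays in the buffer
    } else if (buffer.nonEmpty) {
      pushed += buffer.dequeue()
      downstreamWaiting = false // a push uses up downstream's demand
    } else {
      requestsMade += 1 // the real stage would start the asynchronous call here
    }

  // A genuine pull from downstream.
  def downstreamPull(): Unit = {
    downstreamWaiting = true
    onPull()
  }
}

object PullLogicModelDemo {
  def run(): (List[Int], Int) = {
    val m = new PullLogicModel
    m.downstreamPull()               // buffer empty: a request is started
    m.bufferMessageAndEmulatePull(5) // downstream is waiting: 5 is pushed
    m.bufferMessageAndEmulatePull(9) // no demand: 9 stays buffered
    m.downstreamPull()               // real pull: 9 comes out of the buffer
    (m.pushed.toList, m.requestsMade)
  }
}
```

PullLogicModelDemo.run() returns (List(5, 9), 1). The third step (a result arriving without demand) does not happen in the real stage, which only requests on demand, but the isAvailable-style check handles it safely anyway.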
You can find the entire picture here
You can find it in action here
The example keeps producing random numbers, retrying internally whenever a failure occurs.
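For reference, here is one way the pieces described above might fit together, as a sketch assuming Akka 2.6 and Scala 2.13. RandomNumberSource, its fetch and retryDelay parameters, and the flaky demo API are hypothetical. One deliberate difference from the walkthrough: the AsyncCallback receives the whole Try, and on failure the retry is scheduled with TimerGraphStageLogic.scheduleOnce from inside the stage instead of from the Future's completion handler:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Random, Success, Try}

// Hypothetical source: `fetch` plays the polled API (it may fail) and
// `retryDelay` is the "X seconds" to wait before trying again.
final class RandomNumberSource(fetch: () => Future[Int], retryDelay: FiniteDuration)
    extends GraphStage[SourceShape[Int]] {

  val out: Outlet[Int] = Outlet("RandomNumberSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {
      private val buffer = mutable.Queue.empty[Int]
      private var resultCallback: AsyncCallback[Try[Int]] = _

      override def preStart(): Unit =
        resultCallback = getAsyncCallback[Try[Int]] {
          case Success(n) => bufferMessageAndEmulatePull(n)
          case Failure(_) => scheduleOnce("retry", retryDelay) // try again later
        }

      // Target handler: runs on the stage's thread because it is reached via invoke.
      private def bufferMessageAndEmulatePull(n: Int): Unit = {
        buffer.enqueue(n)
        onPull() // emulated pull; onPull decides whether pushing is allowed
      }

      // Start the asynchronous call and route its completion back through invoke.
      private def makeAsyncCall(): Unit =
        fetch().onComplete(result => resultCallback.invoke(result))(materializer.executionContext)

      override protected def onTimer(timerKey: Any): Unit = makeAsyncCall()

      override def onPull(): Unit =
        if (isAvailable(out)) { // downstream really has demand
          if (buffer.nonEmpty) push(out, buffer.dequeue())
          else makeAsyncCall() // buffer empty: go and fetch another number
        } // else: emulated pull without demand, keep the element buffered

      setHandler(out, this)
    }
}

object RandomNumberSourceDemo {
  // Hypothetical flaky API: fails roughly one time in three.
  private def flakyFetch()(implicit ec: ExecutionContext): Future[Int] = Future {
    if (Random.nextInt(3) == 0) throw new RuntimeException("simulated API failure")
    Random.nextInt(100)
  }

  def run(count: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("random-numbers")
    import system.dispatcher
    try {
      val source = Source.fromGraph(new RandomNumberSource(() => flakyFetch(), 100.millis))
      Await.result(source.take(count).runWith(Sink.seq), 30.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Running RandomNumberSourceDemo.run(5) should yield five numbers between 0 and 99, with failed attempts retried after 100 milliseconds behind the scenes. Keeping the retry on the stage's timer means all state changes happen on the stage's own thread, and the timer is cancelled automatically when the stage stops.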