zio / Zio Akka Cluster

Licence: apache-2.0
ZIO wrapper for Akka Cluster

ZIO Wrapper for Akka Cluster

This library is a ZIO wrapper for Akka Cluster. It exposes a purely functional API allowing you to leverage the distributed features of Akka without the need to use the actor model.

The following features are available:

  • Akka Cluster (join, leave, cluster state, cluster events)
  • Akka Distributed PubSub
  • Akka Cluster Sharding

Add the dependency

To use zio-akka-cluster, add the following line to your build.sbt file:

libraryDependencies += "dev.zio" %% "zio-akka-cluster" % "0.2.0"

How to use

In order to use the library, you need to provide an ActorSystem. Refer to the Akka Documentation if you need help.

Akka Cluster

The features described here require the following import:

import zio.akka.cluster.Cluster

When you create an ActorSystem, Akka reads your configuration file and joins a cluster if seed nodes are specified. See the Akka Documentation to learn more about cluster usage. You can also join a cluster manually using Cluster.join.

def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit]
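
As a minimal sketch of a manual join, the seed node list can be built from akka.actor.Address values. The system name, host, port and the "akka" protocol (Artery remoting) below are assumptions chosen for illustration; they must match your own Akka remoting configuration.

```scala
import akka.actor.{ ActorSystem, Address }
import zio.{ Has, ZIO }
import zio.akka.cluster.Cluster

// Assumed seed node: actor system "Test" reachable on 127.0.0.1:2551.
// The "akka" protocol is an assumption that Artery remoting is in use.
val seedNodes: List[Address] = List(Address("akka", "Test", "127.0.0.1", 2551))

// Describes the join; nothing happens until the effect is run with an ActorSystem provided.
val joinCluster: ZIO[Has[ActorSystem], Throwable, Unit] = Cluster.join(seedNodes)
```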

You can get the status of the cluster by calling Cluster.clusterState:

val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState]

To monitor the cluster and be informed of changes (e.g. new members, member unreachable, etc.), use Cluster.clusterEvents. This function returns a ZIO Queue that will be populated with the cluster events as they happen. The returned queue is unbounded; if you want to supply your own bounded queue, use Cluster.clusterEventsWith instead. To unsubscribe, simply shut down the queue. initialStateAsEvents indicates whether you want to receive the previous cluster events leading to the current state, or only future events.

def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]]

Finally, you can leave the current cluster using Cluster.leave.

val leave: ZIO[Has[ActorSystem], Throwable, Unit]
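
The calls above can be combined as in the following sketch, which reads the cluster state, waits for one event, unsubscribes and leaves. It assumes that the Akka configuration on the classpath enables the cluster actor provider and that the node is (or becomes) a cluster member; otherwise events.take may wait indefinitely. The names firstEvent and actorSystem are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.cluster.ClusterEvent.ClusterDomainEvent
import zio.{ Has, Managed, Task, ZLayer }
import zio.akka.cluster.Cluster

// ActorSystem layer, terminated when the program finishes.
// Assumption: application.conf sets up clustering for this system.
val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val firstEvent: Task[ClusterDomainEvent] =
  (for {
    state  <- Cluster.clusterState
    _      <- Task(println(s"Current members: ${state.members}"))
    // Ask for the events leading to the current state as well as future ones.
    events <- Cluster.clusterEvents(initialStateAsEvents = true)
    event  <- events.take
    // Shutting down the queue unsubscribes from cluster events.
    _      <- events.shutdown
    _      <- Cluster.leave
  } yield event).provideLayer(actorSystem)
```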

Akka PubSub

The features described here require the following import:

import zio.akka.cluster.pubsub.PubSub

Akka Distributed PubSub lets you publish and receive events from any node in the cluster. See Akka Documentation to know more about PubSub usage. To create a PubSub object which can both publish and subscribe, use PubSub.createPubSub.

def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]]

There are also two less powerful variants: PubSub.createPublisher if you only need to publish, and PubSub.createSubscriber if you only need to subscribe.

To publish a message, use publish. It requires the following:

  • topic is the topic you want to publish to.
  • data is the message to publish.
  • sendOneMessageToEachGroup can be set to true to deliver the message to only one subscriber per group instead of to all subscribers.

def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit]

To subscribe to messages, use listen. It requires the following:

  • topic is the topic you want to subscribe to.
  • group is an optional group name. Set it if you want only one subscriber per group to receive each message; it takes effect for messages published with sendOneMessageToEachGroup=true.

listen returns an unbounded ZIO Queue that will be populated with the messages. To use a bounded queue, use listenWith instead. Note that listen waits for the subscription acknowledgment before completing, which means that once it completes, every message published afterwards will be received. To stop listening, simply shut down the queue.

def listen(topic: String, group: Option[String] = None): Task[Queue[A]] =
    Queue.unbounded[A].tap(listenWith(topic, _, group))
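
Following the same pattern as the definition of listen above, a bounded subscription could be sketched as below. listenBounded is a hypothetical helper, not part of the library, and the argument order of listenWith is taken from the snippet above. How a full queue affects message delivery depends on the queue strategy and on the library internals, so treat the capacity as something to tune.

```scala
import zio.{ Queue, Task }
import zio.akka.cluster.pubsub.PubSub

// Hypothetical helper: subscribe to a topic using a bounded queue of the given capacity.
def listenBounded[A](pubSub: PubSub[A], topic: String, capacity: Int): Task[Queue[A]] =
  Queue.bounded[A](capacity).tap(queue => pubSub.listenWith(topic, queue, None))
```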

Note on Serialization

Akka messages are serialized when they are sent across the network. By default, Java serialization is used, but it is not recommended in production. See the Akka Documentation to learn how to provide your own serializer. This library wraps messages inside a zio.akka.cluster.pubsub.MessageEnvelope case class, so your serializer needs to cover it as well.

Example:

import akka.actor.ActorSystem
import zio.{ Has, Managed, Task, ZLayer }
import zio.akka.cluster.pubsub.PubSub

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

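// Subscribe first, then publish: listen only completes once the subscription is acknowledged.
val program: Task[String] =
  (for {
    pubSub   <- PubSub.createPubSub[String]
    queue    <- pubSub.listen("my-topic")
    _        <- pubSub.publish("my-topic", "yo")
    firstMsg <- queue.take
  } yield firstMsg).provideLayer(actorSystem)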
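
Groups can be sketched in the same way. In the variant below, two subscribers join different groups on the same topic and the message is published with sendOneMessageToEachGroup = true, so each group should receive it once. The topic, group names and message are made up for illustration, and the same assumptions about the ActorSystem configuration apply.

```scala
import akka.actor.ActorSystem
import zio.{ Has, Managed, Task, ZLayer }
import zio.akka.cluster.pubsub.PubSub

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val onePerGroup: Task[(String, String)] =
  (for {
    pubSub  <- PubSub.createPubSub[String]
    // Two subscribers on the same topic, each in its own (hypothetical) group.
    billing <- pubSub.listen("orders", Some("billing"))
    audit   <- pubSub.listen("orders", Some("audit"))
    // Deliver the message to one subscriber of each group.
    _       <- pubSub.publish("orders", "order-42", sendOneMessageToEachGroup = true)
    a       <- billing.take
    b       <- audit.take
  } yield (a, b)).provideLayer(actorSystem)
```

With several subscribers in the same group, only one of them would be expected to receive each message.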

Akka Cluster Sharding

The features described here require the following import:

import zio.akka.cluster.sharding.Sharding

Akka Cluster Sharding lets you distribute entities across a cluster and communicate with them using a logical ID, without having to care about their physical location. It is particularly useful when you have some business logic that needs to be processed by a single process across a cluster (e.g. some state that should be only in one place at a given time, a single writer to a database, etc). See Akka Documentation to know more about Cluster Sharding usage.

To start sharding a given entity type on a node, use Sharding.start. It returns a Sharding object which can be used to send messages, stop or passivate sharded entities.

def start[R, Msg, State](
    name: String,
    onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit],
    numberOfShards: Int = 100
  ): ZIO[Has[ActorSystem] with R, Throwable, Sharding[Msg]]

It requires:

  • the name of the entity type. Entities will be distributed on all the nodes of the cluster where start was called with this name.
  • onMessage is the behavior of the sharded entity. For each received message, it will run an effect of type ZIO[Entity[State], Nothing, Unit]:
    • Entity[State] gives you access to a Ref[Option[State]] which you can use to read or modify the state of the entity. The state is set to None when the entity is started. This Entity object also allows you to get the entity ID and to stop the entity from within (e.g. after some time of inactivity).
    • Nothing means the effect must not fail: you must catch and handle potential errors yourself.
    • Unit means the effect should not return anything.
  • numberOfShards indicates how entities will be split across nodes. See this page for more information.

You can also use Sharding.startProxy if you need to send messages to entities located on other nodes.

To send a message to a sharded entity without expecting a response, use send. To send a message and wait for a response, use ask. To stop an entity, use stop; to passivate it, use passivate. The entityId identifies the entity to target. Messages sent to the same entityId from different nodes in the cluster will be handled by the same actor.

def send(entityId: String, data: M): Task[Unit]
def ask[R](entityId: String, data: M): Task[R]
def stop(entityId: String): Task[Unit]
def passivate(entityId: String): Task[Unit]

Note on Serialization

Akka messages are serialized when they are sent across the network. By default, Java serialization is used, but it is not recommended in production. See the Akka Documentation to learn how to provide your own serializer. This library wraps messages inside a zio.akka.cluster.sharding.MessageEnvelope case class, so your serializer needs to cover it as well.

Example:

import akka.actor.ActorSystem
import zio.akka.cluster.sharding.{ Entity, Sharding }
import zio.{ Has, Managed, Task, ZIO, ZLayer }

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val behavior: String => ZIO[Entity[Int], Nothing, Unit] = {
  case "+" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) + 1)))
  case "-" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) - 1)))
  case _   => ZIO.unit
}

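// All three messages target the same entityId, so they are handled by the same entity.
val program: Task[Unit] =
  (for {
    sharding <- Sharding.start("session", behavior)
    entityId = "1"
    _        <- sharding.send(entityId, "+")
    _        <- sharding.send(entityId, "+")
    _        <- sharding.send(entityId, "-")
  } yield ()).provideLayer(actorSystem)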
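
As a variation on the example above, the following sketch uses a small message ADT instead of strings and keeps the state transition in a pure function, which makes it easy to test without a cluster. Command, Increment, Decrement, applyCommand and the "counter" names are hypothetical and not part of the library. In a multi-node cluster these message types would also need a serializer, as noted above.

```scala
import akka.actor.ActorSystem
import zio.akka.cluster.sharding.{ Entity, Sharding }
import zio.{ Has, Managed, Task, ZIO, ZLayer }

// Hypothetical message protocol for a counter entity.
sealed trait Command
case object Increment extends Command
case object Decrement extends Command

// Pure state transition: the state is None until the first message is handled.
def applyCommand(state: Option[Int], command: Command): Option[Int] =
  command match {
    case Increment => Some(state.getOrElse(0) + 1)
    case Decrement => Some(state.getOrElse(0) - 1)
  }

// Entity behavior: apply the transition to the entity state for each message.
val counterBehavior: Command => ZIO[Entity[Int], Nothing, Unit] =
  command => ZIO.accessM[Entity[Int]](_.get.state.update(applyCommand(_, command)))

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
  ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))

val counterProgram: Task[Unit] =
  (for {
    sharding <- Sharding.start("counter", counterBehavior)
    _        <- sharding.send("counter-1", Increment)
    _        <- sharding.send("counter-1", Decrement)
    // Stop the entity explicitly once it is no longer needed.
    _        <- sharding.stop("counter-1")
  } yield ()).provideLayer(actorSystem)
```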