NewMotion / Akka Rabbitmq

Licence: other
RabbitMq client in Scala and Akka actors

Akka RabbitMQ client

This small library lets you use the RabbitMQ client via Akka actors. The main idea behind the library is to survive losing the connection to the RabbitMQ server.

It gives you two actors: ConnectionActor and ChannelActor.

ConnectionActor

  • handles connection failures and notifies its children
  • keeps trying to reconnect if the connection is lost
  • provides children with new channels when needed

ChannelActor

  • may store messages in memory if the channel is lost
  • sends stored messages as soon as a new channel is received
  • retrieves a new channel if the current one is broken

Please note that while this library transparently reconnects when a connection fails, it cannot guarantee that no messages will be lost. If you want to make sure every message is delivered, you have to use acknowledgements and confirms. This is documented in the RabbitMQ Reliability Guide. An example program using confirms can be found in this project under ConfirmsExample.scala.
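As a rough illustration of the note above, here is a minimal sketch that combines publisher confirms with manual consumer acknowledgements. It is not the project's ConfirmsExample.scala. The object name ConfirmsAndAcksSketch, the actor names, and the 5000 ms timeout are assumptions made for this example. The RabbitMQ calls used (confirmSelect, waitForConfirmsOrDie, basicAck) come from the RabbitMQ Java client, and running it requires a broker on the default host and port.

```scala
import akka.actor.{ActorRef, ActorSystem}
import com.newmotion.akka.rabbitmq._

// Hypothetical sketch; names and timeout are assumptions, not library defaults.
object ConfirmsAndAcksSketch {
  val queue = "queue_name"

  // Publisher side: enable publisher confirms each time a new channel arrives.
  def setupPublisher(channel: Channel, self: ActorRef): Unit = {
    channel.queueDeclare(queue, false, false, false, null)
    channel.confirmSelect()
  }

  // Publish one message and block until the broker confirms it.
  // waitForConfirmsOrDie throws if the message is nacked or the timeout expires.
  def publishConfirmed(channel: Channel, body: String): Unit = {
    channel.basicPublish("", queue, null, body.getBytes("UTF-8"))
    channel.waitForConfirmsOrDie(5000L) // assumed timeout in milliseconds
  }

  // Consumer side: autoAck = false, acknowledge only after processing.
  def setupConsumer(channel: Channel, self: ActorRef): Unit = {
    channel.queueDeclare(queue, false, false, false, null)
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope,
                                  properties: BasicProperties, body: Array[Byte]): Unit = {
        println("received: " + new String(body, "UTF-8"))
        channel.basicAck(envelope.getDeliveryTag, false) // ack this delivery only
      }
    }
    channel.basicConsume(queue, false, consumer)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    val connection = system.actorOf(ConnectionActor.props(new ConnectionFactory()), "rabbitmq")
    connection ! CreateChannel(ChannelActor.props(setupConsumer), Some("consumer"))
    val publisher = connection.createChannel(ChannelActor.props(setupPublisher), Some("publisher"))
    publisher ! ChannelMessage(ch => publishConfirmed(ch, "Hello world"), dropIfNoChannel = false)
  }
}
```

Waiting for a confirm blocks the thread that runs the callback, so a real application would probably prefer a confirm listener or batch several publishes per wait. The sketch only shows where the calls go.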

Setup

Sbt

Since version 3.0.0 no extra resolver is needed. From version 4.0.0 on, the organization is com.newmotion:

libraryDependencies += "com.newmotion" %% "akka-rabbitmq" % "6.0.0"

To add releases earlier than 3.0.0 as a dependency, you have to add the NewMotion public repository to your resolver list:

resolvers += "New Motion Repository" at "https://nexus.thenewmotion.com/content/groups/public/"
libraryDependencies += "com.thenewmotion.akka" %% "akka-rabbitmq" % "2.3"

Maven

Since version 6.0.0

<dependency>
    <groupId>com.newmotion</groupId>
    <artifactId>akka-rabbitmq_{2.12/2.13}</artifactId>
    <version>6.0.0</version>
</dependency>

Since version 4.0.0

<dependency>
    <groupId>com.newmotion</groupId>
    <artifactId>akka-rabbitmq_{2.12/2.13}</artifactId>
    <version>5.0.4-beta</version>
</dependency>

Since version 3.0.0

<dependency>
    <groupId>com.thenewmotion</groupId>
    <artifactId>akka-rabbitmq_{2.11/2.12}</artifactId>
    <version>3.0.0</version>
</dependency>

For prior releases

<repository>
    <id>thenewmotion</id>
    <name>New Motion Repository</name>
    <url>http://nexus.thenewmotion.com/content/groups/public/</url>
</repository>
...
<dependency>
    <groupId>com.thenewmotion</groupId>
    <artifactId>akka-rabbitmq_{2.11/2.12}</artifactId>
    <version>2.3</version>
</dependency>

Tutorial in comparisons

Before you start, add the import statement:

    import com.newmotion.akka.rabbitmq._

Create connection

Default approach:

    val factory = new ConnectionFactory()
    val connection: Connection = factory.newConnection()

Actor style:

    val factory = new ConnectionFactory()
    val connectionActor: ActorRef = system.actorOf(ConnectionActor.props(factory))

Let's name it:

    system.actorOf(ConnectionActor.props(factory), "my-connection")

How often will it reconnect?

    import concurrent.duration._
    system.actorOf(ConnectionActor.props(factory, reconnectionDelay = 10.seconds), "my-connection")
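The snippets above use a default ConnectionFactory. The following sketch shows one way to configure the factory before handing it to ConnectionActor. The setters are standard RabbitMQ Java client methods. The object name ConfiguredConnectionSketch, the host, port, credentials, virtual host, and the 5 second delay are placeholder assumptions.

```scala
import akka.actor.{ActorRef, ActorSystem}
import com.newmotion.akka.rabbitmq._
import scala.concurrent.duration._

// Hypothetical helper; all connection values below are placeholders.
object ConfiguredConnectionSketch {
  def configuredFactory(): ConnectionFactory = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost")
    factory.setPort(5672)
    factory.setUsername("guest")
    factory.setPassword("guest")
    factory.setVirtualHost("/")
    factory
  }

  // Start a named ConnectionActor that retries every 5 seconds after a failure.
  def start(system: ActorSystem): ActorRef =
    system.actorOf(
      ConnectionActor.props(configuredFactory(), reconnectionDelay = 5.seconds),
      "my-connection")
}
```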

Create channel

The plain option:

    val channel: Channel = connection.createChannel()

But we can do better. Asynchronously:

    connectionActor ! CreateChannel(ChannelActor.props())

Synchronously:

    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props())

Maybe give it a name:

    connectionActor.createChannel(ChannelActor.props(), Some("my-channel"))

What about a custom actor:

    // requires: import akka.actor.{Actor, Props}
    connectionActor.createChannel(Props(new Actor {
      def receive = {
        case channel: Channel => // store or use the channel here
      }
    }))

Setup channel

    channel.queueDeclare("queue_name", false, false, false, null)

Actor style:

    // this function is called each time a new channel is received
    def setupChannel(channel: Channel, self: ActorRef): Unit = {
      channel.queueDeclare("queue_name", false, false, false, null)
    }
    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

Use channel

    channel.basicPublish("", "queue_name", null, "Hello world".getBytes)

Using our channelActor:

    def publish(channel: Channel): Unit = {
      channel.basicPublish("", "queue_name", null, "Hello world".getBytes)
    }
    channelActor ! ChannelMessage(publish)

But I don't want to lose messages when the connection is lost:

    channelActor ! ChannelMessage(publish, dropIfNoChannel = false)
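To make the effect of dropIfNoChannel easier to picture, here is a small self-contained model of the buffering behaviour described for ChannelActor. It is a conceptual sketch only and not the library's implementation: BufferingSender, channelLost, channelReceived, and BufferingDemo are hypothetical names, and a plain String stands in for a real channel.

```scala
import scala.collection.mutable

// Conceptual model: `Chan` stands in for a RabbitMQ channel.
final class BufferingSender[Chan] {
  private var current: Option[Chan] = None
  private val pending = mutable.Queue.empty[Chan => Unit]

  // Run `op` on the current channel; with no channel, buffer it or drop it.
  def send(op: Chan => Unit, dropIfNoChannel: Boolean = true): Unit =
    current match {
      case Some(ch)                 => op(ch)
      case None if !dropIfNoChannel => pending.enqueue(op)
      case None                     => () // dropped
    }

  // The channel broke: later sends are buffered or dropped.
  def channelLost(): Unit = current = None

  // A replacement channel arrived: flush buffered operations in FIFO order.
  def channelReceived(ch: Chan): Unit = {
    current = Some(ch)
    while (pending.nonEmpty) pending.dequeue()(ch)
  }

  def pendingCount: Int = pending.size
}

object BufferingDemo {
  def run(): List[String] = {
    val sent = mutable.ListBuffer.empty[String]
    val sender = new BufferingSender[String]
    sender.send(ch => sent += s"$ch:a")                          // no channel: dropped
    sender.send(ch => sent += s"$ch:b", dropIfNoChannel = false) // no channel: buffered
    sender.channelReceived("ch1")                                // flushes "b"
    sender.send(ch => sent += s"$ch:c")                          // sent immediately
    sent.toList
  }
}
```

In this model, BufferingDemo.run() yields List("ch1:b", "ch1:c"): the first message is dropped because no channel exists yet, the second waits in memory until a channel arrives. The buffer lives only in memory, which is why buffering alone cannot guarantee delivery.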

Close channel

    channel.close()

VS

    system stop channelActor

Close connection

    connection.close()

VS

    system stop connectionActor // will close all channels associated with this connection

You can also terminate the ActorSystem; this closes all connections as well as channels:

    system.terminate()

Examples:

Publish/Subscribe

Here is the RabbitMQ Publish/Subscribe tutorial in actor style:

import akka.actor.{ActorRef, ActorSystem}
import com.newmotion.akka.rabbitmq._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object PublishSubscribe extends App {
  implicit val system: ActorSystem = ActorSystem()
  val factory = new ConnectionFactory()
  val connection = system.actorOf(ConnectionActor.props(factory), "akka-rabbitmq")
  val exchange = "amq.fanout"

  // called each time the publisher's ChannelActor receives a new channel
  def setupPublisher(channel: Channel, self: ActorRef): Unit = {
    val queue = channel.queueDeclare().getQueue
    channel.queueBind(queue, exchange, "")
  }
  connection ! CreateChannel(ChannelActor.props(setupPublisher), Some("publisher"))

  // called each time the subscriber's ChannelActor receives a new channel
  def setupSubscriber(channel: Channel, self: ActorRef): Unit = {
    val queue = channel.queueDeclare().getQueue
    channel.queueBind(queue, exchange, "")
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
        println("received: " + fromBytes(body))
      }
    }
    channel.basicConsume(queue, true, consumer) // autoAck = true
  }
  connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("subscriber"))

  // publish an increasing counter once per second
  Future {
    def loop(n: Long): Unit = {
      val publisher = system.actorSelection("/user/akka-rabbitmq/publisher")

      def publish(channel: Channel): Unit = {
        channel.basicPublish(exchange, "", null, toBytes(n))
      }
      publisher ! ChannelMessage(publish, dropIfNoChannel = false)

      Thread.sleep(1000)
      loop(n + 1)
    }
    loop(0)
  }

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  def toBytes(x: Long) = x.toString.getBytes("UTF-8")
}

Changelog

6.0.0

  • Drop support of Scala 2.11

  • Update dependencies:

    • amqp-client: 5.7.3 -> 5.9.0
    • akka: 2.5.+ -> 2.6.+

5.1.2

  • Update to latest dependencies:

    • amqp-client: 5.7.1 -> 5.7.3
    • Typesafe Config: 1.3.4 -> 1.4.0
    • Specs2: 4.5.1 -> 4.8.1
    • SBT: 1.2.8 -> 1.3.4
    • sbt-build-seed: 5.0.1 -> 5.0.4
    • sbt-sonatype: 2.3 -> 3.8.1

5.0.4-beta

  • Fix: proper error handling of close channel and create channel

  • Fix: proper error handling of setup connection/channel callbacks

  • Fix: if callback exception is uncaught, close connection/channel

  • Fix: take into account blocking nature of new connection/channel

  • Fix: close channel if the channel actor never got it (deadletter)

  • Fix: channel actor shouldn't ask for channel after a connection shutdown

  • If unexpectedly received a new channel, close it and use the old instead

  • Log warning when a message isn't retried any longer + more debug logging

  • Update to latest dependencies:

    • Akka: 2.5.8 -> 2.5.+ (provided)
    • amqp-client: 5.1.1 -> 5.4.2
    • Typesafe Config: 1.3.2 -> 1.3.3
    • Specs2: 4.0.2 -> 4.3.4
    • SBT: 1.0.3 -> 1.2.3
    • sbt-build-seed: 4.0.2 -> 4.1.2
    • sbt-sonatype: 2.0 -> 2.3

5.0.2

  • Supersedes version 5.0.1 which has been withdrawn to investigate some unforeseen issues

5.0.0

  • Update to latest dependencies:

    • Akka: 2.4.14 -> 2.5.8
    • amqp-client: 4.0.0 -> 5.1.1
    • Typesafe Config: 1.3.1 -> 1.3.2
    • Specs2: 3.8.6 -> 4.0.2
    • SBT: 0.13.13 -> 1.0.3
    • sbt-build-seed: 2.1.0 -> 4.0.2
    • sbt-scalariform: 1.3.0 -> 1.8.2
    • sbt-sonatype: 1.1 -> 2.0
    • sbt-pgp: 1.0.0 -> 1.1.0

4.0.0

  • Change organization from com.thenewmotion to com.newmotion

Other Libraries

Akka-RabbitMQ is a low-level library, and leaves it to the coder to manually wire consumers, serialize messages, etc. If you'd like a higher-level abstraction library, look at Op-Rabbit (which uses this library).