Enterprise Java

Connect to RabbitMQ (AMQP) using Scala, Play and Akka

In this article we’ll look at how you can connect from Scala to RabbitMQ so you can support the AMQP protocol from your applications. In this example I’ll use the Play Framework 2.0 as container (for more info on this see my other article on this subject) to run the application in, since Play makes developing with Scala a lot easier. This article will also use Akka actors to send and receive the messages from RabbitMQ.

What is AMQP

First, a quick introduction into AMQP. AMQP stands for “Advanced Message Queueing Protocol” and is an open standard for messaging. The AMQP homepage states their vision as this: “To become the standard protocol for interoperability between all messaging middleware”. AMQP defines a transport level protocol for exchanging messages that can be used to integrate applications from a number of different platform, languages and technologies.
There are a number of tools implementing this protocol, but one that is getting more and more attention is RabbitMQ. RabbitMQ is an open source, erlang based message broker that uses AMQP. All application that can speak AMQP can connect to and make use of RabbitMQ. So in this article we’ll show how you can connect from your Play2/Scala/Akka based application to RabbitMQ.
In this article we’ll show you how to do implement the two most common scenarios:

  • Send / recieve: We’ll configure one sender to send a message every couple of seconds, and use two listeners that will read the messages, in a round robin fashion, from the queue.
  • Publish / subscribe: For this example we’ll create pretty much the same scenario, but this time, the listeners will both get the message at the same time.

I assume you’ve got an installation of RabbitMQ. If not follow the instructions from their site.

Setup basic Play 2 / Scala project

For this example I created a new Play 2 project. Doing this is very easy:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ
       _            _ 
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ 
 
play! 2.0-RC2, http://www.playframework.org
 
The new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQ
 
What is the application name? 
> PlayAndRabbitMQ
 
Which template do you want to use for this new application? 
 
  1 - Create a simple Scala application
  2 - Create a simple Java application
  3 - Create an empty project
 
> 1
 
OK, application PlayAndRabbitMQ is created.
 
Have fun!

I am used to work from Eclipse with the scala-ide pluging, so I execute play eclipsify and import the project in Eclipse.
The next step we need to do is set up the correct dependencies. Play uses sbt for this and allows you to configure your dependencies from the build.scala file in your project directory. The only dependency we’ll add is the java client library from RabbitMQ. Even though Lift provides a scala based AMQP library, I find using the RabbitMQ one directly just as easy. After adding the dependency my build.scala looks like this:

import sbt._
import Keys._
import PlayProject._
 
object ApplicationBuild extends Build {
 
    val appName         = "PlayAndRabbitMQ"
    val appVersion      = "1.0-SNAPSHOT"
 
    val appDependencies = Seq(
      "com.rabbitmq" % "amqp-client" % "2.8.1"
    )
 
    val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings(
    )
}

Add rabbitMQ configuration to the config file

For our examples we can configure a couple of things. The queue where to send the message to, the exchange to use, and the host where RabbitMQ is running. In a real world scenario we would have more configuration options to set, but for this case we’ll just have these three. Add the following to your application.conf so that we can reference it from our application.

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

We can now access these configuration files using the ConfigFactory. To allow easy access create the following object:

object Config {
  val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");
  val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");
  val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

Initialize the connection to RabbitMQ

We’ve got one more object to define before we’ll look at how we can use RabbitMQ to send and receive messages. to work with RabbitMQ we require a connection. We can get a connection to a server by using a ConnectionFactory. Look at the javadocs for more information on how to configure the connection.

object RabbitMQConnection {
 
  private val connection: Connection = null;
 
  /**
   * Return a connection if one doesn't exist. Else create
   * a new one
   */
  def getConnection(): Connection = {
    connection match {
      case null => {
        val factory = new ConnectionFactory();
        factory.setHost(Config.RABBITMQ_HOST);
        factory.newConnection();
      }
      case _ => connection
    }
  }
}

Start the listeners when the application starts

We need to do one more thing before we can look at the RabbitMQ code. We need to make sure our message listeners are registered on application startup and our senders start sending. Play 2 provides a
GlobalSettings object for this which you can extend to execute code when your application starts. For our example we’ll use the following object (remember, this needs to be stored in the default namespace:

import play.api.mvc._
import play.api._
import rabbitmq.Sender
 
object Global extends GlobalSettings {
 
  override def onStart(app: Application) {
    Sender.startSending
  }
}

We’ll look at this Sender.startSending operation, which initializes all the senders and receivers in the following sections.

Setup send and receive scenario

Let’s look at the Sender.startSending code that will setup a sender that sends a msg to a specific queue. For this we use the following piece of code:

object Sender {
 
  def startSending = {
    // create the connection
    val connection = RabbitMQConnection.getConnection();
    // create the channel we use to send
    val sendingChannel = connection.createChannel();
    // make sure the queue exists we want to send to
    sendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);
 
   Akka.system.scheduler.schedule(2 seconds, 1 seconds
          , Akka.system.actorOf(Props(
               new SendingActor(channel = sendingChannel, 
                                          queue = Config.RABBITMQ_QUEUE)))
          , "MSG to Queue");
  }
}
 
class SendingActor(channel: Channel, queue: String) extends Actor {
 
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish("", queue, null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

In this code we take the following steps:

  1. Use the factory to retrieve a connection to RabbitMQ
  2. Create a channel on this connection to use in communicating with RabbitMQ
  3. Use the channel to create the queue (if it doesn’t exist yet)
  4. Schedule Akka to send a message to an actor every second.

This all should be pretty straightforward. The only (somewhat) complex part is the scheduling part. What this schedule operation does is this. We tell Akka to schedule a message to be sent to an actor. We want a 2 seconds delay before it is fired, and we want to repeat this job every second. The actor that should be used for this is the SendingActor you can also see in this listing. This actor needs access to a channel to send a message and this actor also needs to know where to send the message it receives to. This is the queue.
So every second this Actor will receive a message, append a timestamp, and use the provided channel to send this message to the queue: channel.basicPublish(“”, queue, null, msg.getBytes());. Now that we send a message each second it would be nice to have listeners on this queue that can receive messages. For receiving messages we’ve also created an Actor that listens indefinitely on a specific queue.

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {
 
  // called on the initial run
  def receive = {
    case _ => startReceving
  }
 
  def startReceving = {
 
    val consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);
 
    while (true) {
      // wait for the message
      val delivery = consumer.nextDelivery();
      val msg = new String(delivery.getBody());
 
      // send the message to the provided callback function
      // and execute this in a subactor
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String => f(some);
        }
      })) ! msg
    }
  }
}

This actor is a little bit more complex than the one we used for sending. When this actor receives a message (kind of message doesn’t matter) it starts listening on the queue it was created with. It does this by creating a consumer using the supplied channel and tells the consumers to start listening on the specified queue. The consumer.nextDelivery() method will block until a message is waiting in the configured queue. Once a message is received, a new Actor is created to which the message is sent. This new actor passes the message on to the supplied method, where you can put your business logic.
To use this listener we need to supply the following arguments:

  • Channel: Allows access to RabbitMQ
  • Queue: The queue to listen to for messages
  • f: The function that we’ll execute when a message is received.

The final step for this first example is glueing everything together. We do this by adding a couple of method calls to the Sender.startSending method.

  def startSending = {
   ...
    val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);
 
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);
 
    // create an actor that starts listening on the specified queue and passes the
    // received message to the provided callback
    val callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);
 
    // setup the listener that sends to a specific queue using the SendingActor
    setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);
   ...
  }
 
  private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(2 seconds, 
        Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");
  }

In this code you can see that we define a callback function, and use this callback function, together with the queue and the channel to create the ListeningActor. We use the scheduleOnce method to start this listener in a separate thread. Now with this code in place we can run the application (play run) open up localhost:9000 to start the application and we should see something like the following output.

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

Here you can clearly see the round-robin way messages are processed.

Setup publish and subscribe scenario

Once we’ve got the above code running, adding publish / subscribe functionality is very trivial. Instead of the SendingActor we now use a PublishingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {
 
  /**
   * When we receive a message we sent it using the configured channel
   */
  def receive = {
    case some: String => {
      val msg = (some + " : " + System.currentTimeMillis());
      channel.basicPublish(exchange, "", null, msg.getBytes());
      Logger.info(msg);
    }
    case _ => {}
  }
}

An exchange is used by RabbitMQ to allow multiple recipients to receive the same message (and a whole lot of other advanced functionality). The only change in the code from the other actor is that this time we send the message to an exchange instead of to a queue. The listener code is exactly the same, the only thing we need to do is connect a queue to a specific exchange. So that listeners on that queue receive the messages sent to to the exchange. We do this, once again, from the setup method we used earlier.

    ...
    // create a new sending channel on which we declare the exchange
    val sendingChannel2 = connection.createChannel();
    sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");
 
    // define the two callbacks for our listeners
    val callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);
    val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);
 
    // create a channel for the listener and setup the first listener
    val listenChannel1 = connection.createChannel();
    setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), 
                   Config.RABBITMQ_EXCHANGEE, callback3);
 
    // create another channel for a listener and setup the second listener
    val listenChannel2 = connection.createChannel();
    setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), 
                   Config.RABBITMQ_EXCHANGEE, callback4);
 
    // create an actor that is invoked every two seconds after a delay of
    // two seconds with the message "msg"
    Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(
               new PublishingActor(channel = sendingChannel2
                    , exchange = Config.RABBITMQ_EXCHANGEE))), 
         "MSG to Exchange");
    ...

We also created an overloaded method for setupListener, which, as an extra parameter, also accepts the name of the exchange to use.

  private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {
    channel.queueBind(queueName, exchange, "");
 
    Akka.system.scheduler.scheduleOnce(2 seconds, 
        Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");
  }

In this small piece of code you can see that we bind the supplied queue (which is a random name in our example) to the specified exchange. After that we create a new listener as we’ve seen before.
Running this code now will result in the following output:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

As you can see, in this scenario both listeners receive the same message. That pretty much wraps it up for this article. As you’ve seen using the Java based client api for RabbitMQ is more than sufficient, and easy to use from Scala. Note though that this example is not production ready, you should take care to close connections, nicely shutdown listeners and actors. All this shutdown code isn’t shown here.

Reference: Connect to RabbitMQ (AMQP) using Scala, Play and Akka from our JCG partner Jos Dirksen at the Smart Java blog.

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
chip
chip
11 years ago

This was a very helpful post! How to you handle shutting down an app correctly though? Since consumer.nextDelivery will block indefinitely it prevents Akka from shutting down when the app receives a SIGTERM or it’s reloaded during development.

Back to top button