Akka Typed Actors: Exploring the receiver pattern

In the previous article we looked at some of the basic features provided by Akka Typed. In this article and the next one we’ll look a bit closer at some more features by exploring the two patterns that ship with Akka Typed: the Receiver pattern and the Receptionist pattern. If you’re new to Akka Typed, it might be a good idea to read the previous article first, since it gives you a bit of an introduction. This article covers the Receiver pattern.

 

The receiver pattern

In the Akka Typed distribution there is a package called akka.typed.patterns. This package contains two patterns: the Receiver pattern and the Receptionist pattern. Why these two patterns were important enough to add to the distribution I don’t really know, to be honest, but they do provide a nice way to introduce some more concepts and ideas behind Akka Typed.

So let’s look into the Receiver pattern; we’ll do the Receptionist pattern in the next article. To understand what the Receiver pattern does, let’s just look at the messages that we can send to it:

  /**
   * Retrieve one message from the Receiver, waiting at most for the given duration.
   */
  final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]
  /**
   * Retrieve all messages from the Receiver that it has queued after the given
   * duration has elapsed.
   */
  final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]
  /**
   * Retrieve the external address of this Receiver (i.e. the side at which it
   * takes in the messages of type T.
   */
  final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

As you can see from these messages, a Receiver queues messages of type T and provides additional commands to retrieve either one or all of those messages, with a timeout that controls how long the receiver waits before replying. To use a receiver we first need to get its ExternalAddress, so that we can send messages of type T to it. From another actor we can then send GetOne and GetAll messages to see whether there are any messages waiting in the receiver.
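
To make the queueing semantics concrete, here is a minimal sketch in plain Scala, without Akka. ReceiverModel and its method names are made up for this illustration and are not part of akka.typed.patterns; the sketch also ignores the timeouts and answers immediately, so it only models the bookkeeping: messages pile up until somebody asks for them, and asking removes them from the queue.

```scala
import scala.collection.immutable.Queue

// Hypothetical, Akka-free model of the Receiver's bookkeeping.
// Assumption: timeouts are left out, so getOne/getAll answer immediately.
final class ReceiverModel[T] {
  private var queued: Queue[T] = Queue.empty

  // Corresponds to sending a T to the external address.
  def offer(msg: T): Unit = {
    queued = queued.enqueue(msg)
  }

  // Corresponds to GetOne: hand out the oldest queued message, if any.
  def getOne(): Option[T] =
    queued.dequeueOption.map { case (head, rest) =>
      queued = rest
      head
    }

  // Corresponds to GetAll: hand out everything queued so far and start afresh.
  def getAll(): Vector[T] = {
    val drained = queued.toVector
    queued = Queue.empty
    drained
  }
}

object ReceiverModelDemo {
  def main(args: Array[String]): Unit = {
    val receiver = new ReceiverModel[String]
    receiver.offer("a")
    receiver.offer("b")
    receiver.offer("c")
    println(receiver.getOne()) // Some(a)
    println(receiver.getAll()) // Vector(b, c)
    println(receiver.getAll()) // Vector()
  }
}
```

The last call returns an empty Vector because the previous getAll already drained the queue, which is the same effect you can see later in the output of the scenario: every batch only contains new messages.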

For our example we’re going to create the following actors:

  • A producer which sends messages of type T to the receiver.
  • A consumer which can retrieve messages of type T from this receiver.
  • A root actor, which runs this scenario.

We’ll start with the producer, which looks like this:

  /**
   * Producer object containing the protocol and the behavior. This is a very simple
   * actor that produces messages using a schedule. To start producing messages
   * we need to send an initial message.
   */
  object Producer {

    // a simple protocol defining the messages that can be sent
    sealed trait ProducerMsg
    final case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsg
    final case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg

    // the producer, which first waits for a registerReceiver message, after which
    // it changes behavior, to send messages.
    val producer = Full[ProducerMsg] {

      // if we receive a register message, we know where to send messages to
      case Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>

        println("Producer: Switching behavior")

        // simple helper function which sends a message to self.
        def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))
        // schedule the first one, the rest will be triggered through the behavior.
        scheduleMessage()

        Static {
          // add a message to the receiver and schedule a new one
          case addHelloWorldMsg(msg) =>
            println(s"Producer: Adding new '$msg' to receiver: $msgConsumer")
            msgConsumer ! msg
            scheduleMessage()
        }

      // don't switch behavior on any of the other messages
      case _ => Same
    }
  }

In this object we define the messages that can be sent to the actor, and the behavior. The registerReceiverMsgIn message provides the actor with the destination it should send messages to (more on this later), and the addHelloWorldMsg message carries the message to send to that destination. If you look at this behavior you can see that we use a Full[T] behavior. For this behavior we have to provide matchers for all the messages and signals, and as an added bonus we also get access to the actor context (ctx). In its initial state this behavior only responds to registerReceiverMsgIn messages. When it receives such a message it does two things:

  1. It defines a function that schedules a message to be sent to the actor itself in half a second, and calls that function directly to schedule the first message.
  2. It defines our new behavior. This new behavior processes the messages scheduled by the scheduleMessage function. When it receives such a message, it sends the content to the provided msgConsumer (the Receiver) and calls scheduleMessage again, so that it keeps sending a message every 500 ms.
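
The behavior switch described in these two steps can be sketched without Akka as a small state machine. Everything in this sketch (Behavior, Register, Add, ProducerModel) is a hypothetical stand-in chosen for illustration, not the akka.typed API, and the scheduling part is left out so that the example stays deterministic: each message is handled by the current behavior, which returns the behavior to use for the next message.

```scala
// Hypothetical, Akka-free model: a behavior handles one message and
// returns the behavior that should handle the next one.
trait Behavior[T] {
  def receive(msg: T): Behavior[T]
}

// Illustrative protocol, loosely mirroring the producer's two messages.
sealed trait ProducerMsg
final case class Register(sink: String => Unit) extends ProducerMsg
final case class Add(text: String) extends ProducerMsg

object ProducerModel {
  // Initial state: only Register is acted upon, everything else is ignored.
  val waiting: Behavior[ProducerMsg] = new Behavior[ProducerMsg] {
    def receive(msg: ProducerMsg): Behavior[ProducerMsg] = msg match {
      case Register(sink) => producing(sink) // switch behavior
      case _              => this            // keep the current behavior
    }
  }

  // Second state: closes over the sink and forwards every Add to it.
  def producing(sink: String => Unit): Behavior[ProducerMsg] =
    new Behavior[ProducerMsg] {
      def receive(msg: ProducerMsg): Behavior[ProducerMsg] = msg match {
        case Add(text) =>
          sink(text)
          this
        case _ => this
      }
    }

  // Feed a sequence of messages through the behaviors, one at a time.
  def run(msgs: Seq[ProducerMsg]): Behavior[ProducerMsg] =
    msgs.foldLeft(waiting)((behavior, msg) => behavior.receive(msg))
}

object ProducerModelDemo {
  def main(args: Array[String]): Unit = {
    val sink: String => Unit = text => println(s"forwarded: $text")
    // The first Add arrives before Register and is therefore ignored.
    ProducerModel.run(Seq(Add("dropped"), Register(sink), Add("one"), Add("two")))
  }
}
```

Running the demo forwards only "one" and "two", since the first Add is handled by the waiting behavior.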

So when we send the initial registerReceiverMsgIn message, the result is an actor that sends a new message to the receiver every 500 ms. Now let’s look at the other side: the consumer.

For the consumer we’ve also wrapped everything in an object, which looks like this:

  object Consumer {
    val consumer = Total[HelloMsg] {
      // in the case of a registerReceiver message, we change the implementation
      // since we're ready to receive other message.
      case registerReceiverCmdIn(commandAddress) => {
        println("Consumer: Switching behavior")

        // return a static implementation which closes over actorRefs
        // all messages we receive we pass to the receiver, which will queue
        // them. We have a specific message that prints out the received messages
        ContextAware { ctx =>
          Static[HelloMsg] {

            // printmessages just prints out the list of messages we've received
            case PrintMessages(msgs) =>
              println(s"Consumer: Printing messages: $msgs")
              msgs.foreach { hw => println(s"  $hw") }

            // if we get the getAllMessages request, we get all the messages from
            // the receiver.
            case GetAllMessages() => {
              println("Consumer: requesting all messages")
              val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {
                case msgs: GetAllResult[HelloMsg] =>
                  println(s"Consumer: Received ${msgs.msgs.length} messages")
                  PrintMessages(msgs.msgs)
              }
              commandAddress ! GetAll(2 seconds)(wrap)
            }
          }
        }
      }

      // for all the other cases return the existing implementation, in essence
      // we're just ignoring other messages till we change state
      case _ => Same
    }    
  }

In this object we define a single behavior, which also switches its implementation after receiving the first message. The first message in this case is called registerReceiverCmdIn. With this message we get access to the actorRef (of the Receiver) that we need to send the GetAll and GetOne messages to. After we’ve switched behavior, we process our own custom GetAllMessages message, which triggers a GetAll message being sent to the Receiver. Since our own behavior isn’t typed for the kind of responses received from the Receiver, we use an adapter (ctx.spawnAdapter). This adapter receives the response from the Receiver, prints the number of messages, and converts the response into a PrintMessages message, which the consumer then prints.
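
Conceptually, such an adapter is little more than a reference that accepts one message type, converts each message with a function, and forwards the result to the wrapped actor. The following plain Scala sketch shows that idea. Ref, Adapter.adapt and the two message classes are hypothetical names made up for this illustration; the real ctx.spawnAdapter creates a child actor to do the forwarding.

```scala
// Hypothetical, Akka-free stand-in for a typed actor reference.
trait Ref[T] {
  def tell(msg: T): Unit
}

object Adapter {
  // Wrap `target` so that it accepts U: every U is converted with `f`
  // and the result is forwarded to `target`.
  def adapt[U, T](target: Ref[T])(f: U => T): Ref[U] = new Ref[U] {
    def tell(msg: U): Unit = target.tell(f(msg))
  }
}

// Illustrative message types, loosely mirroring the names used above.
final case class GetAllResultModel(msgs: Vector[String])
sealed trait ConsumerMsg
final case class PrintMessagesModel(msgs: Vector[String]) extends ConsumerMsg

object AdapterDemo {
  def main(args: Array[String]): Unit = {
    // The "consumer" only understands ConsumerMsg.
    val consumer: Ref[ConsumerMsg] = new Ref[ConsumerMsg] {
      def tell(msg: ConsumerMsg): Unit = println(s"consumer got $msg")
    }

    // The adapter is what we would hand out as the replyTo address.
    val replyTo: Ref[GetAllResultModel] =
      Adapter.adapt(consumer)((r: GetAllResultModel) => PrintMessagesModel(r.msgs))

    replyTo.tell(GetAllResultModel(Vector("a", "b")))
  }
}
```

The consumer never sees a GetAllResultModel; it only receives the PrintMessagesModel that the conversion function produced, which is how the consumer above can stay typed to HelloMsg.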

The final part is an actor which sets this scenario in motion:

  // Simple root actor, which we'll use to start the other actors
  val scenario1 = {
    Full[Unit] {
      case Sig(ctx, PreStart) => {

        import Producer._
        import Consumer._

        println("Scenario1: Started, now lets start up a number of child actors to do our stuff")

        // first start the two actors, one implements the receiver pattern, and
        // the other is the one we control directly.
        val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")
        val consumerActor = ctx.spawn(Props(consumer), "adder")
        val producerActor = ctx.spawn(Props(producer), "producer")

        // our producerActor first needs the actorRef it can use to add messages to the receiver
        // for this we use a wrapper, this wrapper creates a child, which we use to get the
        // address, to which we can send messages.
        val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {
          case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)
        }

        // now send the message to get the external address; the adapter forwards
        // the response to the producer as a registerReceiverMsgIn message
        receiverActor ! ExternalAddress(wrapper)

        // our printing actor needs to know the address of the receiver, so send it over
        consumerActor ! registerReceiverCmdIn(receiverActor)

        // by calling getAllMessages we get the messages within a time period.
        println("Scenario1: Get all the messages")
        consumerActor ! GetAllMessages()
        Thread.sleep(3000)
        consumerActor ! GetAllMessages()
        Thread.sleep(5000)
        consumerActor ! GetAllMessages()

        Same
      }
    }
  }

Nothing too special here. We create the various actors in this scenario and use ctx.spawnAdapter to get the external address of the receiver, which we pass to the producerActor. Next we pass the address of the receiver actor to the consumer. Finally we send GetAllMessages to the consumer, which gets the messages from the receiver and prints them out.

So summarising the steps that will be executed in this example:

  1. We create a root actor that will run this scenario.
  2. From this root actor we create the three actors: receiver, consumer and producer.
  3. Next we get the externalAddress from the receiver (the address to which we send messages of type T) and, using an adapter, pass this to the producer.
  4. The producer, on receiving this message, switches behavior and starts sending messages to the passed in address.
  5. The root actor, in the meantime, passes the address of the Receiver to the consumer.
  6. The consumer, when it receives this message, changes behavior and now waits for messages of the type GetAllMessages.
  7. The root actor will now send a GetAllMessages to the consumer.
  8. When the consumer receives this message it will use an adapter to send a GetAll message to the receiver. When the adapter receives a response it prints out the number of messages received, and hands further processing to the consumer by sending it a PrintMessages message containing the messages received from the receiver.

And the result of this scenario looks like this:

Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))
  Hello(hello @ 1446277162929)
  Hello(hello @ 1446277163454)
  Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))
  Hello(hello @ 1446277164488)
  Hello(hello @ 1446277165008)
  Hello(hello @ 1446277165529)
  Hello(hello @ 1446277166049)
  Hello(hello @ 1446277166569)
  Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))
  Hello(hello @ 1446277167607)
  Hello(hello @ 1446277168129)
  Hello(hello @ 1446277168650)
  Hello(hello @ 1446277169169)
  Hello(hello @ 1446277169690)
  Hello(hello @ 1446277170210)
  Hello(hello @ 1446277170729)
  Hello(hello @ 1446277171249)
  Hello(hello @ 1446277171769)
  Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

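Cool, right? As you can see from the message sequence, our producer sends messages to the receiver, which queues them up. Next we have a consumer which requests all the messages that have been queued so far and prints them out. Each GetAll(2 seconds) reply contains only the messages queued since the previous reply, which is why the three batches hold 3, 6 and 10 messages: the requests go out at roughly 0, 3 and 8 seconds, and each reply arrives about two seconds later.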

That’s it for this article on Akka Typed. In the next one we’ll look at the Receptionist pattern, which is also present in Akka Typed.

Reference: Akka Typed Actors: Exploring the receiver pattern from our JCG partner Jos Dirksen at the Smart Java blog.