Scala

Akka Notes – DeathWatch – 7

When we talked about Actor lifecycle, we saw that Actors could be stopped by various means (using ActorSystem.stop or ActorContext.stop or sending a PoisonPill – there’s also the Kill and the gracefulStop).

Whatever reason an Actor dies, there are cases when a few other actors in the system would like to know about it. Let’s take a trivial example of an Actor who talks to a database – let’s call it a RepositoryActor. For obvious reasons, there would be few other actors in the system who would be sending message to this RepositoryActor. These “interested” Actors would like to keep an eye on or watch this Actor if it goes down. Now, that in Actor terms is called DeathWatch. And the methods to watch and unwatch over this is intuitively ActorContext.watch and ActorContext.unwatch. If watched, the watchers would receive a Terminated message from the stopped Actor which they could comfortable add in to their receive partial function.

Unlike Supervision (next write-up will insert link once complete), where there is a strict enforcement of parent-child hierarchy, any Actor could watch any other Actor in the ActorSystem.

DeathWatch

Let’s have a look at the code.

Code

QuoteRepositoryActor

  1. Our QueryRepositoryActor holds a bunch of quotes as a List and serves a random one upon receiving a QuoteRepositoryRequest.
  2. It keeps track of the number of messages received and if it receives more than 3 messages, it kills itself with a PoisonPill

Nothing fancy here.

package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._  
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount:Int=1

  def receive = {

    case QuoteRepositoryRequest => {

      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))

        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }

    }

  }

}

TeacherActorWatcher

Again, nothing fancy with TeacherActorWatcher except that it creates the QuoteRepositoryActor and watches over it using a context.watch.

package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)


  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

TestCases

This is the interesting bit. Frankly, I never thought that these could be tested. akka-testkit FTW. We will analyze three testcases here :

1. Assert receipt of Terminated message if watched

The QuoteRepositoryActor should send the testcase a Terminated message on receipt of the 4th message. The first three messages should go in fine.

"A QuoteRepositoryActor" must {
    ...
    ...
    ...

    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }

2. Assert non-receipt of Terminated message if not watched/unwatched

Actually, we are over-doing things just to showcase the context.unwatch. The testcase would work just fine if we remove the testProbe.watch and testProbe.unwatch lines.

"not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }

3. Assert receipt of Terminated message in the TeacherActorWatcher

We subscribe to the EventStream and check for a specific log message to assert termination.

"end back a termination message to the watcher on 4th message to the TeacherActor" in {

      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]

      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor

        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

The pattern property of the EventFilter, not surprisingly, expects a regex pattern. The pattern="""Child Actor .* Terminated""" is expected to match a log message which is of the format Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated

Github

As always, the code is available at github. Watch for the deathwatch package.

Reference: Akka Notes – DeathWatch – 7 from our JCG partner Arun Manivannan at the Rerun.me blog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button