Even simpler scalability with Akka through RegistryActor

Let’s imagine that your system is deployed on a single node, where you are running actors that are literally draining its resources. You want to add more nodes running actors of the same type to balance the load across the cluster.

Ideally, when you add a new node to the existing infrastructure, you should neither have to manually create proxies to the remote actors on the original node, nor change the configuration of any node to let it know that a new member was added. In other words, whenever a node is added to the system, all other nodes should automatically learn which actors the new node runs, and use them as if they were ordinary local actors.

Akka’s Actor Registry

When you are tied to standard Scala Actors (some production environments still have to run on Java 5, which, unfortunately, Akka does not support) and are betting on actor-based concurrency, there are two things you find yourself [re-]implementing: a Supervision Tree (borrowed from Erlang; the counterpart of Supervisors in Akka) and a Registry of Actors.

Under the hood, Akka’s Actor Registry has a fairly concise implementation (essentially a smart singleton wrapper around concurrent HashMaps that keeps references to all the actors running on the node), yet it is a powerful abstraction that is hard to do without when you use actors in the real world. For example, the registry significantly simplifies building load balancers: you no longer have to specify explicitly which workers share the load, because the balancer itself can look up actors in the registry by type or ID, either at start-up or at any point during its lifetime.
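To make the idea concrete, here is a minimal sketch of such a registry in plain Scala, without Akka. It is an illustration under simplifying assumptions, not Akka’s ActorRegistry API: `Ref`, `MiniRegistry`, `byId`, `byType` and the example classes are hypothetical names, and an actor reference is reduced to a uuid, an id and a class. Lookup by type keeps the actors whose class is the requested type or one of its subclasses, which is what a balancer looking for workers of a given type needs:

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

// Hypothetical, simplified actor reference: a uuid, an id and the actor's class
final case class Ref(uuid: UUID, id: String, actorClass: Class[_])

// Hypothetical example actor classes
class Worker
class FastWorker extends Worker
class Balancer

// A minimal registry sketch (not Akka's ActorRegistry): a concurrent map keyed by uuid
object MiniRegistry {
  private val byUuid = new ConcurrentHashMap[UUID, Ref]()

  def register(ref: Ref): Unit = byUuid.put(ref.uuid, ref)
  def unregister(ref: Ref): Unit = byUuid.remove(ref.uuid)

  // Snapshot of all registered references
  def actors: List[Ref] = byUuid.values.asScala.toList

  // Lookup by id
  def byId(id: String): List[Ref] = actors.filter(_.id == id)

  // Lookup by type: actors whose class is T or a subclass of T
  def byType[T](implicit tag: ClassTag[T]): List[Ref] =
    actors.filter(ref => tag.runtimeClass.isAssignableFrom(ref.actorClass))
}
```

A balancer built on top of this would call `MiniRegistry.byType[Worker]` whenever it needs the current set of workers, instead of being configured with them up front.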

The only thing the Akka actor registry currently lacks is an interface for accessing it remotely. Adding such an interface makes solving the problem stated above a no-brainer.

Registry Actor

Living in a world of actors, the first idea you have when you need a remote interface to something is to create an actor that is accessible remotely (a RemoteActor). As a first approximation, there should be an actor that handles messages carrying links to actors on remote nodes, creates proxies for them, and registers those proxies in the local actor registry:

class RegistryActor extends Actor {

  ...

  def defaultMessageHandler: PartialFunction[Any, Unit] = {
    case RegisterActor(actor) =>
      log.debug("Registering remote actor [%s]" format(actor))
      // Create a proxy only for unknown actors that do not live on this node
      if (!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor))
        ActorRegistry.register( // Hack for 0.10, 1.0-M1
          RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port)
        ) // RemoteActorRefs will register themselves in 1.0-M1+

    case UnregisterActor(actor) =>
      log.debug("Unregistering remote actor [%s]" format(actor))
      // Drop every local reference to the actor with this uuid
      ActorRegistry.foreach { act =>
        if (act.uuid == actor.uuid) ActorRegistry.unregister(act)
      }
      // If the actor was a linked registry, remove the link as well
      Option(linkedRegistries.get(RegistryLink(actor.hostname, actor.port))) match {
        case Some(_) => removeLinkToRegistry(RegistryLink(actor.hostname, actor.port))
        case None    => log.debug("[%s] is not a registry link" format(actor.uuid))
      }

    ...
  }

  ...
}
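The guards in the handler above can be exercised without Akka. The following sketch is a pure state machine: `RemoteActorInfo`, `RegistryState` and its `proxies` map are hypothetical stand-ins (the map plays the role of the proxies registered in the local ActorRegistry), while the message names mirror those used in the article, with an assumed shape:

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical description of an actor that lives on some node
final case class RemoteActorInfo(uuid: UUID, className: String, hostname: String, port: Int)
final case class RegistryLink(hostname: String, port: Int)

// Message names mirror the article; their shape here is an assumption
sealed trait RegistryMessage
final case class RegisterActor(actor: RemoteActorInfo) extends RegistryMessage
final case class UnregisterActor(actor: RemoteActorInfo) extends RegistryMessage

// Pure state machine with the same guards as defaultMessageHandler
final class RegistryState(val local: RegistryLink) {
  // Stand-in for the proxies registered in the local ActorRegistry
  val proxies = mutable.Map.empty[UUID, RemoteActorInfo]

  private def isLinkToLocal(a: RemoteActorInfo): Boolean =
    a.hostname == local.hostname && a.port == local.port

  def handle(msg: RegistryMessage): Unit = msg match {
    case RegisterActor(a) =>
      // Register once, and never for an actor that actually lives on this node
      if (!proxies.contains(a.uuid) && !isLinkToLocal(a)) proxies(a.uuid) = a
    case UnregisterActor(a) =>
      proxies.remove(a.uuid)
      ()
  }
}
```

Keeping the decision logic free of Akka like this makes the duplicate and self-link checks easy to unit test.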

As groundwork for future extensions, there should also be a map of references to the registry actors running on other nodes (and a way to add and exchange links to registries at runtime). When a registry actor receives and resolves a new reference to another registry actor, it sends back a link to itself and to all the other registries it knows about, so that both registry actors end up with the same, consistent set of links:

  /**
   * RegistryActors located on the other hosts
   */
  protected[easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]()

  def defaultMessageHandler: PartialFunction[Any, Unit] = {

    ...

    case AddRegistryLink(link) =>
      if (!linkedRegistries.containsKey(link))
        addRegistryLink(link)
      else
        log.debug("Link to registry [%s] is already present" format(link))

    case RemoveRegistryLink(link) =>
      log.debug("Unlinking from registry [%s]" format(link))
      linkedRegistries.remove(link)
  }
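The link exchange can be checked with an in-memory simulation. This is a sketch under simplifying assumptions: messages go through a queue instead of the network, `Cluster`, `start`, `send` and `run` are hypothetical names, and the `AddRegistryLink` used here carries its recipient explicitly. Each registry that learns about a new link replies to it with a link to itself and to every registry it already knows:

```scala
import scala.collection.mutable

final case class RegistryLink(hostname: String, port: Int)

// Hypothetical envelope: an AddRegistryLink message together with its recipient
final case class AddRegistryLink(to: RegistryLink, link: RegistryLink)

// In-memory simulation of the link exchange: a queue replaces the network
final class Cluster {
  // For every registry: the set of registries it is linked to
  val links = mutable.Map.empty[RegistryLink, mutable.Set[RegistryLink]]
  private val inbox = mutable.Queue.empty[AddRegistryLink]

  def start(node: RegistryLink): Unit = { links(node) = mutable.Set.empty[RegistryLink] }

  def send(to: RegistryLink, link: RegistryLink): Unit =
    inbox.enqueue(AddRegistryLink(to, link))

  // Deliver messages until no registry has anything left to say
  def run(): Unit =
    while (inbox.nonEmpty) {
      val msg = inbox.dequeue()
      val known = links(msg.to)
      if (msg.link != msg.to && !known.contains(msg.link)) {
        // Reply to the new registry with a link to ourselves and to everyone we know
        (msg.to :: known.toList).foreach(l => send(msg.link, l))
        known += msg.link
      }
    }
}
```

In this model, joining nodes B and C through the same neighbor A is enough for all three registries to end up linked to each other.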

Registering Actors on startup

The second step towards solving the problem is to automatically register local actors at a remote registry when a link to that registry is added:

/**
 * Publishes all local actors as remote references to the linked registry
 * when the registry link is added
 */
trait StartupActorRefsDistribution extends RegistryActor{

  /**
   * Adds a link to the remote registry and registers all local actors there
   */
  protected override def addRegistryLink(link: RegistryLink) = {
    super.addRegistryLink(link)
    registerActorsAt(linkedRegistries.get(link))
  }

  /**
   * Registers all local actors at the remote node
   */
  private def registerActorsAt(remoteRegistry: ActorRef) = {
    ActorRegistry.filter(actor =>
      actor.id == REGISTRY_ACTOR && isActorLocal(actor))
    .foreach{actor => remoteRegistry ! RegisterActor(actor)}
    remoteRegistry
  }

}

Every new node should initially know about at least one node already running in the cluster (a neighboring node):

neighbour { # One of the hosts in the group that has a started RegistryActor
   hostname = "localhost"
   port = 9999
}

Thus, when a new actor registry introduces itself to its “neighbor”, it starts a chain reaction: all the other actor registries publish references to their local actors to the new registry and vice versa, so that in the end every registry is aware of all the actors running in the cluster and can access each of them either through the local interface or through a proxy (RemoteActorRef).
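The chain reaction can be sketched by extending a link-exchange simulation with startup distribution: whenever a registry adds a new link, it also publishes all of its local actors to that link. Again, `Node`, `Registry`, `Simulation` and the string actor ids are hypothetical simplifications of the real RegistryActor and ActorRefs, and a message queue stands in for the network:

```scala
import scala.collection.mutable

final case class Node(name: String)

// Hypothetical, simplified messages (the article's versions carry real actor refs)
sealed trait Msg
final case class AddRegistryLink(link: Node) extends Msg
final case class RegisterActor(owner: Node, actorId: String) extends Msg

// One registry: its local actors, its links, and the remote actors it knows about
final class Registry(val me: Node, val localActors: Set[String]) {
  val links = mutable.Set.empty[Node]
  val remoteActors = mutable.Set.empty[(Node, String)]
}

// Delivers queued messages to registries until the cluster is quiet
final class Simulation(registries: Seq[Registry]) {
  private val byNode = registries.map(r => r.me -> r).toMap
  private val queue = mutable.Queue.empty[(Node, Msg)]

  def send(to: Node, msg: Msg): Unit = queue.enqueue((to, msg))

  def run(): Unit =
    while (queue.nonEmpty) {
      val (to, msg) = queue.dequeue()
      val r = byNode(to)
      msg match {
        case AddRegistryLink(link) if link != r.me && !r.links.contains(link) =>
          // Link exchange: introduce ourselves and our links to the new registry
          (r.me :: r.links.toList).foreach(l => send(link, AddRegistryLink(l)))
          r.links += link
          // Startup distribution: publish all local actors to the new registry
          r.localActors.foreach(id => send(link, RegisterActor(r.me, id)))
        case AddRegistryLink(_) =>
          () // already linked, or a link to ourselves
        case RegisterActor(owner, id) =>
          r.remoteActors += ((owner, id))
      }
    }
}
```

Only the new nodes are told about a neighbor here, yet every registry ends up knowing the actors of all the others.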

Registering Actors started during the lifetime

Akka’s ActorRegistry has a simple notification mechanism that lets you handle the events raised when an actor is registered in or unregistered from the system. By default, all actors register themselves in the registry on start and unregister on shutdown (with the exception of RemoteActorRefs, in Akka 1.0-M1). This mechanism can be used to propagate links to new actors across the system:

/**
 * Publishes every registered local actor as a remote ref on all
 * linked remote registries
 */
trait InlifeActorRefsDistribution extends RegistryActor {

  override def specificMessageHandler = {
    case ActorRegistered(actor) =>
      log.debug("Actor [%s] is registered" format(actor))
      registerOnLinks(actor)

    case ActorUnregistered(actor) =>
      log.debug("Actor [%s] is unregistered" format(actor))
      if (isActorLocal(actor))
        actor.id match {
          // The local registry is going down: withdraw all local actors from the links
          case REGISTRY_ACTOR => ActorRegistry.foreach(act =>
            if (act.isInstanceOf[LocalActorRef])
              unregisterOnLinks(act))
          case _ => unregisterOnLinks(actor)
        }
  }

  /**
   * Makes the actor remote, and registers it at remote nodes
   */
  private def registerOnLinks(actor: ActorRef) =
    if (isActorLocal(actor)) {
      ...
      val iterator = linkedRegistries.values.iterator
      while (iterator.hasNext) iterator.next ! RegisterActor(actor)
    }

  ...

}
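The notification mechanism itself is easy to model. The sketch below assumes a hypothetical `ObservableRegistry` (not Akka’s ActorRegistry) that calls its listeners whenever its content changes, and a hypothetical `InlifeDistribution` listener that mirrors those changes onto linked registries, represented here as plain sets of actor ids:

```scala
import scala.collection.mutable

// Hypothetical life-cycle events, named after the ones handled by InlifeActorRefsDistribution
sealed trait RegistryEvent
final case class ActorRegistered(id: String) extends RegistryEvent
final case class ActorUnregistered(id: String) extends RegistryEvent

// Minimal observable registry (a sketch, not Akka's ActorRegistry)
final class ObservableRegistry {
  private val actors = mutable.LinkedHashSet.empty[String]
  private val listeners = mutable.ListBuffer.empty[RegistryEvent => Unit]

  def addListener(listener: RegistryEvent => Unit): Unit = listeners += listener

  // Listeners are notified only when the registry content actually changes
  def register(id: String): Unit =
    if (actors.add(id)) listeners.foreach(l => l(ActorRegistered(id)))

  def unregister(id: String): Unit =
    if (actors.remove(id)) listeners.foreach(l => l(ActorUnregistered(id)))

  def all: Set[String] = actors.toSet
}

// Listener that mirrors local life-cycle events onto every linked registry;
// linked registries are modelled as plain mutable sets of actor ids
final class InlifeDistribution(linked: Seq[mutable.Set[String]]) extends (RegistryEvent => Unit) {
  def apply(event: RegistryEvent): Unit = event match {
    case ActorRegistered(id)   => linked.foreach(set => set += id)
    case ActorUnregistered(id) => linked.foreach(set => set -= id)
  }
}
```

Because the listener reacts to events rather than polling, actors started at any point after the links are established still reach every linked registry.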

Using Actor Registry

Now that we automatically get references to all the actors in the cluster, we can create a balancer that distributes messages across actors of the same type:

class SimpleTypedBalancer[T](implicit manifest: Manifest[T]) extends Actor {

  def receive = {
    case message: AnyRef =>
      forwardMessage(message,
                     self.getSenderFuture orElse self.getSender,
                     idleWorkerId)
  }

  /**
   * Asks every registered actor of type T (or a subtype) whether it is ready,
   * and returns the uuid from the first reply, if that reply is Ready
   */
  def idleWorkerId = Futures.awaitOne {
    ActorRegistry.filter { actor =>
      manifest.erasure.isAssignableFrom(Class.forName(actor.actorClassName))
    }.map(_ !!! IsReady()).toList
  }.result.flatMap {
    case Ready(actorUuid) => Option(actorUuid)
    case _                => None
  }

  def forwardMessage(message: AnyRef, originalSender: Option[AnyRef], workerId: Option[Uuid]) = {
    for {
      id     <- workerId
      worker <- ActorRegistry.actors.find(actor => actor.id == id.toString || actor.uuid == id)
    } {
      // Keep the original sender if there is one, otherwise fire-and-forget
      if (originalSender.isDefined) worker.forward(message)
      else worker.sendOneWay(message)
    }
  }

}
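The readiness probe in `idleWorkerId` takes whichever reply arrives first and gives up if that reply is not `Ready`. A possible variation, sketched here with the standard library’s `scala.concurrent` futures rather than Akka 0.10’s `Futures`, is `Future.find`, which completes with the first result that satisfies a predicate. `Readiness`, `Ready`, `Busy` and `IdleWorker.pick` are hypothetical names used only for this illustration:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical replies to an IsReady probe
sealed trait Readiness
final case class Ready(workerId: String) extends Readiness
case object Busy extends Readiness

object IdleWorker {
  // Completes with the first reply that is Ready, skipping Busy replies;
  // returns None if no worker is ready (Await.result throws on timeout)
  def pick(probes: Seq[Future[Readiness]], timeout: FiniteDuration)
          (implicit ec: ExecutionContext): Option[String] = {
    val firstReady = Future.find(probes) {
      case Ready(_) => true
      case Busy     => false
    }
    Await.result(firstReady, timeout).collect { case Ready(id) => id }
  }
}
```

The trade-off is latency: when the fastest worker is busy, this version waits for a ready one instead of dropping the message.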

Problem Solved

Assume there’s a node with 3 actors of type `SimpleActor` running:

RemoteNode.start

log.info("Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT))
val registryActor = actorOf(new RegistryActor
                            with StartupActorRefsDistribution
                            with InlifeActorRefsDistribution).start

RegistryActorUtil.initialize

(1 to 3).foreach(_ => actorOf[SimpleActor].start)

This node knows nothing about the future cluster infrastructure; at the moment, it only exposes a remote API to its registry, the RegistryActor. Say we want the actors running on node #1 (3 instances of `SimpleActor`) to share the load with the `SimpleActor` actors running on node #2. Node #2 has the same definition as node #1, with the only difference that node #1 is explicitly configured as its neighboring host.

Let’s see whether the messages sent to the balancer are distributed between local and remote actors:

doBeforeSpec {
  (1 to 3).foreach(_ => actorOf[SimpleActor].start)
  actorOf(new SimpleTypedBalancer[SimpleActor]).start
}

"Messages sent to the balancer should be distributed across local and remote workers" in {

  val balancer = ActorRegistry.filter(actor =>
    Class.forName(actor.actorClassName)
      .isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]])
  ).head

  log.info("========SENDING MESSAGES TO BALANCER=========")
  val start = System.currentTimeMillis
  val futures = (1 to 30).map(i => balancer !!! "" + i).toList
  log.info("All messages are disaptched...")
  Futures.awaitAll(futures)
  val processedByWorkers = futures.flatMap(future => future.result).toSet.size
  log.info("Process time by %s workers: %s" format(processedByWorkers, System.currentTimeMillis - start))
  processedByWorkers must beGreaterThan(3)
}

The test passes, which means that more than the 3 locally running actors were involved, and therefore the remote actors registered locally were used as well:

[INFO] [2010-11-23 23:23:18,687] [Thread-18] s.s.a.a.Actor$: Adding RegistryActor as listener to local actor registry
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Making RegistryActor remote...
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Adding link to a neighbouring host...
[INFO] [2010-11-23 23:23:18,859] [Thread-18] s.s.a.a.Actor$: ========SENDING MESSAGES TO BALANCER=========
[INFO] [2010-11-23 23:23:18,890] [Thread-18] s.s.a.a.Actor$: All messages are disaptched...
[INFO] [2010-11-23 23:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient: Starting remote client connection to [localhost:9999]
[INFO] [2010-11-23 23:23:24,906] [Thread-18] s.s.a.a.Actor$: Process time by 6 workers: 6031
[INFO] [2010-11-23 23:23:24,968] [Thread-18] s.s.a.a.Actor$: ====SHUTTING DOWN ACTOR REGISTRY====
[info]   + Messages sent to the balancer should be distributed across local and remote workers
[info] == com.vasilrem.akka.easyscale.behavior.RemoteTypedBalancerSpec ==

Reinventing the wheel

As was once mentioned on the Akka mailing lists, ActorRegistry will one day have a remote interface out of the box. Until then, you will have to either come up with your own solution or use the experimental JCluster support, which generally targets the same problem but takes a different approach.

The code of the RegistryActor is available on GitHub. It will change over time as I use it in production.

Reference: Even simpler scalability with Akka through RegistryActor from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog
