About Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

WatchService combined with Akka actors

WatchService is a handy class that can notify you about any file system changes (create/update/delete of file) in a given set of directories. It is described nicely in the official documentation so I won’t write another introduction tutorial. Instead we will try to combine it with Akka to provide fully asynchronous, non-blocking file system changes notification mechanism. And we will scale it both to multiple directories and multiple… servers! Just for starters here is a simple, self-descriptive example:
 
 
 
 
 

val watchService = FileSystems.getDefault.newWatchService()
Paths.get("/foo/bar").register(watchService, ENTRY_CREATE, ENTRY_DELETE)
 
while(true) {
    val key = watchService.take()
    key.pollEvents() foreach { event =>
        event.kind() match {
            case ENTRY_CREATE =>    //...
            case ENTRY_DELETE =>    //...
            case x =>
                logger.warn(s"Unknown event $x")
        }
    }
    key.reset()
}

I know java.nio stands for “New I/O” and not for “Non-blocking I/O” but one might expect such a class to work asynchronously. Instead we have to sacrifice one thread, use awkward while(true) loop and block on watchService.take(). Maybe that’s how the underlying operating systems works (luckily WatchService uses native OS API when available)? Doesn’t matter, we have to live with that. Fortunately one WatchService can monitor arbitrary number of paths, thus we need only one thread per whole application, not per directory. So, let’s wrap it up in a Runnable:

class WatchServiceTask2(notifyActor: ActorRef) extends Runnable with Logging {
    private val watchService = FileSystems.getDefault.newWatchService()
 
    def run() {
        try {
            while (!Thread.currentThread().isInterrupted) {
                val key = watchService.take()
                //coming soon...
                key.reset()
            }
        } catch {
            case e: InterruptedException =>
                logger.info("Interrupting, bye!")
        } finally {
            watchService.close()
        }
    }
}

This is the skeletal implementation of any Runnable that waits/blocks I want you to follow. Check Thread.isInterrupted() and escape main loop when InterruptedException occurs. This way you can later safely shut down your thread by calling Thread.interrupt() without any delay. Two things to notice: we require notifyActor reference in a constructor (will be needed later, I hope you know why) and we don’t monitor any directories, yet. Luckily we can add monitored directories at any time (but we can never remove them afterwards, API limitation?!) There is one issue, however: WatchService only monitors given directory, but not subdirectories (it is not recursive). Fortunately another new kid on the JDK block, Files.walkFileTree(), releases us from tedious recursive algorithm:

def watchRecursively(root: Path) {
    watch(root)
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = {
            watch(dir)
            FileVisitResult.CONTINUE
        }
    })
}
 
private def watch(path: Path) =
    path.register(watchService, ENTRY_CREATE, ENTRY_DELETE)

See how nicely we can traverse the whole directory tree using flat FileVisitor? Now the last piece of the puzzle is the body of loop above (you will find full source code on GitHub):

key.pollEvents() foreach {
    event =>
        val relativePath = event.context().asInstanceOf[Path]
        val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
        event.kind() match {
            case ENTRY_CREATE =>
                if (path.toFile.isDirectory) {
                    watchRecursively(path)
                }
                notifyActor ! Created(path.toFile)
            case ENTRY_DELETE =>
                notifyActor ! Deleted(path.toFile)
            case x =>
                logger.warn(s"Unknown event $x")
        }
}

When a new file system entry is created and it happens to be a directory, we start monitoring that directory as well. This way if, for example, we start monitoring /tmp, every single subdirectory is monitored as well, both existing during startup and newly created one. Message classes are pretty straightforward. You might argue that CreatedFile and CreatedDirectory separate classes might have been a better idea – depends on your use case, this was simpler from this article’s perspective:

sealed trait FileSystemChange
case class Created(fileOrDir: File) extends FileSystemChange
case class Deleted(fileOrDir: File) extends FileSystemChange
 
case class MonitorDir(path: Path)

The last MonitorDir message will be used in just a second. Let’s wrap our Runnable task and encapsulate it inside an actor. I know how bad it looks to start a thread inside Akka actor, but Java API forces us to do so and it will be our secret that never escapes that particular actor:

class FileSystemActor extends Actor {
    val log = Logging(context.system, this)
    val watchServiceTask = new WatchServiceTask(self)
    val watchThread = new Thread(watchServiceTask, "WatchService")
 
    override def preStart() {
        watchThread.setDaemon(true)
        watchThread.start()
    }
 
    override def postStop() {
        watchThread.interrupt()
    }
 
    def receive = LoggingReceive {
        case MonitorDir(path) =>
            watchServiceTask watchRecursively path
        case Created(file) => 
            //e.g. forward or broadcast to other actors
        case Deleted(fileOrDir) =>
    }
}

Few things to keep in mind: actor takes full responsibility of the "WatchService" thread lifecycle. Also see how it handles the MonitorDir message. However we don’t monitor any directory from the beginning. This is done outside:

val system = ActorSystem("WatchFsSystem")
val fsActor = system.actorOf(Props[FileSystemActor], "fileSystem")
fsActor ! MonitorDir(Paths get "/home/john")
//...
system.shutdown()

Obviously you can send any number of MonitorDir messages with different directories and all of them are monitored simultaneously – but you don’t have to monitor subdirectories, this is done for you. Creating and deleting new file to smoke test our solution and apparently it works:

received handled message MonitorDir(/home/john/tmp)
received handled message Created(/home/john/tmp/test.txt)
received handled message Deleted(/home/john/tmp/test.txt)

There is one interested piece of functionality we get for free. If we run this application in a cluster and configure one actor to only be created on one of the instances (see: Remote actors – discovering Akka for thorough example how to configure remote actors), we can easily aggregate file system changes from multiple servers! Simply lookup remote (“singleton” across cluster) aggregate actor in FileSystemActor and forward events to it. Aforementioned article explains very similar architecutre so I won’t go into too much detail. Enough to say, with this topology one can easily monitor multiple servers and collect change events on all of them.

So… we have a cool solution, let’s look for a problem. In a single-node setup FileSystemActor provides nice abstraction over blocking WatchService. Other actors interested in file system changes can register in FileSystemActor and respond quickly to changes. In multi-node, cluster setup it works pretty much the same, but now we can easily control several nodes. One idea would be to replicate files over nodes.
 

Reference: WatchService combined with Akka actors from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

JPA Mini Book

Learn how to leverage the power of JPA in order to create robust and flexible Java applications. With this Mini Book, you will get introduced to JPA and smoothly transition to more advanced concepts.

JVM Troubleshooting Guide

The Java virtual machine is really the foundation of any Java EE platform. Learn how to master it with this advanced guide!

Given email address is already subscribed, thank you!
Oops. Something went wrong. Please try again later.
Please provide a valid email address.
Thank you, your sign-up request was successful! Please check your e-mail inbox.
Please complete the CAPTCHA.
Please fill in the required fields.

3 Responses to "WatchService combined with Akka actors"

  1. Lloyd says:

    Hello there :) Thank you for the very informative post.

    Is there any reason why the watchThread needs to be a daemon thread? From what I understand, the main difference between a normal thread and a daemon thread is that a daemon thread is abandoned when the JVM exits – why would we want that in this case?

  2. Not really “abandoned”. The idea is that JVM terminates when all non-daemon threads terminate. In other words if only daemon threads left, JVM exits. I interrupt watchThread manually to make sure it’s cleaned up but just in case if something goes horribly wrong I don’t want this single thread to stop JVM termination.

Leave a Reply


+ four = 11



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
Do you want to know how to develop your skillset and become a ...
Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

Get ready to Rock!
You can download the complementary eBooks using the links below:
Close