Scala

Spark Streaming Testing with Scala Example

Spark Streaming Testing

How do you create and automate tests of Spark Streaming applications?  In this post, we’ll show an example of one way in Scala.  This post is heavy on code examples and has the added bonus of using a code coverage plugin.

Are the tests in this tutorial examples unit tests?  Or, are they integration tests?  Functional tests?   I don’t know, you tell me in the comments below if you have an opinion.  If I had to choose, I’d say unit tests because we are stubbing the streaming provider.

Pre-requisites

As I’m sure you can guess, you will need some Spark Streaming Scala code to test.  We’re going to use our Spark Streaming example from Slack code in this post.  So, check that out first if you need some streaming scala code to use.  It’s not required to use that code though.  You should be able to get the concepts presented and apply to your own code if desired.  All the testing code and Spark streaming example code is available to pull from Github anyhow.

We’re going to use sbt to build and run tests and create coverage reports.  So, if you are not using sbt please translate to your build tool accordingly.

Overview

In order to write automated tests for Spark Streaming, we’re going to use a third party library called scalatest.  Also, we’re going to add an sbt plugin called “sbt-coverage”.  Then, with these tools in hand, we can write some Scala test code and create test coverage reports.

Steps

  1. Pull Spark Streaming code example from github
  2. Describe Updates to build.sbt
  3. Create project/plugins.sbt
  4. Write Scala code
  5. Execute tests and coverage reports

Pull Spark Streaming Code Example from Github

If you don’t want to copy-and-paste code, you can pull it from github.  Just pull the spark-course repo from https://github.com/tmcgrath/spark-course and the project we are working from is in the spark-streaming-tests directory.

Updates to Previous build.sbt

build.sbt should be updated to include a new command alias as well as the scalatest 3rd party lib as seen below:

Spark Streaming tests build.sbt update

scalaVersion := "2.11.8"
  
  +addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
  
  resolvers += "jitpack" at "https://jitpack.io"
 @@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
  // comment above line and uncomment the following to run in sbt
  // "org.apache.spark" %% "spark-streaming" % "1.6.1",
    "org.scalaj" %% "scalaj-http" % "2.3.0",
 -  "org.jfarcand" % "wcs" % "1.5" 
 +  "org.jfarcand" % "wcs" % "1.5",
 +  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )

Notice how we add “test” to the end of the libraryDependencies sequence to indicate the library is only needed for tests.

Create project/plugins.sbt

Add a new line for the sbt-coverage plugin as seen here:

SBT code coverage plugin

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

Write Scala Tests

Actually, before we write the actual tests, we’re going to update our previous SlackStreamingApp’s main method to facilitate automated tests.  I know, I know, if we would have written SlackStreamingApp with TDD, then we wouldn’t have to do this, right?

Anyhow, it’s not a huge change.

Spark Streaming Scala tests

object SlackStreamingApp {
 - 
 +
    def main(args: Array[String]) {
      val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = ssc.receiverStream(new SlackReceiver(args(1)))
      stream.print()
 -    if (args.length > 2) {
 -      stream.saveAsTextFiles(args(2))
 -    }
 +
 +    processStream(args, stream)
 +
      ssc.start()
      ssc.awaitTermination()
    }
 - 
 +
 +  def processStream(args: Array[String], stream: DStream[String]): Unit = {
 +    args match {
 +      case Array(_, _, path, _*) => stream.saveAsTextFiles(args(2))
 +      case _ => return
 +    }
 +
 +
 +  }
 +

As you can hopefully see, we just needed to extract the code looking for a command-line arg into a new function called processStream.  Also, we need to add one more line to the imports at the top

Spark Scala DStream import

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

Next, we write the testing code.  To start, we need to create new directories to store the test code.  Create src/test/scala/com/supergloo directories.  Next, we add test code to this directory by creating the following Scala file: src/test/scala/com/supergloo/SlackStreamingTest.scala

Spark Streaming Scalatest

package com.supergloo
 
import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
 
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try
 
class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {
 
  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"
 
  private var ssc: StreamingContext = _
 
  private val batchDuration = Seconds(1)
 
  var clock: ClockWrapper = _
 
  before {
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }
 
  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }
 
  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
 
    dstream.print()
    processStream(Array("", "", filePath), dstream)
 
 
    ssc.start()
 
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(1000)
 
    eventually(timeout(2 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (2)
      wFile.collect().foreach(println)
    }
 
  }
 
  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
 
    dstream.print()
    processStream(Array("", "", filePath), dstream)
 
 
    ssc.start()
 
    clock.advance(1000)
 
    eventually(timeout(1 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (0)
      wFile.collect().foreach(println)
    }
 
  }
 
  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
 
    dstream.print()
    processStream(Array("", ""), dstream)
 
    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
 
    ssc.start()
 
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(2000)
 
    eventually(timeout(3 seconds)){
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}

Next, we need to create addition directories and add ClockWrapper.scala to src/test/scala/org/apache/spark/streaming/.  More on this class later.

Spark Streaming ClockWrapper

 
package org.apache.spark.streaming
 
import org.apache.spark.util.ManualClock
 
/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {
 
  def getTimeMillis(): Long = manualClock().getTimeMillis()
 
  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)
 
  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)
 
  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)
 
  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }
}

(By the way, ClockWrapper is taken from an approach I saw on Spark unit testing.  See “Additional Resouces” section below for link.)

Ok, we’re ready to execute now.

Execute Scala tests and coverage reports

In the spark-streaming-tests directory, we can now issue sbt sanity from command-line.  You should see all three tests pass:

Spark Streaming Scalatest results

 
[info] SlackStreamingTest:
[info] Slack Streaming App 
[info] - should store streams into a file
[info] Slack Streaming App 
[info] - should store empty streams if no data received
[info] Slack Streaming App 
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

To review coverage reports, simply open target/scala-2.11/scoverage-report/index.html in a browser.

Conclusion

Hopefully, this Spark Streaming unit test example helps start your Spark Streaming testing approach.  We covered a code example, how to run and viewing the test coverage results.  If you have any questions or comments, let me know.  Also, subscribe to the Supergloo YouTube channel for an upcoming screencast from this post.

Additional Resources

Featured image credit https://flic.kr/p/dgSbYM

Reference: Spark Streaming Testing with Scala Example from our JCG partner Todd McGrath at the Supergloo blog.

Todd McGrath

Todd is a consultant in data engineering and software development using Scala, Apache Spark, Scala, Groovy, Python, relational, columnar and noSQL databases. He is a 20-year software veteran and founder of supergloo, inc.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button