Building a REST service in Scala with Akka HTTP, Akka Streams and reactive mongo

At the end of last year I wrote a couple of articles that showed how you can use Spray.io to create a Scala-based REST service and how to create a websocket server with Scala, Akka and reactivemongo. I wanted to explore the REST server part a bit more, but found out that at the end of 2013 Spray.io was acquired by Typesafe, and would be integrated with the Akka stack. So in this article we’ll look at how you can use the Akka HTTP functionality to create a simple web server, and in a follow-up we’ll look at how the routing from Spray.io was ported to Akka.

In this article we’ll take the following steps:

  • Get some dummy data into mongoDB for testing.
  • Create a server using Akka HTTP that uses a simple asynchronous handler to process requests.
  • Create a server which uses a custom flow graph to process incoming requests.
  • Test both of these servers with an HTTP client also created with Akka HTTP.

So let’s start with some preparation work and get some data into mongoDB for us to work with.

Loading data into mongoDB

For this example we use some stock related information which you can download from here (http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip). You can easily do this by executing the following steps:

First get the data:

wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip

Start mongodb in a different terminal

mongod --dbpath ./data/

And finally use mongoimport to import the data

unzip -p stocks.zip | mongoimport --db akka --collection stocks

And as a quick check run a query to see if everything works:

jos@Joss-MacBook-Pro.local:~$ mongo akka      
MongoDB shell version: 2.4.8
connecting to: akka
> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )
{
        "_id" : ObjectId("52853800bb1177ca391c17ff"),
        "Ticker" : "A",
        "Country" : "USA",
        "Company" : "Agilent Technologies Inc."
}
>

At this point we have our test data and can look at the code required to run a server.

Create a server which uses a simple asynchronous handler to process requests

To work with Akka HTTP and access the data in mongo we’re going to need some additional libraries. So before we do anything else, let’s first look at the sbt build file we’ve used for this article:

import com.typesafe.sbt.SbtAspectj._

name := "http-akka"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",
  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
  "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",
  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",
  "ch.qos.logback" % "logback-classic" % "1.1.2"
)

resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"

mainClass in (Compile, run) := Some("Boot")

When you look through the dependencies you’ll see the usual suspects:

  • akka-http-core-experimental contains all the http server and client stuff we’re going to use. This library depends on akka-stream so we’ll also get that library on our class path.
  • reactivemongo allows us to connect to mongo in a reactive way.
  • I’ve also included play2-reactivemongo and play-json which makes converting the BSON returned from mongo to JSON a lot easier.
  • Finally, for logging we add logback.

Now before we look at the code required to run the server, let’s quickly look at how we’ll query mongo. For this we’ve created a simple helper object creatively named Database:

import reactivemongo.api._
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object Database {

  val collection = connect()


  def connect(): BSONCollection = {

    val driver = new MongoDriver
    val connection = driver.connection(List("localhost"))

    val db = connection("akka")
    db.collection("stocks")
  }

  def findAllTickers(): Future[List[BSONDocument]] = {
    val query = BSONDocument()
    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)

    // which results in a Future[List[BSONDocument]]
    Database.collection
      .find(query, filter)
      .cursor[BSONDocument]
      .collect[List]()
  }

  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {
    val query = BSONDocument("Ticker" -> ticker)

    Database.collection
      .find(query)
      .one
  }

}

Not that much to explain. The most important thing to notice here is that both find functions return a future, so calls to these functions won’t block. Now that we’ve got the basics out of the way, let’s look at the code for the first HTTP server, which uses an asynchronous handler.

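// Package names follow the 1.0-M2 layout used in the client listing of this article;
// the HttpMethods import (for GET) is assumed to live in the same model package.
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model._
import akka.http.model.HttpMethods._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/**
 * Simple Object that starts an HTTP server using akka-http. All requests are handled
 * through an Akka flow.
 */
object Boot extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()

  // start the server on the specified interface and port.
  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)
  serverBinding2.connections.foreach { connection =>
    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))
  }
}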

In this piece of code we create an HTTP server which listens on port 8091. We process each connection that is made with an asyncHandler. This handler should return a Future[HttpResponse]. Let’s look at this handler next:


  // With an async handler, we use futures. Threads aren't blocked.
  def asyncHandler(request: HttpRequest): Future[HttpResponse] = {

    // we match the request, and some simple path checking
    request match {

      // match specific path. Returns all the available tickers
      case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {

        // make a db call, which returns a future.
        // use for comprehension to flatmap this into
        // a Future[HttpResponse]
        for {
          input <- Database.findAllTickers
        } yield {
          HttpResponse(entity = convertToString(input))
        }
      }

      // match GET path. Return a single ticker
      case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {

        // next we match on the query parameter
        request.uri.query.get("ticker") match {

            // if we find the query parameter
            case Some(queryParameter) => {

              // query the database
              val ticker = Database.findTicker(queryParameter)

              // use a simple for comprehension, to make
              // working with futures easier.
              for {
                t <- ticker
              } yield  {
                t match {
                  case Some(bson) => HttpResponse(entity = convertToString(bson))
                  case None => HttpResponse(status = StatusCodes.OK)
                }
              }
            }

            // if the query parameter isn't there
            case None => Future(HttpResponse(status = StatusCodes.OK))
          }
      }

      // Simple case that matches everything, just return a not found
      case HttpRequest(_, _, _, _, _) => {
        Future[HttpResponse] {
          HttpResponse(status = StatusCodes.NotFound)
        }
      }
    }
  }
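The shape of such a handler can be tried out without Akka at all. The following is a minimal sketch using only the Scala standard library: `Request`, `Response`, `findTicker` and the in-memory `stocks` map are hypothetical stand-ins invented for this illustration, not the Akka HTTP model or the Database object. It shows the same two ideas: pattern matching on the request, and returning a future that only describes what should happen once the lookup completes.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RoutingSketch {
  // Hypothetical, simplified types; these are not the Akka HTTP model classes.
  final case class Request(method: String, path: String, query: Map[String, String] = Map.empty)
  final case class Response(status: Int, body: String = "")

  // Hypothetical in-memory replacement for Database.findTicker.
  private val stocks = Map("A" -> "Agilent Technologies Inc.")
  def findTicker(ticker: String): Future[Option[String]] = Future(stocks.get(ticker))

  def handle(request: Request): Future[Response] = request match {
    // known path: check the query parameter and look the ticker up asynchronously
    case Request("GET", "/get", query) =>
      query.get("ticker") match {
        case Some(ticker) =>
          // map registers what to do with the result; nothing waits here
          findTicker(ticker).map {
            case Some(company) => Response(200, company)
            case None          => Response(200) // same status choice as the handler above
          }
        case None => Future.successful(Response(200))
      }
    // anything else: not found
    case _ => Future.successful(Response(404))
  }

  def main(args: Array[String]): Unit = {
    // Await is used only so the demo can print before the JVM exits.
    val found = handle(Request("GET", "/get", Map("ticker" -> "A")))
    println(Await.result(found, 2.seconds)) // prints Response(200,Agilent Technologies Inc.)
    val unknown = handle(Request("POST", "/unknown"))
    println(Await.result(unknown, 2.seconds)) // prints Response(404,)
  }
}
```

One design note on the sketch: for values that are already known, `Future.successful` wraps them without scheduling any work on the execution context, which is why it is used for the fallback cases here.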

As you can see, the handler code is pretty straightforward. We use pattern matching to match a specific URL and use the Database object we saw earlier to query mongo. Note the calls to convertToString. These are a couple of helper methods that convert BSON to JSON using the play libraries we included earlier:

  def convertToString(input: List[BSONDocument]): String = {
    input
      .map(f => convertToString(f))
      .mkString("[", ",", "]")
  }

  def convertToString(input: BSONDocument): String = {
    Json.stringify(BSONFormats.toJSON(input))
  }
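The list variant relies on mkString to turn a number of serialized documents into one JSON array. A small standalone sketch of just that step, assuming the documents are already valid JSON strings (the name `toJsonArray` and the sample documents are made up for this illustration):

```scala
object JsonArraySketch {
  // Joins already serialized JSON documents into a single JSON array string.
  def toJsonArray(docs: Seq[String]): String = docs.mkString("[", ",", "]")

  def main(args: Array[String]): Unit = {
    println(toJsonArray(Seq("""{"Ticker":"A"}""", """{"Ticker":"AA"}""")))
    // prints [{"Ticker":"A"},{"Ticker":"AA"}]
    println(toJsonArray(Nil)) // prints []
  }
}
```

Because mkString only adds the prefix, separator and suffix, an empty result list still produces a valid (empty) JSON array.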

When we start this server and open the address in the browser, we’ll see something like this:

[Screenshot: akka-stream-1.png]

Easy, right? Now let’s look at a slightly more advanced scenario.

Create a server which uses a custom flow graph to process incoming requests

Akka-http internally uses akka-streams to handle http connections. This means that we can use akka-streams to easily handle http requests in a reactive manner. For a linear flow we can use the standard flow api provided by akka. For more advanced graphs akka-streams provides its own DSL, with which you can very easily create more complex graphs where stream events are processed in parallel.

Let’s create a new serverbinding that listens on port 8090:

object Boot extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()

  // start the server on the specified interface and port.
  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)

  serverBinding1.connections.foreach { connection =>
    connection.handleWith(broadCastMergeFlow)
  }
}

This serverbinding is created in the same manner as we did earlier. The main difference is that this time we don’t pass the processing of the request on to a handler, but we specify an instance of a flow with the name broadCastMergeFlow. This broadcast merge flow looks like this:

  val bCast = Broadcast[HttpRequest]

  // some basic steps that each retrieve a different ticker value (as a future)
  val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))
  val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))
  val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))

  // We'll use the source and output provided by the http endpoint
  val in = UndefinedSource[HttpRequest]
  val out = UndefinedSink[HttpResponse]

  // merge passes elements on as they become available on any of its
  // inputs, so the result of the fastest step comes out first
  val merge = Merge[String]

  // since merge doesn't output a HttpResponse add an additional map step.
  val mapToResponse = Flow[String].map[HttpResponse](
    (inp: String) => HttpResponse(status = StatusCodes.OK, entity = inp)
  )


  // define another flow. This uses the merge function which
  // takes the first available response
  val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {
    implicit builder =>

            bCast ~> step1 ~> merge
      in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out
            bCast ~> step3 ~> merge

      (in, out)
  }

The most important part is the last couple of lines in this code fragment. Here we draw a graph that defines how a message is handled when it is processed by the server. In this case we first broadcast the incoming http request to three parallel streams. In each stream we next make a call to our database to get a ticker. Next we merge the results back together (a merge passes on whichever upstream event becomes available first) and create a response. So depending on which of the steps is the fastest we return a ticker for either GOOG, AAPL or MSFT. To see the result better we added a sleep to the getTickerHandler:


  def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {
    // query the database
    val ticker = Database.findTicker(tickName)

    // artificial random delay of up to a second; note that this blocks the calling thread
    Thread.sleep((Math.random() * 1000).toInt)

    // use a simple for comprehension, to make
    // working with futures easier.
    for {
      t <- ticker
    } yield  {
      t match {
        case Some(bson) => convertToString(bson)
        case None => ""
      }
    }
  }

Neat, right? Akka-streams provides a number of basic building blocks you can use to create these flows (for more info see their documentation: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/s…). For instance, if we want to zip the responses of the steps together we could create a flow like this:


  // waits for events on the three inputs and returns a response
  val zip = ZipWith[String, String, String, HttpResponse] (
    (inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK, entity = inp1 + inp2 + inp3)
  )

  // define a flow which broadcasts the request to the three
  // steps, and uses the zipWith to combine the elements into
  // a single response
  val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {
    implicit builder =>

            bCast ~> step1 ~> zip.input1
      in ~> bCast ~> step2 ~> zip.input2 ~> out
            bCast ~> step3 ~> zip.input3

      (in, out)
  }
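The difference between the two graphs (answer with whatever arrives first versus wait for all three and combine them) can be illustrated with plain Scala futures. This is only a loose analogy and not Akka Streams code: `Future.firstCompletedOf` stands in for the merge variant and `Future.sequence` for the zip variant, and the promises are a device of this sketch to control the completion order deterministically.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstVersusAll {
  // "Merge-like": completes with the result of whichever lookup finishes first.
  def first(results: Seq[Future[String]]): Future[String] =
    Future.firstCompletedOf(results)

  // "Zip-like": waits for every lookup and combines the results in input order.
  def all(results: Seq[Future[String]]): Future[String] =
    Future.sequence(results).map(_.mkString(","))

  def main(args: Array[String]): Unit = {
    // Promises let the demo decide explicitly which lookup "answers" first.
    val goog, aapl, msft = Promise[String]()
    val results = Seq(goog.future, aapl.future, msft.future)
    val winner = first(results)
    val combined = all(results)

    aapl.success("AAPL") // pretend the AAPL lookup is the fastest
    println(Await.result(winner, 1.second)) // prints AAPL

    goog.success("GOOG")
    msft.success("MSFT")
    println(Await.result(combined, 1.second)) // prints GOOG,AAPL,MSFT
  }
}
```

Note that the combined result keeps the order of the inputs, not the order of completion, which matches how the zip flow always concatenates step1, step2 and step3.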

I really like how this works and how easy it is to visualize data flowing through the different steps. If we use the merge approach you’ll see a result which looks something like this (when called 10 times):

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282
{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217
{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217
{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

The final thing I want to show is how you can use the same approach when you create an HTTP client with akka-http.

Test both of these servers with an HTTP client also created with Akka-Http

Akka-http also provides functionality to easily set up an HTTP client that also uses a stream/flow based approach to message processing. The following listing shows the complete running client:

import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.stream.scaladsl._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.FlowGraphImplicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * Simple HTTP client created with akka-http
 */
object Client extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = FlowMaterializer()

  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow
  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow

  // define a sink that will process the answer
  // we could also process this as a flow
  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
    if(res.status == StatusCodes.OK) {

      println("Received response : " + res)
      res.entity.getDataBytes().map {
        chunk =>
          // take(80) truncates without throwing on chunks shorter than 80 characters
          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).take(80))
        }.to(Sink.ignore).run()
    } else
      println(res.status)
  }

  // we need to set allow cycles since internally the httpclient
  // has some cyclic flows (apparently)
  // we construct a sink, to which we later connect a source.
  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>
    b.allowCycles()
    val source = UndefinedSource[HttpRequest]
    val bcast = Broadcast[HttpRequest]
    val concat = Concat[HttpResponse]

    // simple graph. Duplicate the request, send twice.
    // concat the result.
              bcast ~> httpClient1 ~> concat.first
    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer
    source
  }

  // make two kinds of calls, both return futures. The first one makes use of our graph,
  // the second one shows directly linked sinks and sources.

  // make number of calls
  val res = 1 to 5 map( i => {
    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)
  })
  val f = Future.sequence(res)

  // make some calls with filled in request URI
  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)
  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)
  val f5 = Source.single(HttpRequest(uri = Uri("/get?ticker=FNB"))).via(httpClient2).runWith(printChunksConsumer)

  // wait for all calls to finish before shutting down the actor system
  for {
    _ <- f
    _ <- f3
    _ <- f4
    _ <- f5
  } yield {
    println("All calls done")
    system.shutdown()
    system.awaitTermination()
  }
}
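The way the client waits for every call before shutting down can be looked at in isolation. Below is a minimal sketch with standard library futures only; `call` is a hypothetical stand-in for one client request and simply returns a string. The futures are created before the for comprehension, so they run concurrently and the comprehension only waits for all of them.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WaitForAll {
  // Hypothetical stand-in for one client call; returns the response body.
  def call(path: String): Future[String] = Future(s"response for $path")

  // Combines a batch of calls and two individual calls into one future.
  def runAll(): Future[Seq[String]] = {
    // Future.sequence turns a collection of futures into a future of a collection.
    val batch: Future[Seq[String]] = Future.sequence((1 to 3).map(i => call(s"/batch/$i")))
    val single1 = call("/getAllTickers")
    val single2 = call("/get?ticker=ADAT")
    for {
      b  <- batch
      s1 <- single1
      s2 <- single2
    } yield b ++ Seq(s1, s2)
  }

  def main(args: Array[String]): Unit = {
    val responses = Await.result(runAll(), 2.seconds)
    responses.foreach(r => println(r.take(80))) // take is safe on short strings
    println("All calls done")
  }
}
```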

I won’t go into detail here, since the code follows the same process as for the HTTP server. That’s it for this article, an introduction to akka-stream and akka-http. I really like their approach to message processing and creating readable, reactive code. In a future article we’ll look at some other aspects of akka-http (routes for instance).
