
Use reactive streams API to combine akka-streams with rxJava

Just a quick article this time, since I'm still experimenting with this stuff. There is a lot of talk about reactive programming. Java 8 has the Stream API, and we also have rxJava, ratpack, and Akka's akka-streams.

The main issue with these implementations is that they aren't compatible: you can't connect the subscriber of one implementation to the publisher of another. Luckily, an initiative has started to provide a way for these different implementations to work together:

“It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.”

From – http://www.reactive-streams.org/
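
To make the idea a bit more concrete, here is a minimal sketch of the publisher/subscriber handshake that the specification describes. It is not taken from akka-streams or rxJava: it assumes JDK 9 or later and uses the `java.util.concurrent.Flow` interfaces, which mirror the `org.reactivestreams` ones. The names `CollectingSubscriber` and `collect` are made up for this illustration.

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher, TimeUnit}
import scala.collection.mutable.ListBuffer

object ReactiveStreamsContractSketch {

  // Hypothetical subscriber, written only against the Flow.Subscriber
  // interface, so any conforming publisher can feed it.
  final class CollectingSubscriber[T] extends Flow.Subscriber[T] {
    private val done = new CountDownLatch(1)
    private val items = ListBuffer.empty[T]
    private var subscription: Flow.Subscription = null

    override def onSubscribe(s: Flow.Subscription): Unit = {
      subscription = s
      s.request(1) // backpressure: signal demand for one element
    }

    override def onNext(item: T): Unit = {
      items += item
      subscription.request(1) // ready for the next element
    }

    override def onError(t: Throwable): Unit = done.countDown()

    override def onComplete(): Unit = done.countDown()

    // Waits (up to 5 seconds) for the stream to terminate,
    // then returns the elements received so far.
    def await(): List[T] = {
      done.await(5, TimeUnit.SECONDS)
      items.toList
    }
  }

  // Publishes the given values with the JDK's SubmissionPublisher and
  // returns what the subscriber received, in order.
  def collect[T](values: Seq[T]): List[T] = {
    val publisher = new SubmissionPublisher[T]()
    val subscriber = new CollectingSubscriber[T]
    publisher.subscribe(subscriber)
    values.foreach(v => publisher.submit(v))
    publisher.close() // triggers onComplete once everything is delivered
    subscriber.await()
  }

  def main(args: Array[String]): Unit =
    collect(Seq("lorem", "ipsum", "dolor")).foreach(println)
}
```

The subscriber never refers to a concrete publisher class. That is the property that lets a subscriber from one library be attached to a publisher from another, as long as both follow the same rules.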

How does this work?

Now how do we do this? Let's look at a quick example based on the examples provided with akka-streams. The following listing feeds an rxJava Observable into an akka-streams flow:

package sample.stream
 
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.PublisherSource
import rx.RxReactiveStreams
import rx.Observable
import scala.collection.JavaConverters._
 
object BasicTransformation {
 
  def main(args: Array[String]): Unit = {
 
    // define an implicit actorsystem and import the implicit dispatcher
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
 
    // flow materializer determines how the stream is realized.
    // this time as a flow between actors.
    implicit val materializer = FlowMaterializer()
 
    // input text for the stream.
    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
         |when an unknown printer took a galley of type and scrambled it to make a type 
         |specimen book.""".stripMargin
 
    // create an Observable from a simple list (this is in rxJava style)
    val first = Observable.from(text.split("\\s").toList.asJava)
    // convert the rxJava Observable to a reactive streams publisher
    val publisher = RxReactiveStreams.toPublisher(first)
    // based on the publisher, create an akka-streams source
    val source = PublisherSource(publisher)
 
    // now use the akka style syntax to stream the data from the source
    // to the sink (in this case this is println)
    source.
      map(_.toUpperCase).                 // executed as actors
      filter(_.length > 3).
      foreach { el =>                     // the sink/consumer
        println(el)
      }.
      onComplete(_ => system.shutdown())  // lifecycle event
  }
}

The code comments in this example explain most of what is happening. We create an rxJava Observable, convert it to a "reactive streams" publisher, and use that publisher to create an akka-streams source. For the rest of the code we can use the akka-streams flow API to model the stream. In this case we just uppercase each word, keep the words longer than three characters, and print the result.
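
To see which words actually reach the sink, the same transformation can be written over a plain Scala list. This is only a sketch for checking the expected output: it runs eagerly on one thread, with no actors or backpressure involved, and the names `TransformationSketch` and `transform` are made up for this illustration.

```scala
object TransformationSketch {

  // Same steps as the akka-streams flow, applied eagerly to an in-memory list:
  // split on whitespace, uppercase, keep words longer than three characters.
  def transform(text: String): List[String] =
    text.split("\\s").toList
      .map(_.toUpperCase)
      .filter(_.length > 3)

  def main(args: Array[String]): Unit =
    transform("Lorem Ipsum is simply dummy text").foreach(println)
}
```

For the input above, "is" is dropped because it is not longer than three characters, and the remaining words are printed in upper case.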
