About Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

Turning Twitter4J into RxJava’s Observable

Twitter4J is a Java wrapper around the Twitter API. While Twitter supports simple request-response interactions, in this article we will explore its streaming APIs. In contrast to the request-response model, which is always initiated by the client, a streaming API pushes data from the Twitter server to the clients as soon as it is available. In Twitter's case the data are tweets, called Status in the API.

The question is, how would you design a Java API for streaming purposes? No surprise here: callbacks, callbacks everywhere!
 
 
 

import twitter4j.*;

TwitterStream twitter = new TwitterStreamFactory().getInstance();
// Register a callback that is invoked for every incoming tweet
twitter.addListener(new StatusAdapter() {
  @Override
  public void onStatus(Status status) {
    System.out.println(status.getUser().getName() + " : " + status.getText());
  }
});
// Start listening to a sample of public statuses
twitter.sample();

Say that on top of this API we would like to count how many messages we receive per second. A lot of accidental complexity sneaks in:

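import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.SECONDS;

// Shared between the Twitter4J callback thread and the scheduler thread
final AtomicInteger countPerSecond = new AtomicInteger();

twitter.addListener(new StatusAdapter() {
  @Override
  public void onStatus(Status status) {
    countPerSecond.incrementAndGet();
  }
});
twitter.sample();

// Once per second, read the counter and reset it atomically
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    final int count = countPerSecond.getAndSet(0);
    log.debug("Tweets/second: {}", count);
  }
}, 1, 1, SECONDS);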

We need a ScheduledExecutorService and have to be very careful about thread safety. Moreover, this approach doesn't scale: it requires hand-crafted code for every use case we can imagine, such as throttling, combining or accumulating. It turns out that bridging the Twitter4J streaming API (and virtually any callback-based API, for that matter) to RxJava's Observable is quite straightforward and greatly simplifies further solutions.

Before we explore how to create a new Observable representing a stream of Twitter messages on top of the Twitter4J API, let's assume that we already have one:

Observable<Status> twitter = twitterObservable();  //to be implemented

Observable<Status> twitter is a stream of Status objects, where each object is one tweet. How do we solve our initial problem of counting tweets per second (tps)?

Observable<Integer> tpsStream = twitter.
    buffer(1, TimeUnit.SECONDS).
    map(list -> list.size());

That was easy! We take the initial stream of tweets and buffer it in one-second windows. Each time a second elapses, a single event is emitted containing a List<Status> of the tweets received within that time frame. We then transform each List into an Integer by taking its size(). And that's it! tpsStream will yield one number per second, representing the count of tweets per second. If we suddenly realize that our system is overloaded by that volume, we can easily sample the stream and pick just a subset of the tweets. For example, to get at most one tweet every 100 milliseconds:

twitter.sample(100, MILLISECONDS)
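To make the semantics of the two operators more tangible, here is a minimal offline sketch in plain Java. It is not RxJava code and not how RxJava implements anything: TimeWindows, Timed, countPerWindow and sampleLatest are hypothetical names, and events are modelled as already-recorded arrival times (milliseconds since subscription) rather than a live stream. It also assumes that a still-open window is flushed once the input ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical offline model of time-based windowing; not RxJava code.
class TimeWindows {

    // An item together with its arrival time, in milliseconds since subscription.
    static final class Timed {
        final long millis;
        final String value;

        Timed(long millis, String value) {
            this.millis = millis;
            this.value = value;
        }
    }

    // Mirrors the idea of buffer(window) followed by map(list -> list.size()):
    // one count per window; in this sketch empty windows yield a zero.
    static List<Integer> countPerWindow(List<Timed> events, long windowMillis) {
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        long windowEnd = windowMillis;
        for (Timed e : events) {
            while (e.millis >= windowEnd) { // close every window that ended before this event
                counts.add(current);
                current = 0;
                windowEnd += windowMillis;
            }
            current++;
        }
        counts.add(current); // assumption: the last, still-open window is flushed at the end
        return counts;
    }

    // Mirrors the idea of sample(period): at most one item per period,
    // namely the latest one seen within it.
    static List<String> sampleLatest(List<Timed> events, long periodMillis) {
        List<String> sampled = new ArrayList<>();
        String latest = null;
        long periodEnd = periodMillis;
        for (Timed e : events) {
            while (e.millis >= periodEnd) {
                if (latest != null) {
                    sampled.add(latest);
                    latest = null;
                }
                periodEnd += periodMillis;
            }
            latest = e.value;
        }
        if (latest != null) {
            sampled.add(latest); // assumption: flush the pending item when input ends
        }
        return sampled;
    }

    static List<Timed> demoEvents() {
        return Arrays.asList(
            new Timed(10, "a"), new Timed(40, "b"), new Timed(120, "c"),
            new Timed(350, "d"), new Timed(390, "e"));
    }

    public static void main(String[] args) {
        System.out.println(countPerWindow(demoEvents(), 100)); // prints [2, 1, 0, 2]
        System.out.println(sampleLatest(demoEvents(), 100));   // prints [b, c, e]
    }
}
```

With RxJava the same bookkeeping is hidden behind buffer() and sample(), which is exactly why the one-liners above are so short.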

There are more than a hundred operators available, similar to buffer() and sample(), but I hope you get the idea. Now that we have seen how useful an Observable<Status> is, let's implement it. When defining an Observable we need to supply two handlers: one describing what happens when a client subscribes to the Observable and, optionally, one describing how to handle unsubscribing:

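public Observable<Status> twitterObservable() {
  // The callback below runs once per subscription, not when the Observable is created
  return Observable.create(subscriber -> {
    final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        subscriber.onNext(status);   // forward each tweet downstream
      }
      @Override
      public void onException(Exception ex) {
        subscriber.onError(ex);      // propagate stream failures
      }
    });
    twitterStream.sample();
    // Invoked when the client unsubscribes: stop consuming the Twitter stream
    return Subscriptions.create(() -> {
      twitterStream.cleanUp();
    });
  });
}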

That is quite a bit of code, written here in Java 8 (Scala and Groovy work equally well with RxJava). The callback provided to Observable.create() is executed every time someone subscribes to the Observable. It turns out that none of the earlier examples trigger this handler: RxJava is lazy by nature, so it won't connect to Twitter unless absolutely required. For example, twitter.filter(...) returns a new Observable with only the subset of tweets matching certain criteria, but as long as you don't actually subscribe to that Observable (using subscribe()), nothing will happen. In the example below the connection is postponed until we call subscribe(). After that, the text of each incoming tweet is extracted and, if it contains the #java hashtag, printed. All of this happens asynchronously and the whole statement is non-blocking:

twitter.
  map(Status::getText).
  filter(text -> text.contains("#java")).
  subscribe(System.out::println);
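The laziness described above can be illustrated without RxJava at all. The following is a minimal sketch using a hypothetical LazyStream class (a deliberately tiny stand-in, not RxJava's Observable and not its implementation): operators such as filter() only wrap the original subscription handler, and the handler itself runs only when subscribe() is called. The sample tweets and the class names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, minimal stand-in for an Observable; not RxJava's implementation.
class LazyStream<T> {
    // What to do when somebody subscribes: push items into the given observer.
    private final Consumer<Consumer<T>> onSubscribe;

    private LazyStream(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> LazyStream<T> create(Consumer<Consumer<T>> onSubscribe) {
        return new LazyStream<T>(onSubscribe);
    }

    // Returns a new stream; nothing is executed until subscribe() is called on it.
    <R> LazyStream<R> map(Function<T, R> f) {
        return new LazyStream<R>(downstream ->
            onSubscribe.accept(item -> downstream.accept(f.apply(item))));
    }

    // Returns a new stream that passes on only the items matching the predicate.
    LazyStream<T> filter(Predicate<T> p) {
        return new LazyStream<T>(downstream ->
            onSubscribe.accept(item -> {
                if (p.test(item)) {
                    downstream.accept(item);
                }
            }));
    }

    // Only here is the original handler actually invoked.
    void subscribe(Consumer<T> observer) {
        onSubscribe.accept(observer);
    }
}

class LazyDemo {
    static List<String> run() {
        AtomicInteger connections = new AtomicInteger();
        List<String> log = new ArrayList<>();

        LazyStream<String> tweets = LazyStream.<String>create(observer -> {
            connections.incrementAndGet(); // stands in for "connect to Twitter"
            observer.accept("Learning #java today");
            observer.accept("Lunch time");
        });
        LazyStream<String> javaTweets = tweets.filter(text -> text.contains("#java"));

        log.add("connections before subscribe: " + connections.get());
        javaTweets.subscribe(log::add);
        log.add("connections after subscribe: " + connections.get());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // prints:
        // connections before subscribe: 0
        // Learning #java today
        // connections after subscribe: 1
    }
}
```

Unlike the Twitter4J bridge, this toy stream pushes its items synchronously inside subscribe(); the point is only that building the pipeline has no side effects.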

Subscriptions.create() also takes a handler; as you can guess, it describes what should happen when the client is no longer interested in the Observable<Status>.
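The role of the unsubscribe handler can be sketched in plain Java as well. Everything here is an assumption made for illustration: CallbackSource is a hypothetical callback-based API standing in for something like a TwitterStream, and Bridge.subscribe() returns a plain Runnable as the unsubscribe handle instead of an RxJava Subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical callback-based source; stands in for any listener-style API.
class CallbackSource {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void removeListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    // Delivers an event to every currently registered listener.
    void publish(String event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}

class Bridge {
    // Registers the observer and returns a handle that undoes the registration.
    static Runnable subscribe(CallbackSource source, Consumer<String> observer) {
        source.addListener(observer);
        return () -> source.removeListener(observer);
    }
}

class UnsubscribeDemo {
    static List<String> run() {
        CallbackSource source = new CallbackSource();
        List<String> received = new ArrayList<>();

        Runnable unsubscribe = Bridge.subscribe(source, received::add);
        source.publish("first");   // delivered
        unsubscribe.run();         // the client is no longer interested
        source.publish("second");  // not delivered any more
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first]
    }
}
```

The handle plays the same part as the handler passed to Subscriptions.create(): it is the single place where resources acquired during subscription are released.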

Twitter4J is just one example of how you can adapt a callback-based API into an Observable. Other examples include incoming network packets, JMS messages or file system changes. In all cases the scenario is the same.
 

Reference: Turning Twitter4J into RxJava’s Observable from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.