RxJava: From Future to Observable

I first came across Reactive Extensions about 4 years ago on Matthew Podwysocki’s blog but then didn’t hear much about it until I saw Matthew give a talk at Code Mesh a few weeks ago.

It seems to have grown in popularity recently, and I noticed that there’s now a Java version called RxJava, written by Netflix.

I thought I’d give it a try by changing some code I wrote while exploring Cypher’s MERGE clause to expose an Observable instead of Futures.

To recap, we have 50 threads and we do 100 iterations where we create random (user, event) pairs. We create a maximum of 10 users and 50 events and the goal is to concurrently send requests for the same pairs.
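To make those numbers concrete, here is a small standalone sketch of the workload without Neo4j. The generateIds helper is used by the post's code but its body isn't shown, so the version here is a guess (ids 0 to count-1). The class name, the distinctPairs method and the fixed seed are also mine, chosen only to make a run repeatable. With 10 users and 50 events there are only 500 possible pairs, so the sketch counts how many distinct pairs 100 draws actually hit.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

class WorkloadSketch
{
    // Hypothetical stand-in for the post's generateIds helper: ids 0 .. count-1.
    static List<Integer> generateIds( int count )
    {
        List<Integer> ids = new ArrayList<>();
        for ( int i = 0; i < count; i++ )
        {
            ids.add( i );
        }
        return ids;
    }

    // Draws `iterations` random (user, event) pairs and returns the distinct ones.
    static Set<String> distinctPairs( int numberOfUsers, int numberOfEvents, int iterations, long seed )
    {
        Random random = new Random( seed );
        List<Integer> userIds = generateIds( numberOfUsers );
        List<Integer> eventIds = generateIds( numberOfEvents );

        Set<String> pairs = new HashSet<>();
        for ( int i = 0; i < iterations; i++ )
        {
            Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
            Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );
            pairs.add( userId + ":" + eventId );
        }
        return pairs;
    }

    public static void main( String[] args )
    {
        Set<String> pairs = distinctPairs( 10, 50, 100, 42L );
        System.out.println( "possible pairs: " + (10 * 50) );
        System.out.println( "distinct pairs drawn in 100 iterations: " + pairs.size() );
    }
}
```

Whenever the second number comes out below 100, at least one pair was requested more than once, which is exactly the situation MERGE has to cope with.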

In my other post I threw away the result of each query, whereas here I return it so that there is something to subscribe to.

The outline of the code looks like this:

public class MergeTimeRx
{
    public static void main( final String[] args ) throws InterruptedException, IOException
    {
        String pathToDb = "/tmp/foo";
        FileUtils.deleteRecursively( new File( pathToDb ) );

        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
        final ExecutionEngine engine = new ExecutionEngine( db );

        int numberOfThreads = 50;
        int numberOfUsers = 10;
        int numberOfEvents = 50;
        int iterations = 100;

        Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );

        events.subscribe( new Action1<ExecutionResult>()
        {
            public void call( ExecutionResult result )
            {
                for ( Map<String, Object> row : result )
                {
                    // each row holds the "u" and "e" columns returned by the query
                }
            }
        } );
    }
}



The nice thing about using RxJava is that there’s no mention of how we got our collection of ExecutionResults; it’s not important. We just have a stream of them, and by calling the subscribe function on the Observable we’ll be informed whenever another one is made available.
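That decoupling doesn't depend on RxJava itself. As a rough illustration, here is a plain-JDK model of it. Source is a hypothetical one-method interface made up for this sketch, not an RxJava type, and it leaves out completion and error signals entirely. It also uses Java 8 lambdas for brevity, which the code in this post does not. The point is only that the subscriber code in collect is the same whether the values come from a list or from a thread pool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeSketch
{
    // Hypothetical stand-in for an observable: it pushes values to whoever subscribes.
    interface Source<T>
    {
        void subscribe( Consumer<T> subscriber );
    }

    // Produces the values one after another on the calling thread.
    static Source<Integer> fromList( final List<Integer> values )
    {
        return subscriber -> values.forEach( subscriber );
    }

    // Produces the same values from a pool of worker threads, in no particular order.
    static Source<Integer> fromThreadPool( final List<Integer> values, final int numberOfThreads )
    {
        return subscriber -> {
            ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
            for ( final Integer value : values )
            {
                executor.execute( () -> subscriber.accept( value ) );
            }
            executor.shutdown();
            try
            {
                // wait for the workers so that subscribe() returns only once everything was pushed
                executor.awaitTermination( 10, TimeUnit.SECONDS );
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
        };
    }

    // The subscriber side: identical for both sources.
    static List<Integer> collect( Source<Integer> source )
    {
        // synchronized because fromThreadPool calls the subscriber from several threads
        List<Integer> seen = Collections.synchronizedList( new ArrayList<Integer>() );
        source.subscribe( seen::add );
        return seen;
    }

    public static void main( String[] args )
    {
        List<Integer> values = Arrays.asList( 1, 2, 3, 4, 5 );
        System.out.println( collect( fromList( values ) ) );
        System.out.println( collect( fromThreadPool( values, 3 ) ) );
    }
}
```

Unlike a real Observable, this model lets several threads call the subscriber at once, which is why collect has to use a synchronized list.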

Most of the examples I found show how to generate events from a single thread but I wanted to use a thread pool so that I could fire off lots of requests at the same time. The processEvents method ended up looking like this:

    private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations )
    {
        final Random random = new Random();
        final List<Integer> userIds = generateIds( numberOfUsers );
        final List<Integer> eventIds = generateIds( numberOfEvents );

        return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
        {
            public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
            {
                final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );

                // submit every query up front so that they run concurrently
                List<Future<ExecutionResult>> jobs = new ArrayList<>();
                for ( int i = 0; i < iterations; i++ )
                {
                    Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
                    {
                        public ExecutionResult call()
                        {
                            Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
                            Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );

                            return engine.execute(
                                    "MERGE (u:User {id: {userId}})\n" +
                                    "MERGE (e:Event {id: {eventId}})\n" +
                                    "MERGE (u)-[:HAS_EVENT]->(e)\n" +
                                    "RETURN u, e",
                                    MapUtil.map( "userId", userId, "eventId", eventId ) );
                        }
                    } );
                    jobs.add( job );
                }

                // hand each result to the observer, in submission order
                for ( Future<ExecutionResult> future : jobs )
                {
                    try
                    {
                        observer.onNext( future.get() );
                    }
                    catch ( InterruptedException | ExecutionException ignored )
                    {
                        // deliberately skipped so that one failed query does not end the stream
                    }
                }

                // tell the observer that nothing more is coming and release the pool's threads
                observer.onCompleted();
                executor.shutdown();

                return Subscriptions.empty();
            }
        } );
    }
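One thing to be aware of in processEvents: the second loop calls future.get() in submission order, so a slow early job holds back results that have already finished. A possible alternative, sketched below with plain JDK classes, is ExecutorCompletionService, which hands back futures in the order they complete. The emitInCompletionOrder and sleepThenReturn methods and the Consumer used in place of observer.onNext are illustrative names of mine, and the callables are dummies rather than Cypher queries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class CompletionOrderSketch
{
    // Runs every job on a fixed pool and passes each result to onNext as soon as it is ready.
    static <T> void emitInCompletionOrder( List<Callable<T>> jobs, int numberOfThreads, Consumer<T> onNext )
    {
        ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
        ExecutorCompletionService<T> completed = new ExecutorCompletionService<>( executor );
        try
        {
            for ( Callable<T> job : jobs )
            {
                completed.submit( job );
            }
            for ( int i = 0; i < jobs.size(); i++ )
            {
                try
                {
                    // take() blocks until any job has finished, not one specific job
                    onNext.accept( completed.take().get() );
                }
                catch ( ExecutionException ignored )
                {
                    // same policy as the post: a failed job is skipped
                }
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
        finally
        {
            executor.shutdown();
        }
    }

    // Dummy job: sleeps for the given time, then returns the given value.
    static Callable<String> sleepThenReturn( final long millis, final String value )
    {
        return () -> {
            Thread.sleep( millis );
            return value;
        };
    }

    public static void main( String[] args )
    {
        List<Callable<String>> jobs = Arrays.asList(
                sleepThenReturn( 500, "slow" ),
                sleepThenReturn( 0, "fast" ) );

        List<String> seen = new ArrayList<>();
        emitInCompletionOrder( jobs, 2, seen::add );
        System.out.println( seen );
    }
}
```

With two threads available, the job that sleeps should be reported after the one that doesn't, even though it was submitted first. Results are still delivered from the single thread that calls take(), so the consumer never sees concurrent calls.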

I’m not sure if that’s the correct way of using Observables so please let me know in the comments if I’ve got it wrong.

I wasn’t sure what the proper way of handling errors was. I initially had a call to observer#onError in the catch block, but calling onError terminates the stream, so no further events are produced, which wasn’t what I wanted.
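One way to keep the stream alive without silently dropping failures is to turn each failure into an ordinary value, so that the subscriber sees either a result or the exception that replaced it. The sketch below shows the idea with plain JDK classes. Outcome and emitAll are hypothetical names of mine, not part of RxJava, and the jobs are dummies standing in for the Cypher queries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class ErrorAsValueSketch
{
    // Hypothetical holder: a failed outcome carries the error, a successful one the value.
    static final class Outcome<T>
    {
        final T value;
        final Throwable error;

        private Outcome( T value, Throwable error )
        {
            this.value = value;
            this.error = error;
        }

        boolean failed()
        {
            return error != null;
        }
    }

    // Submits every job, then reports each one to onNext as a success or a failure.
    static <T> void emitAll( List<Callable<T>> jobs, int numberOfThreads, Consumer<Outcome<T>> onNext )
    {
        ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
        try
        {
            List<Future<T>> futures = new ArrayList<>();
            for ( Callable<T> job : jobs )
            {
                futures.add( executor.submit( job ) );
            }
            for ( Future<T> future : futures )
            {
                try
                {
                    onNext.accept( new Outcome<T>( future.get(), null ) );
                }
                catch ( ExecutionException e )
                {
                    // the job threw: pass its exception downstream instead of ending the stream
                    onNext.accept( new Outcome<T>( null, e.getCause() ) );
                }
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
        finally
        {
            executor.shutdown();
        }
    }

    public static void main( String[] args )
    {
        List<Callable<String>> jobs = new ArrayList<>();
        jobs.add( () -> "first" );
        jobs.add( () -> { throw new IllegalStateException( "boom" ); } );
        jobs.add( () -> "third" );

        emitAll( jobs, 2, outcome ->
                System.out.println( outcome.failed() ? "failed: " + outcome.error : "ok: " + outcome.value ) );
    }
}
```

The subscriber can then branch on failed(), and the results after a failure still arrive.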

The code is available as a gist if you want to play around with it. I added the following dependency to get the RxJava library:



Reference: RxJava: From Future to Observable from our JCG partner Mark Needham at the Mark Needham Blog blog.

One Response to "RxJava: From Future to Observable"

  1. kamiseq says:

    You can call the onError method of the observer and then use onErrorResumeNext – https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators. That way you don't have to care about error handling on the producer side.

    The other comment is that you are adding events synchronously (you are blocking on Future.get()), but I guess it would be better to pass the observer to executor#call itself.
