RxJava: From Future to Observable

I first came across Reactive Extensions about 4 years ago on Matthew Podwysocki’s blog but then hadn’t heard much about it until I saw Matthew give a talk at Code Mesh a few weeks ago.

It seems to have grown in popularity recently and I noticed that there’s now a Java version called RxJava written by Netflix.

I thought I’d give it a try by changing some code I wrote while exploring Cypher’s MERGE clause to expose an Observable instead of Futures.

To recap, we have 50 threads and we do 100 iterations where we create random (user, event) pairs. We create a maximum of 10 users and 50 events and the goal is to concurrently send requests for the same pairs.
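
Why these requests end up touching the same data can be seen without a database. The following is a minimal sketch, not the code from the post: the class name PairCollisions, the drawPairs helper, the fixed seed and the assumption that ids run from 0 to n-1 are all made up for illustration, and it assumes Java 8 or later. With only 10 users, at least 90 of the 100 draws must reuse a user id that has already been drawn, which is what makes concurrent MERGE calls interesting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class PairCollisions
{
    // Draws `iterations` random (userId, eventId) pairs and counts how often each pair occurs.
    // Ids are assumed to be 0..n-1 here; the post builds its ids with its own generateIds helper.
    static Map<String, Integer> drawPairs( int numberOfUsers, int numberOfEvents, int iterations, long seed )
    {
        Random random = new Random( seed );
        Map<String, Integer> counts = new HashMap<>();
        for ( int i = 0; i < iterations; i++ )
        {
            String pair = random.nextInt( numberOfUsers ) + ":" + random.nextInt( numberOfEvents );
            counts.merge( pair, 1, Integer::sum );
        }
        return counts;
    }

    public static void main( String[] args )
    {
        // same sizes as in the post: 10 users, 50 events, 100 iterations
        Map<String, Integer> counts = drawPairs( 10, 50, 100, 42L );
        System.out.println( "distinct pairs: " + counts.size() + " out of 100 draws" );
    }
}
```

Any pair with a count above one stands for two or more requests that would try to MERGE exactly the same user, event and relationship.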

In the example in my other post I threw away the result of each query, whereas here I return the result so that I have something to subscribe to.

The outline of the code looks like this:

public class MergeTimeRx
{
    public static void main( final String[] args ) throws InterruptedException, IOException
    {
        String pathToDb = "/tmp/foo";
        FileUtils.deleteRecursively( new File( pathToDb ) );

        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
        final ExecutionEngine engine = new ExecutionEngine( db );

        int numberOfThreads = 50;
        int numberOfUsers = 10;
        int numberOfEvents = 50;
        int iterations = 100;

        Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );

        events.subscribe( new Action1<ExecutionResult>()
        {
            @Override
            public void call( ExecutionResult result )
            {
                for ( Map<String, Object> row : result )
                {
                    // each row holds the u and e values returned by the query
                }
            }
        } );

        ....
    }

}

The nice thing about using RxJava is that there’s no mention of how we got our collection of ExecutionResults; it’s not important. We just have a stream of them, and by calling the subscribe function on the Observable we’ll be informed whenever another one is made available.

Most of the examples I found show how to generate events from a single thread, but I wanted to use a thread pool so that I could fire off lots of requests at the same time. The processEvents method ended up looking like this:

    private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations )
    {
        final Random random = new Random();
        final List<Integer> userIds = generateIds( numberOfUsers );
        final List<Integer> eventIds = generateIds( numberOfEvents );

        return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
        {
            @Override
            public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
            {
                final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );

                List<Future<ExecutionResult>> jobs = new ArrayList<>();
                for ( int i = 0; i < iterations; i++ )
                {
                    Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
                    {
                        @Override
                        public ExecutionResult call()
                        {
                            Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
                            Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );

                            return engine.execute(
                                    "MERGE (u:User {id: {userId}})\n" +
                                    "MERGE (e:Event {id: {eventId}})\n" +
                                    "MERGE (u)-[:HAS_EVENT]->(e)\n" +
                                    "RETURN u, e",
                                    MapUtil.map( "userId", userId, "eventId", eventId ) );
                        }
                    } );
                    jobs.add( job );
                }

                for ( Future<ExecutionResult> future : jobs )
                {
                    try
                    {
                        observer.onNext( future.get() );
                    }
                    catch ( InterruptedException | ExecutionException ignored )
                    {
                        // deliberately swallowed: calling observer.onError here would stop any further events
                    }
                }

                observer.onCompleted();
                executor.shutdown();

                return Subscriptions.empty();
            }
        } );
    }

I’m not sure if that’s the correct way of using Observables so please let me know in the comments if I’ve got it wrong.
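
One property of the loop over jobs is worth pointing out: future.get() is called in submission order, so a slow early query holds back results that finished sooner. A possible alternative, sketched here with plain java.util.concurrent and no RxJava or Neo4j types, is the JDK's ExecutorCompletionService, which hands results back as they complete. The class name CompletionOrder, the inCompletionOrder helper and the sleeping tasks are stand-ins invented for this sketch, and it assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrder
{
    // Runs one sleeping task per delay and returns the delays in the order the tasks finished.
    // Assumes a non-empty list, because a fixed thread pool needs at least one thread.
    static List<Integer> inCompletionOrder( List<Integer> delaysMillis )
    {
        ExecutorService executor = Executors.newFixedThreadPool( delaysMillis.size() );
        CompletionService<Integer> completed = new ExecutorCompletionService<>( executor );
        List<Integer> finished = new ArrayList<>();
        try
        {
            for ( Integer delay : delaysMillis )
            {
                // stand-in for the query: sleep, then return something
                completed.submit( () -> {
                    Thread.sleep( delay );
                    return delay;
                } );
            }
            for ( int i = 0; i < delaysMillis.size(); i++ )
            {
                // take() hands back whichever task finished next, not the one submitted next;
                // in the post's code this is where observer.onNext would be called
                finished.add( completed.take().get() );
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
        catch ( ExecutionException e )
        {
            throw new IllegalStateException( e.getCause() );
        }
        finally
        {
            executor.shutdown();
        }
        return finished;
    }

    public static void main( String[] args )
    {
        // the slow task is submitted first but should be reported last
        System.out.println( inCompletionOrder( Arrays.asList( 400, 20 ) ) );
    }
}
```

Whether this matters depends on the subscriber: if it only needs every result eventually, iterating the futures in submission order as the post does is simpler.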

I wasn’t sure what the proper way of handling errors was. I initially had a call to observer#onError in the catch block, but that meant no further events were produced, which wasn’t what I wanted.
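
One option that keeps results flowing without silently dropping failures is to record each failure and carry on with the remaining futures. The following is only a sketch of that idea using plain java.util.concurrent. CollectFailures, Outcome and runAll are hypothetical names, the tasks are trivial stand-ins for the queries, and it assumes Java 8 or later. It makes no claim about what RxJava itself recommends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CollectFailures
{
    // Holds what succeeded and what failed, so one bad job does not end the whole run.
    static final class Outcome<T>
    {
        final List<T> results = new ArrayList<>();
        final List<Throwable> failures = new ArrayList<>();
    }

    static <T> Outcome<T> runAll( List<Callable<T>> tasks, int numberOfThreads )
    {
        ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
        Outcome<T> outcome = new Outcome<>();
        try
        {
            List<Future<T>> jobs = new ArrayList<>();
            for ( Callable<T> task : tasks )
            {
                jobs.add( executor.submit( task ) );
            }
            for ( Future<T> job : jobs )
            {
                try
                {
                    outcome.results.add( job.get() ); // the post calls observer.onNext at this point
                }
                catch ( ExecutionException e )
                {
                    outcome.failures.add( e.getCause() ); // remember the failure, keep going
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt(); // restore the flag and stop waiting
                    break;
                }
            }
        }
        finally
        {
            executor.shutdown();
        }
        return outcome;
    }

    public static void main( String[] args )
    {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add( () -> 1 );
        tasks.add( () -> { throw new IllegalStateException( "boom" ); } );
        tasks.add( () -> 3 );

        Outcome<Integer> outcome = runAll( tasks, 2 );
        System.out.println( outcome.results + " with " + outcome.failures.size() + " failure(s)" );
    }
}
```

In the post's setting the collected failures could be logged or inspected once all results have been delivered, instead of being discarded in an empty catch block.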

The code is available as a gist if you want to play around with it. I added the following dependency to get the RxJava library:

<dependency>
    <groupId>com.netflix.rxjava</groupId>
    <artifactId>rxjava-core</artifactId>
    <version>0.15.1</version>
</dependency>

 

Reference: RxJava: From Future to Observable from our JCG partner Mark Needham at the Mark Needham Blog.
Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use
