Andrey Redko

About Andrey Redko

Andriy is a well-grounded software developer with more then 12 years of practical experience using Java/EE, C#/.NET, C++, Groovy, Ruby, functional programming (Scala), databases (MySQL, PostreSQL, Oracle) and NoSQL solutions (MongoDB, Redis).

Implementing Producer/Consumer using SynchronousQueue

Among plenty of useful classes which Java provides for concurrency support, there is one I would like to talk about: SynchronousQueue. In particular, I would like to walk through Producer / Consumer implementation using handy SynchronousQueue as an exchange mechanism.

It might not sound clear why to use this type of queue for producer / consumer communication unless we look under the hood of SynchronousQueue implementation. It turns out that it’s not really a queue as we used to think about queues. The analogy would be just a collection containing at most one element.
 
 
Why it’s useful? Well, there are several reasons. From producer’s point of view, only one element (or message) could be stored into the queue. In order to proceed with the next element (or message), the producer should wait till consumer consumes the one currently in the queue. From consumer’s point of view, it just polls the queue for next element (or message) available. Quite simple, but the great benefit is: producer cannot send messages faster than consumer can process them.

Here is one of the use cases I encountered recently: compare two database tables (possibly just huge) and detect if those contain different data or data is the same (copy). The SynchronousQueue is quite a handy tool for this problem: it allows to handle each table in own thread as well as compensate the possible timeouts / latency while reading from two different databases.

Let’s start by defining our compare function which accepts source and destination data sources as well as a table name (to compare). I am using quite useful JdbcTemplate class from Spring framework as it extremely well abstract all the boring details dealing with connections and prepared statements.

public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new  JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
}

Before doing any actual data comparison, it’s a good idea to compare table’s row count of the source and destination databases:

if( from.queryForLong('SELECT count(1) FROM ' + table ) != to.queryForLong('SELECT count(1) FROM ' + table ) ) {
    return false;
}

Now, at least knowing that table contains same number of rows in both databases, we can start with data comparison. The algorithm is very simple:

  • create a separate thread for source (producer) and destination (consumer) databases
  • producer thread reads single row from the table and puts it into the SynchronousQueue
  • consumer thread also reads single row from the table, then asks queue for the available row to compare (waits if necessary) and lastly compare two result sets

Using another great part Java concurrent utilities for thread pooling, let’s define a thread pool with fixed amount of threads (2).

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();

Following the described algorithm, the producer functionality could be represented as a single callable:

Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( 'SELECT * FROM ' + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {                   
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( 'Having more data but consumer has already completed' );
                        }
                    } catch( InterruptedException ex ) {
                        throw new SQLException( 'Having more data but producer has been interrupted' );
                    }
                }
            }
        );

        return  null;
    }
};

The code is a bit verbose due to Java syntax but it doesn’t do much actually. Every result set read from the table producer converts to a list (implementation has been omitted as it’s a boilerplate) and puts in a queue (offer). If queue is not empty, producer is blocked waiting for consumer to finish his work. The consumer, respectively, could be represented as a following callable:

Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( 'SELECT * FROM ' + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( 'Having more data but producer has already completed' );
                        }                                     

                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( 'Row data is not the same' );
                        }
                    } catch ( InterruptedException ex ) {
                        throw new SQLException( 'Having more data but consumer has been interrupted' );
                    }
                }
            }
        );

        return  null;
    }
};

The consumer does a reverse operation on the queue: instead of putting data it pulls it (poll) from the queue. If queue is empty, consumer is blocked waiting for producer to publish next row. The part which is left is only submitting those callables for execution. Any exception returned by the Future’s get method indicates that table doesn’t contain the same data (or there are issue with getting data from database):

    List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
    for( final Future< Void > future: futures ) {
        future.get( 5, TimeUnit.MINUTES );
    }

 

Reference: Implementing Producer/Consumer using SynchronousQueue from our JCG partner Andrey Redko at the Andriy Redko {devmind} blog.

Related Whitepaper:

Bulletproof Java Code: A Practical Strategy for Developing Functional, Reliable, and Secure Java Code

Use Java? If you do, you know that Java software can be used to drive application logic of Web services or Web applications. Perhaps you use it for desktop applications? Or, embedded devices? Whatever your use of Java code, functional errors are the enemy!

To combat this enemy, your team might already perform functional testing. Even so, you're taking significant risks if you have not yet implemented a comprehensive team-wide quality management strategy. Such a strategy alleviates reliability, security, and performance problems to ensure that your code is free of functionality errors.Read this article to learn about this simple four-step strategy that is proven to make Java code more reliable, more secure, and easier to maintain.

Get it Now!  

Leave a Reply


6 × five =



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.

Sign up for our Newsletter

15,153 insiders are already enjoying weekly updates and complimentary whitepapers! Join them now to gain exclusive access to the latest news in the Java world, as well as insights about Android, Scala, Groovy and other related technologies.

As an extra bonus, by joining you will get our brand new e-books, published by Java Code Geeks and their JCG partners for your reading pleasure! Enter your info and stay on top of things,

  • Fresh trends
  • Cases and examples
  • Research and insights
  • Two complimentary e-books