Andrey Redko

About Andrey Redko

Andriy is a well-grounded software developer with more than 12 years of practical experience using Java/EE, C#/.NET, C++, Groovy, Ruby, functional programming (Scala), databases (MySQL, PostgreSQL, Oracle) and NoSQL solutions (MongoDB, Redis).

Implementing Producer/Consumer using SynchronousQueue

Among the many useful classes Java provides for concurrency support, there is one I would like to talk about: SynchronousQueue. In particular, I would like to walk through a Producer / Consumer implementation that uses the handy SynchronousQueue as the exchange mechanism.

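It might not be clear why one would use this type of queue for producer / consumer communication until we look under the hood of the SynchronousQueue implementation. It turns out that it’s not really a queue in the sense we are used to. Strictly speaking, it has no internal capacity at all: each insert waits for a matching remove by another thread. A helpful analogy is a collection that holds at most one element, namely the one currently being handed over.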
 
 
Why is it useful? Well, there are several reasons. From the producer’s point of view, only one element (or message) can be in flight at a time. To proceed with the next element (or message), the producer has to wait until the consumer takes the current one. From the consumer’s point of view, it just polls the queue for the next available element (or message). Quite simple, but the great benefit is that the producer cannot send messages faster than the consumer can process them.
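
To make the hand-off behaviour concrete, here is a minimal sketch, assuming Java 8 or later. The class and method names (HandoffDemo, offerWithoutConsumer, handOff) and the 5-second timeouts are made up for illustration. It shows that a non-blocking offer fails when no consumer is waiting, and that a timed offer succeeds once a consumer is there to take the element.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class HandoffDemo {
    // With no consumer waiting, a non-blocking offer has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("row-1");
    }

    // Starts a consumer thread, hands one message over and returns what the consumer received.
    static String handOff(final String message) {
        final SynchronousQueue<String> queue = new SynchronousQueue<>();
        final String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                // Waits (up to 5 seconds) for the producer to hand something over.
                received[0] = queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Waits (up to 5 seconds) until the consumer actually takes the message.
            queue.offer(message, 5, TimeUnit.SECONDS);
            consumer.join(); // join makes the consumer's write to received[0] visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff("row-1"));       // prints row-1
    }
}
```

The producer never gets ahead of the consumer here: each element is handed over directly from one thread to the other.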

Here is one of the use cases I encountered recently: compare two database tables (possibly huge ones) and detect whether they contain different data or the same data (a copy). The SynchronousQueue is quite a handy tool for this problem: it allows each table to be handled in its own thread and compensates for possible timeouts / latency while reading from two different databases.

Let’s start by defining our compare function, which accepts the source and destination data sources as well as the name of the table to compare. I am using the quite useful JdbcTemplate class from the Spring framework, as it abstracts away all the boring details of dealing with connections and prepared statements extremely well.

public boolean compare( final DataSource source, final DataSource destination, final String table ) {
    final JdbcTemplate from = new JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
    // the comparison steps described below go here
}

Before doing any actual data comparison, it’s a good idea to compare the table’s row count in the source and destination databases:

// tables with different row counts cannot hold the same data
if( from.queryForLong( "SELECT count(1) FROM " + table ) != to.queryForLong( "SELECT count(1) FROM " + table ) ) {
    return false;
}

Now, knowing at least that the table contains the same number of rows in both databases, we can start on the data comparison. The algorithm is very simple:

  • create a separate thread for each of the source (producer) and destination (consumer) databases
  • the producer thread reads a single row from the table and puts it into the SynchronousQueue
  • the consumer thread also reads a single row from the table, then asks the queue for the available row to compare (waiting if necessary), and lastly compares the two rows
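
The steps above can be sketched without any database at all. The following is a minimal illustration, assuming Java 8 or later, in which two in-memory lists of rows stand in for the two tables. The names InMemoryCompare, sameRows and row, as well as the one-second timeout, are hypothetical and chosen only for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class InMemoryCompare {
    private static final long TIMEOUT_SECONDS = 1; // illustrative value only

    // Small helper to build a row from column values.
    static List<?> row(Object... values) {
        return Arrays.asList(values);
    }

    // Returns true when both "tables" hold equal rows in the same order.
    static boolean sameRows(final List<List<?>> source, final List<List<?>> destination) {
        if (source.size() != destination.size()) {
            return false; // same shortcut as the row count check
        }
        final SynchronousQueue<List<?>> handoff = new SynchronousQueue<>();
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Producer: hands each source row over, one at a time.
            Callable<Boolean> producer = () -> {
                for (List<?> sourceRow : source) {
                    if (!handoff.offer(sourceRow, TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        return false; // the consumer stopped taking rows
                    }
                }
                return true;
            };
            // Consumer: reads its own row, takes the producer's row and compares them.
            Callable<Boolean> consumer = () -> {
                for (List<?> destinationRow : destination) {
                    List<?> sourceRow = handoff.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (sourceRow == null || !sourceRow.equals(destinationRow)) {
                        return false;
                    }
                }
                return true;
            };
            for (Future<Boolean> result : executor.invokeAll(Arrays.asList(producer, consumer))) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException ex) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<List<?>> a = Arrays.<List<?>>asList(row(1, "x"), row(2, "y"));
        List<List<?>> b = Arrays.<List<?>>asList(row(1, "x"), row(2, "z"));
        System.out.println(sameRows(a, a)); // prints true
        System.out.println(sameRows(a, b)); // prints false
    }
}
```

Note that the sketch, like the row-by-row comparison it mirrors, relies on both sides yielding their rows in the same order.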

Using another great part of the Java concurrency utilities, thread pooling, let’s define a thread pool with a fixed number of threads (2).

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();

Following the described algorithm, the producer functionality could be represented as a single callable:

Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > row = ...; // convert ResultSet to List
                        // hand the row over; give up if no consumer takes it within 2 minutes
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( "Having more data but consumer has already completed" );
                        }
                    } catch( InterruptedException ex ) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        throw new SQLException( "Having more data but producer has been interrupted" );
                    }
                }
            }
        );

        return null;
    }
};

The code is a bit verbose due to Java syntax, but it doesn’t actually do much. The producer converts every row read from the table to a list (the implementation has been omitted as it’s boilerplate) and puts it in the queue (offer). If the consumer is not yet ready to take the row, the producer blocks, for up to two minutes here, waiting for the consumer to finish its work. The consumer, in turn, can be represented as the following callable:

Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        // wait up to 2 minutes for the producer to hand over its row
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( "Having more data but producer has already completed" );
                        }

                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( "Row data is not the same" );
                        }
                    } catch ( InterruptedException ex ) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        throw new SQLException( "Having more data but consumer has been interrupted" );
                    }
                }
            }
        );

        return null;
    }
};
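
Both callables leave the ResultSet-to-List conversion out. One possible way to write it, offered only as a sketch and not as the author’s omitted code, is to copy every column of the current row using the standard JDBC metadata. The class name RowConverter and the method name toList are hypothetical.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class RowConverter {
    // Copies the column values of the current row into a list, in column order.
    static List<Object> toList(ResultSet rs) throws SQLException {
        ResultSetMetaData meta = rs.getMetaData();
        int columns = meta.getColumnCount();
        List<Object> row = new ArrayList<>(columns);
        for (int i = 1; i <= columns; i++) { // JDBC column indexes start at 1
            row.add(rs.getObject(i));
        }
        return row;
    }
}
```

Because the rows are later compared with equals, this approach assumes both drivers return comparable Java types for the same column. Columns returned as byte arrays, for example, would need special handling, since arrays are compared by identity.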

The consumer does the reverse operation on the queue: instead of putting data, it pulls it (poll) from the queue. If no row is available yet, the consumer blocks, again for up to two minutes, waiting for the producer to publish the next row. The only part left is submitting those callables for execution. Any exception thrown by the Future’s get method indicates that the tables don’t contain the same data (or that there was an issue getting data from a database):

    List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
    for( final Future< Void > future: futures ) {
        future.get( 5, TimeUnit.MINUTES );
    }
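
To turn those exceptions into the boolean result that compare is supposed to return, something along the following lines could be used. This is a sketch under stated assumptions rather than the original code: the names FutureOutcome and allSucceeded are hypothetical, and it swaps in the timed overload of invokeAll, because the untimed invokeAll returns only after every task has finished, so a timeout on get would never come into play.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureOutcome {
    // Runs all tasks and reports whether every one completed without an exception in time.
    static boolean allSucceeded(List<Callable<Void>> tasks, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // Tasks that have not completed when the timeout expires are cancelled.
            List<Future<Void>> futures = executor.invokeAll(tasks, timeout, unit);
            for (Future<Void> future : futures) {
                try {
                    future.get(); // already completed or cancelled, so this does not wait
                } catch (ExecutionException | CancellationException ex) {
                    return false; // the task threw an exception or ran out of time
                }
            }
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // release the pool threads
        }
    }

    public static void main(String[] args) {
        List<Callable<Void>> tasks = new ArrayList<>();
        tasks.add(() -> null);
        tasks.add(() -> { throw new IllegalStateException("Row data is not the same"); });
        System.out.println(allSucceeded(tasks, 5, TimeUnit.SECONDS)); // prints false
    }
}
```

Shutting the executor down in a finally block also keeps the pool threads from lingering after the comparison is done.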

 

Reference: Implementing Producer/Consumer using SynchronousQueue from our JCG partner Andrey Redko at the Andriy Redko {devmind} blog.



