Andrey Redko

About Andrey Redko

Andriy is a well-grounded software developer with more than 12 years of practical experience using Java/EE, C#/.NET, C++, Groovy, Ruby, functional programming (Scala), databases (MySQL, PostgreSQL, Oracle) and NoSQL solutions (MongoDB, Redis).

Using Delayed queues in practice

Often you have some kind of work or job queue where each work item or job should be handled not immediately but after some delay. For example, a user clicks a button which triggers some work, and one second later realizes it was a mistake and the job shouldn’t start at all. Another example is a queue whose work elements should be removed after some delay (expiration).

There are a lot of implementations out there, but the one I would like to describe uses only the JDK concurrency framework: the DelayQueue class and the Delayed interface.
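Before building the model, here is a minimal sketch of how DelayQueue treats its elements: an element can be offered at any time, but poll only hands it out once its delay has expired. The Reminder class and the DelayQueueBasics helper are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type, used only for this illustration
final class Reminder implements Delayed {
    private final String text;
    private final long expiresAtMillis;

    Reminder( final String text, final long delayMillis ) {
        this.text = text;
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    String getText() {
        return text;
    }

    @Override
    public long getDelay( final TimeUnit unit ) {
        // Remaining time; zero or a negative value means the element has expired
        return unit.convert( expiresAtMillis - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS );
    }

    @Override
    public int compareTo( final Delayed other ) {
        return Long.compare( getDelay( TimeUnit.MILLISECONDS ),
                other.getDelay( TimeUnit.MILLISECONDS ) );
    }
}

final class DelayQueueBasics {
    // Returns the text of the first expired element, or null if nothing has expired yet
    static String pollText( final DelayQueue< Reminder > queue ) {
        final Reminder head = queue.poll();
        return ( head == null ) ? null : head.getText();
    }

    public static void main( final String[] args ) {
        final DelayQueue< Reminder > queue = new DelayQueue< Reminder >();
        queue.offer( new Reminder( "later", 60000 ) ); // expires in one minute
        queue.offer( new Reminder( "now", 0 ) );       // already expired

        System.out.println( queue.size() );      // 2: both elements are in the queue
        System.out.println( pollText( queue ) ); // now: only the expired one can be taken
        System.out.println( pollText( queue ) ); // null: the other one is still delayed
    }
}
```

Note that poll never blocks: it returns null while the head of the queue is still delayed. The blocking counterpart is take, which waits until an element expires.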

Let me start with a simple (and empty) interface which defines the work item. I am skipping implementation details like properties and methods, as they are not important here.

package com.example.delayed;

public interface WorkItem {
   // Some properties and methods here
}

The next class in our model represents the postponed work item and implements the Delayed interface. There are just a few basic concepts to take into account: the delay itself and the actual time at which the respective work item was submitted. This is how expiration is calculated. So let’s do that by introducing the PostponedWorkItem class.

package com.example.delayed;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PostponedWorkItem implements Delayed {
    private final long origin;
    private final long delay;
    private final WorkItem workItem;

    public PostponedWorkItem( final WorkItem workItem, final long delay ) {
        this.origin = System.currentTimeMillis();
        this.workItem = workItem;
        this.delay = delay;
    }

    @Override
    public long getDelay( TimeUnit unit ) {
        return unit.convert( delay - ( System.currentTimeMillis() - origin ), 
                TimeUnit.MILLISECONDS );
    }

    @Override
    public int compareTo( Delayed delayed ) {
        if( delayed == this ) {
            return 0;
        }

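        if( delayed instanceof PostponedWorkItem ) {
            // Order by absolute expiration time (origin + delay), not by the
            // configured delay alone, so the item expiring first is at the head
            final PostponedWorkItem other = ( PostponedWorkItem )delayed;
            return Long.compare( origin + delay, other.origin + other.delay );
        }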

        long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );
        return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) );
    }
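
    // Accessor for the wrapped work item, used by the scheduler when processing
    public WorkItem getWorkItem() {
        return workItem;
    }
}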

As you can see, we create a new instance of the class and save the current system time in the internal origin field. The getDelay method calculates the actual time left before the work item expires. The delay is an external setting which comes as a constructor parameter. Implementing compareTo is mandatory because Delayed extends Comparable<Delayed>.
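To make the arithmetic behind getDelay concrete, here is a small worked sketch that applies the same formula, delay - ( now - origin ), with the clock passed in explicitly. The RemainingDelay class and its remaining method are hypothetical helpers, not part of the example model.

```java
import java.util.concurrent.TimeUnit;

final class RemainingDelay {
    // Same formula as in getDelay, but with "now" supplied by the caller
    static long remaining( final long origin, final long delay, final long now,
            final TimeUnit unit ) {
        return unit.convert( delay - ( now - origin ), TimeUnit.MILLISECONDS );
    }

    public static void main( final String[] args ) {
        final long origin = 10000; // item submitted at t = 10000 ms
        final long delay = 2000;   // 2 seconds

        // 500 ms after submission there are 1500 ms left
        System.out.println( remaining( origin, delay, 10500, TimeUnit.MILLISECONDS ) ); // 1500
        // Converting to a coarser unit truncates
        System.out.println( remaining( origin, delay, 10500, TimeUnit.SECONDS ) );      // 1
        // Exactly at the deadline the item counts as expired
        System.out.println( remaining( origin, delay, 12000, TimeUnit.MILLISECONDS ) ); // 0
        // After the deadline the value goes negative
        System.out.println( remaining( origin, delay, 12500, TimeUnit.MILLISECONDS ) ); // -500
    }
}
```

DelayQueue considers an element expired as soon as getDelay returns zero or a negative value, so the last two cases are both ready to be taken from the queue.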

Now we are mostly done! To complete the example, let’s make sure that the same work item won’t be submitted twice to the work queue by implementing equals and hashCode, both based solely on the wrapped work item (the implementation is pretty trivial and should not require any comments).

public class PostponedWorkItem implements Delayed {
    ...

    @Override
    public int hashCode() {
        final int prime = 31;

        int result = 1;
        result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() );

        return result;
    }

    @Override
    public boolean equals( Object obj ) {
        if( this == obj ) {
            return true;
        }

        if( obj == null ) {
            return false;
        }

        if( !( obj instanceof PostponedWorkItem ) ) {
            return false;
        }

        final PostponedWorkItem other = ( PostponedWorkItem )obj;
        if( workItem == null ) {
            if( other.workItem != null ) {
                return false;
            }
        } else if( !workItem.equals( other.workItem ) ) {
            return false;
        }

        return true;
    }
}
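To show what equals buys us, here is a self-contained sketch of the duplicate check: contains walks the queue and compares elements with equals, so two wrappers around an equal payload count as the same entry. KeyedItem and DuplicateCheck are hypothetical stand-ins, and the sketch assumes Java 7 or later for java.util.Objects, which produces the same hash value as the hand-written version above.

```java
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PostponedWorkItem, wrapping an arbitrary payload
final class KeyedItem implements Delayed {
    private final Object payload;
    private final long expiresAtMillis;

    KeyedItem( final Object payload, final long delayMillis ) {
        this.payload = payload;
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay( final TimeUnit unit ) {
        return unit.convert( expiresAtMillis - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS );
    }

    @Override
    public int compareTo( final Delayed other ) {
        return Long.compare( getDelay( TimeUnit.MILLISECONDS ),
                other.getDelay( TimeUnit.MILLISECONDS ) );
    }

    @Override
    public int hashCode() {
        // Same value as 31 * 1 + ( payload == null ? 0 : payload.hashCode() )
        return Objects.hash( payload );
    }

    @Override
    public boolean equals( final Object obj ) {
        if( this == obj ) {
            return true;
        }
        if( !( obj instanceof KeyedItem ) ) {
            return false;
        }
        // Null-safe comparison of the payloads; the delay is deliberately ignored
        return Objects.equals( payload, ( ( KeyedItem )obj ).payload );
    }
}

final class DuplicateCheck {
    // Adds the item unless an equal one is already queued; returns true if it was added
    static boolean addIfAbsent( final DelayQueue< KeyedItem > queue, final KeyedItem item ) {
        if( queue.contains( item ) ) {
            return false;
        }
        return queue.offer( item );
    }

    public static void main( final String[] args ) {
        final DelayQueue< KeyedItem > queue = new DelayQueue< KeyedItem >();
        System.out.println( addIfAbsent( queue, new KeyedItem( "report-42", 60000 ) ) ); // true
        System.out.println( addIfAbsent( queue, new KeyedItem( "report-42", 60000 ) ) ); // false
        System.out.println( addIfAbsent( queue, new KeyedItem( "report-43", 60000 ) ) ); // true
        System.out.println( queue.size() ); // 2
    }
}
```

The contains call itself only relies on equals, but hashCode has to be kept consistent with it so the objects also behave correctly in hash-based collections.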

The last step is to introduce some kind of manager which schedules work items and periodically polls out the expired ones: meet the WorkItemScheduler class.

package com.example.delayed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class WorkItemScheduler {
    private final long delay = 2000; // 2 seconds

    private final BlockingQueue< PostponedWorkItem > delayed =
            new DelayQueue< PostponedWorkItem >(); 

    public void addWorkItem( final WorkItem workItem ) {
        final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay );
        if( !delayed.contains( postponed )) {
            delayed.offer( postponed );
        }
    }

    public void process() {
        final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >();
        delayed.drainTo( expired );

        for( final PostponedWorkItem postponed: expired ) {
            // Do some real work here with postponed.getWorkItem()
        }
    }
}

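DelayQueue is a thread-safe BlockingQueue implementation, so addWorkItem and process can safely be called from different threads. Keep in mind that the contains check followed by offer is not atomic as a pair: two threads adding an equal work item at the same moment could both succeed, so synchronize addWorkItem if strict uniqueness matters. The process method should be run periodically in order to drain expired work items from the queue. It could be annotated with the @Scheduled annotation from Spring Framework or with EJB’s @Schedule annotation from Java EE 6.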
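If neither Spring nor Java EE is at hand, the periodic call can also be sketched with the JDK's own ScheduledExecutorService. This is only one possible arrangement, assuming Java 8 or later for the method reference; NamedJob, PeriodicDrain and the 50 ms period are hypothetical choices made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical delayed element that carries just a name
final class NamedJob implements Delayed {
    final String name;
    private final long expiresAtMillis;

    NamedJob( final String name, final long delayMillis ) {
        this.name = name;
        this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay( final TimeUnit unit ) {
        return unit.convert( expiresAtMillis - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS );
    }

    @Override
    public int compareTo( final Delayed other ) {
        return Long.compare( getDelay( TimeUnit.MILLISECONDS ),
                other.getDelay( TimeUnit.MILLISECONDS ) );
    }
}

final class PeriodicDrain {
    private final DelayQueue< NamedJob > delayed = new DelayQueue< NamedJob >();
    private final List< String > processed = new CopyOnWriteArrayList< String >();
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    void add( final String name, final long delayMillis ) {
        delayed.offer( new NamedJob( name, delayMillis ) );
    }

    // Runs process() repeatedly, waiting periodMillis between the runs
    void start( final long periodMillis ) {
        executor.scheduleWithFixedDelay( this::process, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS );
    }

    // Drains only the expired jobs and records their names
    void process() {
        final List< NamedJob > expired = new ArrayList< NamedJob >();
        delayed.drainTo( expired );

        for( final NamedJob job: expired ) {
            processed.add( job.name );
        }
    }

    List< String > getProcessed() {
        return processed;
    }

    void stop() {
        executor.shutdownNow();
    }

    public static void main( final String[] args ) throws InterruptedException {
        final PeriodicDrain drain = new PeriodicDrain();
        drain.add( "short", 100 );   // expires after 100 ms
        drain.add( "long", 60000 );  // still delayed when the demo ends
        drain.start( 50 );

        Thread.sleep( 1000 );
        drain.stop();

        System.out.println( drain.getProcessed() );
    }
}
```

One design point to keep in mind: if a scheduled task throws an exception, ScheduledExecutorService suppresses all its subsequent runs, so a real process method should catch and log failures of individual work items.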

Enjoy!

Reference: Using Delayed queues in practice from our JCG partner Andriy Redko at the Andriy Redko {devmind} blog.

One Response to "Using Delayed queues in practice"

  1. Tinclon says:

    Your compareTo method sorts elements according to the originally specified delay, and not according to how much time is actually left before the element’s delay has expired.

    As such, all elements with equal delay time will be sorted equally, even when some of those items are added to the queue before others.
