Enterprise Java

Working with Amazon Simple Queue Service using java

Amazon Simple Queue Service or SQS is a highly scalable hosted messaging queue provided by Amazon Webservice stack. Amazon SQS can be used to completely decouple operations of different components within the system which otherwise exchange data to  perform independent tasks. Amazon SQS also helps us in saving the data which would be lost in case the application is down or if one of the component becomes unavailable.

Amazon SQS Features (Copied directly from amazon website)

  1. Redundant infrastructure—Guarantees delivery of your messages at least once, highly concurrent access to messages, and high availability for sending and retrieving messages
  2. Multiple writers and readers—Multiple parts of your system can send or receive messages at the same time. SQS locks the message during processing, keeping other parts of your system from processing the message simultaneously.
  3. Configurable settings per queue—All of your queues don’t have to be exactly alike. For example, one queue can be optimized for messages that require a longer processing time than others.
  4. Variable message size—Your messages can be up to 65536 bytes (64 KiB) in size. For even larger messages, you can store the contents of the message using the Amazon Simple Storage Service (Amazon S3) or Amazon SimpleDB and use Amazon SQS to hold a pointer to the Amazon S3 or Amazon SDB object. Alternately, you can split the larger message into smaller ones.
  5. Access control—You can control who can send messages to a queue, and who can receive messages from a queue
  6. Delay Queues—A delay queue is one which the user sets a default delay on a queue such that delivery of all messages enqueued will be postponed for that duration of time. You can set the delay value when you create a queue with CreateQueue, and you can update the value with SetQueueAttributes. If you update the value, the new value affects only messages enqueued after the update.

With above knowledge in place let us try to use SQS for creating a simple photo processing service.

Problem Defination for the tutorial

We will be creating a simple photo-processing application with following components.

  1. Photo Uploader serivce – This is a webservice which allows users to upload a photo to the system. Once the photo is uploaded they are stored in a temporary storage. To keep it simple we will assume that user has already uploaded the photo and stored it in a predefined location.
  2. AWSSimpleQueueServiceUtil – This is a utility class which wraps a Amazon SQS client and performs basic CRUD operations on the SQS queue.
  3. PhotoProcessingManager – Manages the entire show. It will invoke AWSSimpleQueueServiceUtil  to send/receive messages to SQS and invoke PhotoProcessor to process the photo and finally delete the message from the queue.  Mostly we should intend this class to act as Listener to the SQS but for simplicity we will just be using a Poll mechanism to pull the messages from SQS.
  4. PhotoProcessor – Gets a photo message from SQS through PhotoProcessingManager and generates a thumbnail.

Before beginning it would be great if you go through video in the following link: http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSGettingStartedGuide/Welcome.html

Steps for getting started

  1. Create a amazon account. You will need a credit card for that
  2. Log on to your console console.aws.amazon.com.
  3. On console dashboard search for SQS and click on it. It will take you to your SQS home.
  4. Create a new SQS queue and name it PhotoQueue. Leave the rest of the setting as default. We can also create and delete a SQS queue dynamically but in this tutorial I have a pre-created queue which I will be using in my code.
  5. Now we have a queue and now we will create a simple java project in our favorite java editor and see how we can leverage this queue.
  6. Once you are done you need to download your security credentials. For this go to “My Account”/”security credentials”. What we are after is access credentials. You will see that there are 3 types of access credentials one of them is “Access Keys”. We need this to access and work on PhotoQueue we just created. We will create a new set of access keys and store the access key and secret key is a safe location.
  7. Now download the  sdk for java from here. http://aws.amazon.com/sdkforjava. In lib folder of sdk copy the aws-java-sdk-1.3.33.jar to your project classpath.

    Maven users can add following dependency in their POM

    <dependency>
    	<groupId>com.amazonaws</groupId>
    	<artifactId>aws-java-sdk</artifactId>
    	<version>1.3.33</version>
    </dependency>

    Create a file called “AwsCredentials.properties” store this in your project. This file will contain following properties

    accessKey =
    secretKey =

    The values of these properties are the access key you generated in the step 6.

  8. For photo processing I am using imgscalr. It is a lightweight and awesome photo processing library in java for doing simple tasks like resize, rotate, crop etc. You can download the jar from http://www.thebuzzmedia.com/software/imgscalr-java-image-scaling-library/#download. Maven users can add the following to their dependency list.
    <dependency>
            <groupId>org.imgscalr</groupId>
            <artifactId>imgscalr-lib</artifactId>
            <version>4.2</version>
            <type>jar</type>
            <scope>compile</scope>
     </dependency>

Now we are ready to rock and roll and get our hands dirty with some code.

AWSSimpleQueueServiceUtil.java

package com.aranin.adconnect.util.aws;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;

import java.io.FileInputStream;
import java.util.List;
import java.util.Properties;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 3/19/13
 * Time: 10:44 AM
 * To change this template use File | Settings | File Templates.
 */
public class AWSSimpleQueueServiceUtil {
    private BasicAWSCredentials credentials;
    private AmazonSQS sqs;
    private String simpleQueue = "PhotoQueue";
    private static volatile  AWSSimpleQueueServiceUtil awssqsUtil = new AWSSimpleQueueServiceUtil();

    /**
     * instantiates a AmazonSQSClient http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html
     * Currently using  BasicAWSCredentials to pass on the credentials.
     * For SQS you need to set your regions endpoint for sqs.
     */
    private   AWSSimpleQueueServiceUtil(){
        try{
            Properties properties = new Properties();
            properties.load(new FileInputStream("D:/samayik/adkonnection/src/main/resources/AwsCredentials.properties"));
            this.credentials = new   BasicAWSCredentials(properties.getProperty("accessKey"),
                                                         properties.getProperty("secretKey"));
            this.simpleQueue = "PhotoQueue";

            this.sqs = new AmazonSQSClient(this.credentials);
            /**
             * My queue is in singapore region which has following endpoint for sqs
             * https://sqs.ap-southeast-1.amazonaws.com
             * you can find your endpoints here
             * http://docs.aws.amazon.com/general/latest/gr/rande.html
             *
             * Overrides the default endpoint for this client ("sqs.us-east-1.amazonaws.com")
             */
            this.sqs.setEndpoint("https://sqs.ap-southeast-1.amazonaws.com");
            /**
               You can use this in your web app where    AwsCredentials.properties is stored in web-inf/classes
             */
            //AmazonSQS sqs = new AmazonSQSClient(new ClasspathPropertiesFileCredentialsProvider());

        }catch(Exception e){
            System.out.println("exception while creating awss3client : " + e);
        }
    }

    public static AWSSimpleQueueServiceUtil getInstance(){
        return awssqsUtil;
    }

    public AmazonSQS getAWSSQSClient(){
         return awssqsUtil.sqs;
    }

    public String getQueueName(){
         return awssqsUtil.simpleQueue;
    }

    /**
     * Creates a queue in your region and returns the url of the queue
     * @param queueName
     * @return
     */
    public String createQueue(String queueName){
        CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName);
        String queueUrl = this.sqs.createQueue(createQueueRequest).getQueueUrl();
        return queueUrl;
    }

    /**
     * returns the queueurl for for sqs queue if you pass in a name
     * @param queueName
     * @return
     */
    public String getQueueUrl(String queueName){
        GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest(queueName);
        return this.sqs.getQueueUrl(getQueueUrlRequest).getQueueUrl();
    }

    /**
     * lists all your queue.
     * @return
     */
    public ListQueuesResult listQueues(){
       return this.sqs.listQueues();
    }

    /**
     * send a single message to your sqs queue
     * @param queueUrl
     * @param message
     */
    public void sendMessageToQueue(String queueUrl, String message){
        SendMessageResult messageResult =  this.sqs.sendMessage(new SendMessageRequest(queueUrl, message));
        System.out.println(messageResult.toString());
    }

    /**
     * gets messages from your queue
     * @param queueUrl
     * @return
     */
    public List<Message> getMessagesFromQueue(String queueUrl){
       ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
       List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
       return messages;
    }

    /**
     * deletes a single message from your queue.
     * @param queueUrl
     * @param message
     */
    public void deleteMessageFromQueue(String queueUrl, Message message){
        String messageRecieptHandle = message.getReceiptHandle();
        System.out.println("message deleted : " + message.getBody() + "." + message.getReceiptHandle());
        sqs.deleteMessage(new DeleteMessageRequest(queueUrl, messageRecieptHandle));
    }

    public static void main(String[] args){

    }

}

PhotoProcessor.java

package com.aranin.adconnect.util.aws;

import org.imgscalr.Scalr;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 3/19/13
 * Time: 12:32 PM
 * To change this template use File | Settings | File Templates.
 */
public class PhotoProcessor {

    public static void  generateImage(String imagePath, String origName, String targetName, int scalabity){
        String origImage =   null;
        String targetImage = null;
        File origFile = null;
        BufferedImage buffImg = null;
        File targetFile = null;
        try{
            origImage =   imagePath + "/" + origName;
            targetImage = imagePath + "/" + targetName;
            origFile = new File(origImage);
            buffImg = ImageIO.read(origFile);
            buffImg = Scalr.resize(buffImg, Scalr.Method.SPEED, scalabity);
            targetFile = new File(targetImage);
            ImageIO.write(buffImg, "jpeg", targetFile);

        }catch (Exception e){
            System.out.println("Exception in processing image : " + e);
        }finally {
            buffImg = null;

        }
    }
}

PhotoFile.java

package com.aranin.adconnect.util.aws;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 3/19/13
 * Time: 12:29 PM
 * To change this template use File | Settings | File Templates.
 */
public class PhotoFile {
    private String origName;
    private String targetName;
    public String imagePath;

    public String getOrigName() {
        return origName;
    }

    public void setOrigName(String origName) {
        this.origName = origName;
    }

    public String getTargetName() {
        return targetName;
    }

    public void setTargetName(String targetName) {
        this.targetName = targetName;
    }

    public String getImagePath() {
        return imagePath;
    }

    public void setImagePath(String imagePath) {
        this.imagePath = imagePath;
    }

    public String toString(){
        return origName + "," +  targetName + "," + imagePath;
    }
}

SQSPhotoManager.java

package com.aranin.adconnect.util.aws;

import com.amazonaws.services.sqs.model.Message;

import java.util.List;
import java.util.StringTokenizer;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 3/20/13
 * Time: 11:38 AM
 * To change this template use File | Settings | File Templates.
 */
public class SQSPhotoManager implements Runnable{
    private String queueUrl;
    public static void main(String[] args){
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        /**
         * 1. get the url for your photo queue
         */
        String queueUrl  = awssqsUtil.getQueueUrl(awssqsUtil.getQueueName());
        System.out.println("queueUrl : " + queueUrl);

        /**
         * 2. Add a photo to the queue to be processed
         */

        PhotoFile photo = new PhotoFile();
        photo.setImagePath("C:/Users/Public/Pictures/Sample Pictures");
        photo.setOrigName("Tree.jpg");
        photo.setTargetName("Tree_thumb.jpg");

        /**
         * 3. set the photofile in queue for processing
         */

         awssqsUtil.sendMessageToQueue(queueUrl, photo.toString());

        /**
         * get the messages from queue
         */

        Thread managerthread = new Thread(new SQSPhotoManager(queueUrl),"T2");
        managerthread.start();

    }

    public SQSPhotoManager(String queueUrl){
        this.queueUrl = queueUrl;
    }

    @Override
    public void run() {
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        boolean flag = true;
        while(flag){
            List<Message> messages =  awssqsUtil.getMessagesFromQueue(this.queueUrl);
            if(messages == null || messages.size() == 0){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }else{
                flag = false;
                for (Message message : messages) {
                    String messagePhoto = message.getBody();
                    System.out.println("photo to be processed : " + messagePhoto);
                    StringTokenizer photoTokenizer = new StringTokenizer(messagePhoto,",");
                    String source = null;
                    String target = null;
                    String path = null;

                    source = photoTokenizer.nextToken();
                    target = photoTokenizer.nextToken();
                    path = photoTokenizer.nextToken();
                    System.out.println("source : " + source);
                    System.out.println("target : " + target);
                    System.out.println("path : " + path);
                    /**
                     * generate thumbmail within 150*150 container
                     */
                    PhotoProcessor.generateImage(path, source, target, 150);
                }

                /**
                * finally delete the message
                */
                for (Message message : messages) {
                      awssqsUtil.deleteMessageFromQueue(this.queueUrl, message);
                }

            }
        }
    }
}

This will form the core for your PhotoProcessor application using SQS. This code has a glaring drawback. It polls on the SQS using a thread, it would be great if you can create a listener in your code which can subscribe to your queue and take necessary action when new message arrives. That is indeed the subject of my next post. Till then feel free to bombard me with questions, together we can find answer to those.
 

Reference: Working with Amazon Simple Queue Service using java from our JCG partner Niraj Singh at the Weblog4j blog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

6 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Russell Bateman
Russell Bateman
10 years ago

You didn’t even begin to list all the JAR dependencies for this project, apache commons, apache http, jackson, etc.

Niraj Singh
10 years ago

Hi Russell,

My apologies, you can download the entire code from here.

https://www.assembla.com/code/weblog4j/subversion/nodes/19/awsdemo/trunk

It will contain lots of other files but you can check the pom for your reference.

This is an intellij project so I hope you can modify to suit your need.

You can also copy the dependency from following post

http://weblog4j.com/2013/05/14/amazon-sqs-listening-to-sqs-using-apache-camel-the-spring-dsl-way/

Hope this helps.

Regards
Niraj

leon
leon
9 years ago

I guess, awssqsUtil.createQueue(awssqsUtil.getQueueName()); is missing in SQSPhotoManager.java.

Albert Rannetsperger
Albert Rannetsperger
8 years ago

The fact that I can see hard-coded path names and — even worse — that you dev on a Windows machine, makes it very difficult for me to trust your code.

Albert R. (Brightpearl)

Ritu
Ritu
8 years ago

This project is incomplete. It doesn’t even have the entry point. Do we have any project which is working and can be referred?

Subhash
Subhash
7 years ago
Reply to  Ritu
Back to top button