Publish and Subscribe with Hazelcast

A few weeks ago I wrote a blog on getting started with Hazelcast, describing how ludicrously simple it is to create distributed maps, lists and queues. At the time I mentioned that Hazelcast does quite a few other things besides. This blog takes a quick look at another of Hazelcast’s features: its broadcast messaging system based on the Publish/Subscribe pattern. This takes the usual format whereby the message sender app publishes messages on a certain topic. The messages aren’t directed at any particular client, but can be read by any client that registers an interest in the topic.

The obvious scenario for publish and subscribe comes from the world of high finance and market makers. A market maker both buys and sells financial instruments such as stocks, and competes for business by advertising both buy and sell prices in a (usually electronic) marketplace. To implement a very simple market maker scenario using Hazelcast we need three classes: a StockPrice bean, a MarketMaker and a Client.

The following code has been added to my existing Hazelcast project that’s available on GitHub. There are no additional POM dependencies to worry about.

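import java.io.Serializable;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StockPrice implements Serializable {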

  private static final long serialVersionUID = 1L; 

  private final BigDecimal bid; 

  private final BigDecimal ask; 

  private final String code; 

  private final String description; 

  private final long timestamp; 

  /** 
   * Create a StockPrice for the given stock at a given moment 
   */ 
  public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, 
      long timestamp) { 
    super(); 
    this.bid = bid; 
    this.ask = ask; 
    this.code = code; 
    this.description = description; 
    this.timestamp = timestamp; 
  } 

  public BigDecimal getBid() { 
    return bid; 
  } 

  public BigDecimal getAsk() { 
    return ask; 
  } 

  public String getCode() { 
    return code; 
  } 

  public String getDescription() { 
    return description; 
  } 

  public long getTimestamp() { 
    return timestamp; 
  } 

  @Override 
  public String toString() { 

    StringBuilder sb = new StringBuilder("Stock - ");
    sb.append(code);
    sb.append(" - ");
    sb.append(description);
    sb.append(" - Bid: ");
    sb.append(bid);
    sb.append(" - Ask: ");
    sb.append(ask);
    sb.append(" - ");
    // HH = hour of day, mm = minutes, ss = seconds (MM and SS would mean month and milliseconds)
    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");
    sb.append(df.format(new Date(timestamp)));
    return sb.toString(); 
  } 
}
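One detail of toString() worth a second look is the timestamp pattern. SimpleDateFormat pattern letters are case-sensitive: HH is the hour of the day, mm the minutes and ss the seconds, whereas MM means the month and SS the milliseconds. The following is a minimal sketch of the formatting step on its own; the class name TimestampFormatSketch and the fixed UTC time zone are assumptions made purely so that the output is predictable, and are not part of the original project.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimestampFormatSketch {

  /** Formats a millisecond timestamp as hours:minutes:seconds in UTC. */
  static String formatUtc(long timestamp) {
    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");
    // Assumption: pin the zone to UTC so the result does not depend on the machine
    df.setTimeZone(TimeZone.getTimeZone("UTC"));
    return df.format(new Date(timestamp));
  }

  public static void main(String[] args) {
    // 1 hour, 2 minutes and 3 seconds after the epoch
    long timestamp = ((1 * 60 + 2) * 60 + 3) * 1000L;
    System.out.println(formatUtc(timestamp)); // prints 01:02:03
  }
}
```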

The StockPrice bean is immutable, so it has the usual getters but no setters. It models a stock’s ask and bid prices (sell and buy in normal language) at any given time, and the MarketMaker class publishes these beans using Hazelcast.

Normally a market maker will publish prices in more than one financial instrument; however, for simplicity, in this demo each MarketMaker instance only publishes prices for a single stock.

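import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

public class MarketMaker implements Runnable {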

  private static Random random = new Random(); 

  private final String stockCode; 

  private final String description; 

  private final ITopic<StockPrice> topic; 

  private volatile boolean running; 

  public MarketMaker(String topicName, String stockCode, String description) { 
    this.stockCode = stockCode; 
    this.description = description; 
    this.topic = createTopic(topicName); 
    running = true; 
  } 

  @VisibleForTesting 
  ITopic<StockPrice> createTopic(String topicName) { 
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); 
    return hzInstance.getTopic(topicName); 
  } 

  public void publishPrices() { 

    Thread thread = new Thread(this); 
    thread.start(); 
  } 

  @Override 
  public void run() { 

    do { 
      publish(); 
      sleep(); 
    } while (running); 
  } 

  private void publish() { 

    StockPrice price = createStockPrice(); 
    System.out.println(price.toString()); 
    topic.publish(price); 
  } 

  @VisibleForTesting 
  StockPrice createStockPrice() { 

    double price = createPrice(); 
    DecimalFormat df = new DecimalFormat("#.##"); 

    BigDecimal bid = new BigDecimal(df.format(price - variance(price))); 
    BigDecimal ask = new BigDecimal(df.format(price + variance(price))); 

    StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, 
        System.currentTimeMillis()); 
    return stockPrice; 
  } 

  private double createPrice() { 

    int val = random.nextInt(2010 - 1520) + 1520; 
    double retVal = (double) val / 100; 
    return retVal; 
  } 

  private double variance(double price) { 
    return (price * 0.01); 
  } 

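  private void sleep() {
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and stop publishing rather than carrying on
      Thread.currentThread().interrupt();
      running = false;
    }
  }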

  public void stop() { 
    running = false; 
  } 

  public static void main(String[] args) {

    MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
    MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
    MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petroleum");

    bt.publishPrices(); 
    cbry.publishPrices(); 
    bp.publishPrices(); 

  } 

}

As usual, setting up Hazelcast is fairly straightforward and most of the code in the MarketMaker class above has nothing to do with Hazelcast. The class is split into two parts: construction and publishing prices. The constructor takes three arguments, two of which it stores away for later. It uses the third, the topic name, to create a Hazelcast instance and register a simple topic called "STOCKS" via the package-private createTopic() method. As you might expect, creating a Hazelcast instance and registering a topic takes two lines of code, as shown below:

  ITopic<StockPrice> createTopic(String topicName) { 
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); 
    return hzInstance.getTopic(topicName); 
  }

The rest of the class runs the price publishing mechanism using a thread to call the MarketMaker’s run() method. This method generates a random bid and ask price for the associated stock code and publishes it using Hazelcast. Publishing is achieved using the following single line of code:

    topic.publish(price);
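Going back to the price generation step for a moment: createStockPrice() rounds to two decimal places by formatting a double with DecimalFormat and parsing the resulting string. Because DecimalFormat uses the default locale’s decimal separator, that round trip can fail with a NumberFormatException on a machine whose locale uses a comma. The following is a rough sketch of an alternative that stays in BigDecimal throughout. The class name PriceSketch and its method names are hypothetical and not part of the original project; the price range and the 1% spread are taken from the code above.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Random;

public class PriceSketch {

  // 1% either side of the mid price, as in MarketMaker.variance()
  private static final BigDecimal SPREAD = new BigDecimal("0.01");

  /** Mid price from 15.20 (inclusive) to 20.10 (exclusive), built from whole pence. */
  static BigDecimal midPrice(Random random) {
    int pence = random.nextInt(2010 - 1520) + 1520;
    return BigDecimal.valueOf(pence, 2); // e.g. 1520 becomes 15.20
  }

  /** Bid price: mid price minus the spread, rounded to two decimal places. */
  static BigDecimal bid(BigDecimal mid) {
    return mid.subtract(mid.multiply(SPREAD)).setScale(2, RoundingMode.HALF_UP);
  }

  /** Ask price: mid price plus the spread, rounded to two decimal places. */
  static BigDecimal ask(BigDecimal mid) {
    return mid.add(mid.multiply(SPREAD)).setScale(2, RoundingMode.HALF_UP);
  }

  public static void main(String[] args) {
    BigDecimal mid = midPrice(new Random());
    System.out.println("Bid: " + bid(mid) + " - Ask: " + ask(mid));
  }
}
```

Whether this is worth the extra lines in a demo is a judgment call; it mainly removes the dependence on the machine’s locale.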

The final part of the MarketMaker class is the main() method, and all this does is create several MarketMaker instances and set them running.

Now that Hazelcast knows about our ever changing stock prices, the next thing to do is to sort out the client code.

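import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

public class Client implements MessageListener<StockPrice> {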

  public Client(String topicName) { 
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); 
    ITopic<StockPrice> topic = hzInstance.getTopic(topicName); 
    topic.addMessageListener(this); 
  } 

  /** 
   * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) 
   */ 
  @Override 
  public void onMessage(Message<StockPrice> arg0) { 
    System.out.println("Received: " + arg0.getMessageObject().toString()); 
  } 

  public static void main(String[] args) { 

    new Client("STOCKS"); 
  } 

}

As with any messaging system, the message sender code has to know both who to call and what to call. The “who to call” is achieved by the client creating a Hazelcast instance and registering an interest in the "STOCKS" topic, in the same way as the publisher, as shown below:

    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); 
    ITopic<StockPrice> topic = hzInstance.getTopic(topicName); 
    topic.addMessageListener(this);

The “what to call” is achieved by the client implementing Hazelcast’s MessageListener interface and its single method, onMessage():

  @Override 
  public void onMessage(Message<StockPrice> arg0) { 
    System.out.println("Received: " + arg0.getMessageObject().toString()); 
  }

The final part of the client code is its main() method that creates a client instance.
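To see the publish and subscribe pattern in isolation, without a cluster, the following plain Java sketch mimics the two calls used above: registering a listener and publishing a message. InMemoryTopic is a hypothetical class written only for illustration; it is not part of Hazelcast, it delivers messages synchronously within a single JVM, and it uses a standard Consumer in place of MessageListener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class InMemoryTopic<T> {

  // Listeners may be added while messages are being published
  private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

  /** Registers an interest in this topic. */
  public void addMessageListener(Consumer<T> listener) {
    listeners.add(listener);
  }

  /** Delivers the message to every registered listener, not to one particular client. */
  public void publish(T message) {
    for (Consumer<T> listener : listeners) {
      listener.accept(message);
    }
  }

  public static void main(String[] args) {
    InMemoryTopic<String> topic = new InMemoryTopic<>();
    topic.addMessageListener(m -> System.out.println("Client 1 received: " + m));
    topic.addMessageListener(m -> System.out.println("Client 2 received: " + m));
    topic.publish("BT.L - Bid: 15.05 - Ask: 15.35");
  }
}
```

The publisher never refers to a particular client; it only knows the topic, which is the property that Hazelcast’s ITopic extends across the members of a cluster.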

The final thing to do is to run the code. For this I’ve simply put all the necessary JAR files in a single directory and there are only two to consider: hazelcast-3.1.jar and guava-13.0.1.jar.


Once that was done I changed to the project’s classes directory:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes

…and fired up the publisher

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

…and then the client.

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

Of course, if you’re running this on your machine using this rough and ready technique, then remember to replace /Users/Roger/tmp/mm with the path to the place where you’ve put your copies of these JAR files.

If you run a MarketMaker publisher in one terminal and a couple of clients in two other terminals, then you’ll get something like this, where you can see the prices being published and the clients receiving updates.

[Screenshot: one terminal running the MarketMaker publisher and two terminals running clients]

One thing to note about Hazelcast is that a ‘cluster’ refers to a cluster of Hazelcast instances, rather than a cluster of JVMs. This isn’t obvious until you ask for more than one Hazelcast instance per application. When additional clients join the cluster you’ll see something like this:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

In the above log, there are two listener entries, one for each client process, and three publisher entries, one for each of the MarketMaker instances started in the MarketMaker’s main() method.

The thing to consider here is whether or not it’s good practice to create a Hazelcast instance per object instantiation (as I’ve done in the sample code) or is it better to have a single static Hazelcast instance in your code. I’m not sure of the answer to this so if there are any Hazelcast gurus reading this please let me know.
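For anyone wanting to experiment with the single-instance alternative, one possible shape is a small holder that creates the instance on first use and hands the same one to every caller. This is only a sketch under stated assumptions: SharedInstance is a hypothetical helper, not part of Hazelcast, and the sample uses a stand-in factory so that it runs without the Hazelcast JAR. In the sample project the factory would presumably be Hazelcast::newHazelcastInstance. It makes no claim about which approach is the better practice.

```java
import java.util.function.Supplier;

public class SharedInstance<T> {

  private final Supplier<T> factory;
  private T instance;

  public SharedInstance(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Creates the instance on the first call and returns the same one afterwards. */
  public synchronized T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  public static void main(String[] args) {
    // Stand-in factory (assumption); a real project would supply its own
    SharedInstance<Object> shared = new SharedInstance<>(Object::new);
    System.out.println(shared.get() == shared.get()); // prints true
  }
}
```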

That’s it then: Hazelcast is happily running in publish and subscribe mode, but I’ve not covered all of Hazelcast’s features; perhaps more on those later…

 

Reference: Publish and Subscribe with Hazelcast from our JCG partner Roger Hughes at the Captain Debug’s Blog blog.

