Enterprise Java

Beginner’s Guide to Hazelcast Part 3

This is a continuation of a series of posts on how to use Hazelcast from a beginner’s point of view.  If you haven’t read the last two, I encourage reading them:

 
 
 
 
 

The Primitives are Coming

During my last post I mentioned to use an ILock with IList and ISet because they are not thread safe.  It hit me that I had not covered a basic part of Hazelcast, the distributed primitives.  They solve the problem of  synchronizing the use of resources in a distributed way.  Those who do a lot of threaded programming will recognize them right away.  For those of you who are new to programming in threads, I will explain what each primitive does and give an example.

IAtomicLong

This is a distributed atomic long.  This means that every operation happens all at once.  For example, one can add a number and retrieve the resulting value in one operation.  One can get the value then add a value.  This is true for every operation one does on this primitive.  As one can imagine, it is thread safe but one cannot do this and it be thread safe.

atomicLong.addAndGet(2 * atomicLong.get());

The line above creates a race condition because there are three operations, the reading of the contents of the atomic long, multiplying by two and adding that to the instance. The thread safely is only there if the operation is guaranteed to happen in one step. To do that, IAtomicLong has a method called alterAndGet. AlterAndGet takes a IFunction object. This makes multi-step operations one step. There is always one synchronous backup of an IAtomicLong and it is not configurable.

IdGenerator

IAtomicLongs are great to use to keep track of how many of what one has. The problem is that since the call is most likely remote, IAtomicLongs for some situations are not an ideal solution. One of those situations is generating unique ids. IdGenerator was made just for that purpose. The way it works is that each member claims one million ids to generate. Once all of those claimed numbers are taken, the segment claims another million. So since each member has a million ids tucked away, the chances the call to an IdGenerator is remote is one in a million. This makes it very fast way to generate unique ids. If any duplicates happen it may be because the members didn’t join up. If a member goes down before its segment is used up, there will be gaps in the ids. For unique id generation missing numbers are not an issue. I do feel members not hooking up to the cluster is an issue but if that is happening, there are bigger things to worry about. If the cluster get restarted, the ids start at zero again. That is because the id is not persisted. This is a in memory database, one takes their chances. To counter that, IdGenerators can be set to start at particular number as long it isn’t claimed by someone else and no ids have been generated yet. Alternatives are to creating ones own id generator or use the java.util.UUID class. This may take more space but each project has its own requirements to meet. IdGenerators always have one synchronous backup and cannot be configured.

ILock

Here is a classic synchronization method with a twist. It is an exclusive lock that is distributed. One just invokes the method lock and a thread either waits or obtains a lock. Once the lock is established, the critical section can be preformed. Once the work is done, the unlock method is used. Veterans of this technique will put the critical section in a try finally block, establishing the lock just outside the try block and the unlock in the finally section. This is invaluable for performing actions on structures that are not thread safe. The process that gets the lock owns the lock and is required to call unlock for other processes to be able to establish locks. This can be problematic when one has threads in multiple locations on the network. Hazelcast thought of this problem and has the lock released when a member goes down. Another feature is that the lock method has a timeout of 300 seconds. This prevents starved threads. ILocks have one synchronous backup and is not configurable.

A bit of advice from someone that has experience, keep the critical sections as small as possible; this helps performance, and prevent deadlocks. Deadlocks are a pain to debug and harder to test because of the unknown execution order of threads. One time the bug manifests itself then it does not. This can continue for a week or more because of a misplaced lock. Then one has to make sure it will not happen again. This is hard to prove because of the unknown execution of the threads. By the time it is all done, the boss is frustrated because of the time it took and one does not know if the bug is fixed or not.

ICondition

Ever wanted to wait for an event to happen but did not want other people to have to wait for it too? That is exactly what conditions are for in threaded programming. Before Java 1.5, this was accomplished via the synchronized-wait-notify technique. This can be preformed by the lock-condition technique. Take a trip with me and I can show one how this works. Imagine a situation where there is a non-thread safe list and it has a producer and a consumer writing and reading from it. Obviously, there are critical sections that need to be protected. That falls into the lap of a lock. After a lock is established, critical work can begin. The only problem is that the resource in a state that is useless to the thread. For example, a consumer cannot pull entries from an empty list. A producer cannot put entries on a full list. This is where a condition comes in. The producer or consumer will enter a while loop that tests for the condition that is favorable and call condition.await(). Once await is called, the thread gives up its lock and lets other threads access their critical sections. The awaiting thread will get the lock back to test for its condition and may await some more or the condition is satisfied and starts doing work. Once the critical section is complete, the thread can call signal() or signalAll() to tell the other threads to wake up and check their conditions. Conditions are created by the lock instead of the Hazelcast instance. Another thing is that if one wants the condition to be distributed, one must use the lock.newCondition(String name) method. IConditions have one synchronous backup and cannot be configured.

I cannot tell one how many deadlocks can occur using this technique. Sometimes the signal comes when the thread is waiting and everything is good. The other side is that the signal is sent when the thread is not waiting, enters the wait state and it waits forever. For this reason, I advocate using a timeout while waiting so the thread can check every once in a while if the condition has been met. That way if the signal misses, the worst that can happen is a little waiting time instead of forever waiting. I used the timeout technique in my example. Copy and paste the code as much as one wants. I would rather have tested techniques being used rather than untested code invading the Internet.

ICountDownLatch

A ICountDownLatch is a synchronizing tool that triggers when its counter goes to zero. This is not a common way to do coordinate but it is there when needed. The example section, I think, provides a much better explanation of how it works. The latch can be reset after it goes to zero so it can be used over again. If the owning member goes away, all of the threads waiting for the latch to strike zero are signaled as if it zero has been achieved. The ICountDownLatch is backed up synchronously in one other place and cannot be configured.

ISemaphore

Yes, there is a distributed version of the classic semaphore. This is exciting to me be because last time I went to a Operating System class, semaphores needed a bit of hardware support. Maybe I just dated myself, oh well, it is still cool (again dating myself). Semaphores work by limiting the number of threads that can access a resource. Unlike locks, semaphores have no sense of ownership so different threads can release the claim on the resource. Unlike the rest of the primitives, the ISemaphore can be configured. I configure one in my example. It is in the hazelcast.xml in the default package of my project.

Examples

Here are the examples. I had a comment about my last post asking me to indent my code so it is more readable. I will do that for sure this time because of the amount of code I am posting. One will see a couple of things that I have not discussed before. One is the IExecutorService. This is a distributed version of the ExecutorService. One can actually sent jobs off to be completed by different members. Another thing is that all of the Runnable/Callable classes that are defined implement Serializable. This is necessary in a distributed environment because the object can be sent across to different members. The last thing is the HazelcastInstanceAware interface. It allows a class to access the local Hazelcast instance. Then the class can get instances of the resources it needs (like ILists). Without further ado, here we go.

IAtomicLong

package hazelcastprimitives.iatomiclong;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IFunction;
import java.io.Serializable;

/**
 *
 * @author Daryl
 */
public class IAtomicLongExample {
    public static class MultiplyByTwoAndSubtractOne 
        implements IFunction, Serializable {

        @Override
        public Long apply(Long t) {
            return (long)(2 * t - 1);
        }
        
    }
    
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        final String NAME = "atomic";
        IAtomicLong aLong = instance.getAtomicLong(NAME);
        IAtomicLong bLong = instance.getAtomicLong(NAME);
        aLong.getAndSet(1L);
        System.out.println("bLong is now: " + bLong.getAndAdd(2));
        System.out.println("aLong is now: " + aLong.getAndAdd(0L));
        
        MultiplyByTwoAndSubtractOne alter = new MultiplyByTwoAndSubtractOne();
        aLong.alter(alter);
        System.out.println("bLong is now: " + bLong.getAndAdd(0L));
        bLong.alter(alter);
        System.out.println("aLong is now: " + aLong.getAndAdd(0L));
        
        System.exit(0);
    }
}

Notice that even the MutilpyAndSubtractOne class implements Serializable.

IdGenerator

package hazelcastprimitives.idgenerator;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IdGenerator;

/**
 *
 * @author Daryl
 */
public class IdGeneratorExample {
 
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();

        IdGenerator generator = instance.getIdGenerator("generator");
        
        for(int i = 0; i < 10; i++) {
            System.out.println("The generated value is " + generator.newId());
        }
        
        instance.shutdown();
        System.exit(0);
    }
}

ILock

This ILock example can also be considered an ICondition example. I had to use a condition because the ListConsumer was always running before the ListProducer so I made the ListConsumer wait until the IList had something to consume.

package hazelcastprimitives.ilock;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author Daryl
 */
public class ILockExample {

    static final String LIST_NAME = "to be locked";
    static final String LOCK_NAME = "to lock with";
    static final String CONDITION_NAME = "to signal with";
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IExecutorService service = instance.getExecutorService("service");
        ListConsumer consumer = new ListConsumer();
        ListProducer producer = new ListProducer();
        
        try {
            service.submit(producer);
            service.submit(consumer);
            Thread.sleep(10000);
        } catch(InterruptedException ie){
            System.out.println("Got interrupted");
        } finally {
            instance.shutdown();
        }
    }
    
    public static class ListConsumer implements Runnable, Serializable, HazelcastInstanceAware {

        private transient HazelcastInstance instance;
        
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            lock.lock();
            try {
                while(list.isEmpty()) {
                    condition.await(2, TimeUnit.SECONDS);
                }
                while(!list.isEmpty()) {
                    System.out.println("value is " + list.get(0));
                    list.remove(0);
                }
            } catch(InterruptedException ie) {
                System.out.println("Consumer got interrupted");
            } finally {
                lock.unlock();
            }
            System.out.println("Consumer leaving");
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
        
    }
    
    public static class ListProducer implements Runnable, Serializable, HazelcastInstanceAware {
        private transient HazelcastInstance instance;

        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            lock.lock();
            try {
                for(int i = 1; i <= 10; i++){
                    list.add(i);
                }
                condition.signalAll();
            } finally {
                lock.unlock();
            }
            System.out.println("Producer leaving");
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
        
    }
}

ICondition

Here is the real ICondition example. Notice how the SpunProducer and SpunConsumer both share the same ICondition and signal each other. Note I am making use of timeouts to prevent deadlocks.

package hazelcastprimitives.icondition;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author Daryl
 */
public class IConditionExample {
    
    static final String LOCK_NAME = "lock";
    static final String CONDITION_NAME = "condition";
    static final String SERVICE_NAME = "spinderella";
    static final String LIST_NAME = "list";
    
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        
        IExecutorService service  = instance.getExecutorService(SERVICE_NAME);
        service.execute(new SpunConsumer());
        service.execute(new SpunProducer());
        
        
        
        try {
            Thread.sleep(10000);

        } catch(InterruptedException ie) {
            System.out.println("Hey we got out sooner than I expected");
        } finally {
            instance.shutdown();
            System.exit(0);
        }
    }
    
    public static class SpunProducer implements Serializable, Runnable, HazelcastInstanceAware {

        private transient HazelcastInstance instance;
        private long counter = 0;
        
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            
            lock.lock();            
            try {
                if(list.isEmpty()) {
                    populate(list);
                    System.out.println("telling the consumers");
                    condition.signalAll();
                }
                for(int i = 0; i < 2; i++) {
                    while(!list.isEmpty()) {
                        System.out.println("Waiting for the list to be empty");
                        System.out.println("list size: " + list.size() );
                        condition.await(2, TimeUnit.SECONDS);
                    }  
                    populate(list);
                    System.out.println("Telling the consumers");
                    condition.signalAll();
                }
            } catch(InterruptedException ie) {
                System.out.println("We have a found an interuption");
            } finally {
                condition.signalAll();
                System.out.println("Producer exiting stage left");
                lock.unlock();
            }
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
        
        private void populate(IList list) {
            System.out.println("Populating list");
            long currentCounter = counter;
            for(; counter < currentCounter + 10; counter++) {
                list.add(counter);
            }
        }
    }
    
    public static class SpunConsumer implements Serializable, Runnable, HazelcastInstanceAware {

        private transient HazelcastInstance instance;
        
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            
            lock.lock();            
            try {
                for(int i = 0; i < 3; i++) {
                    while(list.isEmpty()) {
                        System.out.println("Waiting for the list to be filled");
                        condition.await(1, TimeUnit.SECONDS);
                    }
                    System.out.println("removing values");
                    while(!list.isEmpty()){
                        System.out.println("value is " + list.get(0));
                        list.remove(0);
                    }
                    System.out.println("Signaling the producer");
                    condition.signalAll();
                }
            } catch(InterruptedException ie) {
                System.out.println("We had an interrupt");
            } finally {
                System.out.println("Consumer exiting stage right");
                condition.signalAll();
                lock.unlock();
            }
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
    }

}

ICountDownLatch

package hazelcastprimitives.icountdownlatch;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICountDownLatch;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author Daryl
 */
public class ICountDownLatchExample {
    static final String LOCK_NAME = "lock";
    static final String LATCH_NAME = "condition";
    static final String SERVICE_NAME = "spinderella";
    static final String LIST_NAME = "list";
    
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        
        IExecutorService service  = instance.getExecutorService(SERVICE_NAME);
        service.execute(new SpunMaster());
        service.execute(new SpunSlave());
        
        
        
        try {
            Thread.sleep(10000);

        } catch(InterruptedException ie) {
            System.out.println("Hey we got out sooner than I expected");
        } finally {
            instance.shutdown();
            System.exit(0);
        }
    }
    
    public static class SpunMaster implements Serializable, Runnable, HazelcastInstanceAware {

        private transient HazelcastInstance instance;
        private long counter = 0;
        
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);
            IList list = instance.getList(LIST_NAME);
            
            lock.lock();            
            try {
                latch.trySetCount(10);
                populate(list, latch);
            } finally {
                System.out.println("Master exiting stage left");
                lock.unlock();
            }
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
        
        private void populate(IList list, ICountDownLatch latch) {
            System.out.println("Populating list");
            long currentCounter = counter;
            for(; counter < currentCounter + 10; counter++) {
                list.add(counter);
                latch.countDown();
            }
        }
    }
    
    public static class SpunSlave implements Serializable, Runnable, HazelcastInstanceAware {

        private transient HazelcastInstance instance;
        
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);
            IList list = instance.getList(LIST_NAME);
            
            lock.lock();            
            try {
                if(latch.await(2, TimeUnit.SECONDS)) {
                    while(!list.isEmpty()){
                        System.out.println("value is " + list.get(0));
                        list.remove(0);
                    }

                }
            } catch(InterruptedException ie) {
                System.out.println("We had an interrupt");
            } finally {
                System.out.println("Slave exiting stage right");
                lock.unlock();
            }
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
    }

}

ISemaphore

Configuration

Here is the ISemaphore configuration:

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast
xsi:schemaLocation ="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-3.0.xsd "
xmlns ="http://www.hazelcast.com/schema/config "
xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance">
    <network>
        <join><multicast enabled="true"/></join>
    </network>
    
    <semaphore name="to reduce access">
        <initial-permits>3</initial-permits>
    </semaphore>
</hazelcast>

Example Code

package hazelcastprimitives.isemaphore;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ISemaphore;
import com.hazelcast.core.IdGenerator;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 *
 * @author Daryl
 */
public class ISemaphoreExample {
    static final String SEMAPHORE_NAME = "to reduce access";
    static final String GENERATOR_NAME = "to use";
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IExecutorService service = instance.getExecutorService("service");
        List<Future> futures = new ArrayList(10);
        try {
            for(int i = 0; i < 10; i++) {
                futures.add(service.submit(new GeneratorUser(i)));
            }
            // so I wait til the last man.  No this may not be scalable.
            for(Future future: futures) {
                future.get();
            }
        } catch(InterruptedException ie){
            System.out.printf("Got interrupted.");
        } catch(ExecutionException ee) {
            System.out.printf("Cannot execute on Future. reason: %s\n", ee.toString());
        } finally {
            service.shutdown();
            instance.shutdown();
        }

    }
    
    static class GeneratorUser implements Callable, Serializable, HazelcastInstanceAware {
        private transient HazelcastInstance instance;
        private final int number;
        
        public GeneratorUser(int number) {
            this.number = number;
        }
        
        @Override
        public Long call() {
            ISemaphore semaphore = instance.getSemaphore(SEMAPHORE_NAME);
            IdGenerator gen = instance.getIdGenerator(GENERATOR_NAME);
            long lastId = -1;
            try {
                semaphore.acquire();
                try {
                    for(int i = 0; i < 10; i++){
                        lastId = gen.newId();
                        System.out.printf("current value of generator on %d is %d\n", number, lastId);
                        Thread.sleep(1000);
                    }
                } catch(InterruptedException ie) {
                    System.out.printf("User %d was Interrupted\n", number);
                } finally {
                    semaphore.release();
                }
            } catch(InterruptedException ie) {
                System.out.printf("User %d Got interrupted\n", number);
            }
            System.out.printf("User %d is leaving\n", number);
            return lastId;
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
        
    }

}

Conclusion

Hazelcast’s primitives were discussed in this post. Most if not all of them revolved around thread coordination. Explanations of the primitive and personal experience were shared. In the examples, the different types of coordination were shown. The examples can be downloaded via subversion at http://darylmathisonblog.googlecode.com/svn/trunk/HazelcastPrimitives.

References

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button