Enterprise Java

Beginner’s Guide to Hazelcast Part 4

This is the fourth installment of my Hazelcast series. If one has not seen the other 3, I suggest one go to Part 1Part 2 and Part 3.

Logging

Logging is an important feature of any application and my examples are no different. System.out.println can be a useful tool for telling the user what is going on in console applications. But let’s face it, if one is reading how to use a tool for distributed applications, that person is really not a beginner. Seeing a series of logging messages should not scare anyone. In fact, for the examples in this post, they are necessary to know what is going on by whom. We will be talking about threaded programming after all.

The good folks at Hazelcast seem to have agreed that logging is important and so have many different ways to define what library is logging. The logging framework only depends on JDK logging and has a number of adapters that even allow for custom logging frameworks. One’s logging adapter of choice is set by the property, hazelcast.logging.type to the following settings:

  • JDK logging, This is the default.
  • log4j
  • slf4j
  • none

I used Log4j2 so I picked slf4j and put in the four jar files needed to get it working.

Spinning Distributed Threads

Like many classes in Hazelcast, IExecutorService implements an interface from Java’s libraries, the ExecutorService. This interface defines what it is to be a thread pool. The interface is part of the java.util.concurrent package and has been around since Java 1.5. The package also has implementations of it one can access from java.util.concurrent.Executors. I wish I had something like this in Java 1.4 or 1.3 or 1.2 or 1.1. Making thread pools were fun until deadlocks happened. Now I can use Java library’s pools, good enough for me.

ExecutorServices have an interesting “feature.” One must shut them down or the service will not go away. The first time I used them, I caused a memory leak and shutdown the JVM. I caught the bug during my own testing so the customer never had to see my learning experience. IExecutorService has a different wrinkle. The service will not go away until all the threads finish. This caused many unclean shutdowns. You have been warned!

IExecutorServices can share out threads several different ways. Here they are in detail:

Any ‘Ole Instance

This is when one calls just the submit(Callable call).  This does more than just set a thread randomly into the cluster.  It does some load balancing with that thread so an instance does not get clobbered with threads.

To a Particular Member

This is done via the submit(Callable call, Member member) method.  This sends a thread to a particular member of the cluster.   No load balancing here; just sending to a member.  Be careful, one can easily overload a member and really put the brakes on any processing being done.  I could see this as a way to create a custom load balancer.

To a Collection of Members

Yeah, one can send a thread to multiple members.  When i was doing my example coding, the members all act like they got their own thread and are not sharing one.  If one implements Callable<T> as their thread implementation, the method returns a Map of Futures using the members as the key.  If one uses Runnable, it returns nothing.

To The Member With the Right Key

Entries to a IMap can be anywhere on the cluster. If processing is needed to be done on that entry, a local thread would have to pull up the entry over the network. This can be a problem if the entry is very large. A better way would be to transfer the hopefully smaller thread over to the entry. To do that, the cluster needs to know where to send it. Hence, the call submit(Callable call, Object key).

To All Members

This works the same way as submitting to a collection of members but it is all of them, as in every member in the cluster.  This could get “fun” if one has a large number of members in a cluster.  I think I have heard as much as a 1000 members in one cluster.  Make sure this is what one wants before it is called.

Using an ExecutionCallback

This is basically a way to send out some threads and let the results come back asynchronously.  One uses an ExecutionCallback if one thread is submitted.  One uses MultiExecutionCallback if more than one member is involved.

Example Code

Before I start, let me say that I do not have an example for every method in IExecutorService.  I do have an example for every type discussed, however.  Another thing about the example code.  For instructional purposes I have done some copy-and-paste coding in prior posts so each example can stand on its own and one can get a context of what goes where.  I did this quite a bit in part 3.  If one did not notice it, look at it again.

This time I did not do it because there would have been a lot of code copied and the results would have been pretty ugly.  I used an Enum and I think the results were very good.  I thought an Enum was a good choice because of the limited number of examples and allowed me to be able to show the code in chunks that are understandable if the framework was shown first.

With that explanation, lets move on!

Framework

This are the main bits. It consists of the main class and the thread class.  Notice how the main class shows each way a thread can submitted being called.

Main

package hazelcastservice;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Daryl
 */
public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    public static final String SERVICE_NAME = "spinnerella";
    public static final int NUM_INSTANCES = 5;
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        System.setProperty("hazelcast.logging.type", "slf4j");
        List<HazelcastInstance> instances = new ArrayList<>(NUM_INSTANCES);
        for(int i = 0; i < NUM_INSTANCES; i++) {
            instances.add(Hazelcast.newHazelcastInstance());
            logger.info("instance {} up", i);
        }

        IExecutorService spinner = instances.get(0).getExecutorService(SERVICE_NAME);
        try {
            HazelcastIExecutorServiceExamples.TO_SOME_MEMBER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.TO_PARTICULAR_MEMBER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_THE_KEY_OWNER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_A_SET_OF_MEMBERS.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_ALL_MEMBERS.example(instances, spinner);
            HazelcastIExecutorServiceExamples.CALLBACK.example(instances, spinner);
            HazelcastIExecutorServiceExamples.MULTIPLE_MEMBERS_WITH_CALLBACK.example(instances, spinner);
            
            //Lets setup a loop to make sure they are all done (Especially the callback ones)
            for(HazelcastIExecutorServiceExamples example: HazelcastIExecutorServiceExamples.values()) {
                while(!example.isDone()) {
                    Thread.sleep(1000);
                }
            }
        } catch(ExecutionException ee) {
            logger.warn("Can't finish the job", ee);
        } catch(InterruptedException ie) {
            logger.warn("Everybody out of the pool", ie);
        } finally {
            // time to clean up my toys
            boolean allClear = false;
            
            while(!allClear) {
                try {
                    Thread.sleep(1000);
                    Hazelcast.shutdownAll();
                    allClear = true;
                } catch(InterruptedException ie) {
                    //got interrupted. try again
                } catch(RejectedExecutionException ree) {
                    logger.debug("caught a RejectedExecutionException");
                    allClear = false;
                }
            }
            
            logger.info("All done");
        }
    }
}

Thread

package hazelcastservice;

import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class was inspired by the song "I Like to Move it" from the movie 
 * Madagascar by Dreamworks.  I offer NO apologies for using it.  
 * 
 * To those software developers who like consistent results, I used java.util.Random to
 * make it loop inconsistently each time call is called.  
 * 
 * Sometimes you need to make your own entertainment.
 * @author Daryl
 */
public class MoveItMoveIt implements Callable<Integer>, Serializable {
    private static final Logger logger = LoggerFactory.getLogger(MoveItMoveIt.class);
    private static final int UPPER_BOUND = 15;
        
    @Override
    public Integer call() throws Exception {
        Random random = new Random();
        int howMany = random.nextInt(UPPER_BOUND);
//        int howMany = 2;
        for(int i = 0; i < howMany; i++) {
            logger.info("I like to Move it Move it!");
        }
        logger.info("Move it!");
        return howMany;
    }
}

The Particulars

Here I go showing the different types of calls that were discussed. Remember that these are chunks of an Enum class. The done is a protected variable and the public void example(List<HazelcastInstance> instances, IExecutorService spinner) needed to implemented.

Any ‘Ole Instance

TO_SOME_MEMBER() {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
                throws ExecutionException, InterruptedException {
            logger.info("Submit to some member.");
            Future<Integer> howMany = spinner.submit(new MoveItMoveIt());
            logger.info("It moved it {} times", howMany.get());
            done = true;
        }
    }

To a Particular Member

TO_PARTICULAR_MEMBER {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
                throws ExecutionException, InterruptedException {
            logger.info("Submit to a particular member.");
            Member member = getRandomMember(instances);
            logger.debug("member is {}", member);
            Future<Integer> howMany = spinner.submitToMember(new MoveItMoveIt(), member);
            logger.info("It moved it {} times.", howMany.get());
            done = true;
        }
        
        private Member getRandomMember(List<HazelcastInstance> instances) {
            Set<Member> members = instances.get(0).getCluster().getMembers();
            int i = 0;
            int max = new Random().nextInt(instances.size());
            Iterator<Member> iterator = members.iterator();
            Member member = iterator.next();
            while(iterator.hasNext() && (i < max)) {
                member = iterator.next();
                i++;
            }
            return member;
        }
    }

To a Collection of Members

ON_A_SET_OF_MEMBERS {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to some of the members");
            Set<Member> randomMembers = getRandomMembers(instances);
            Map<Member, Future<Integer>> results = 
                    spinner.submitToMembers(new MoveItMoveIt(), randomMembers);
            for(Future<Integer> howMany: results.values()) {
                logger.info("It moved {} times", howMany.get());
            }
            done = true;
        }
        
        private Set<Member> getRandomMembers(List<HazelcastInstance> instances) {
            int max = new Random().nextInt(instances.size());
            Set<Member> newSet = new HashSet<>(instances.size());
            int k = 0;
            Iterator<Member> i = instances.get(0).getCluster().getMembers().iterator();
            while(i.hasNext() && k < max) {
                newSet.add(i.next());
                k++;
            }
            return newSet;
        }
    }

To The Member With the Right Key

ON_THE_KEY_OWNER {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to the one owning the key");
            HazelcastInstance randomInstance = getRandomInstance(instances);
            IMap<Long, Boolean> map = randomInstance.getMap("default");
            Long one = 1L;
            map.put(one, Boolean.TRUE);
            
            Future<Integer> howMany = spinner.submitToKeyOwner(new MoveItMoveIt(), one);
            logger.info("It moved it {} times.", howMany.get());
            done = true;
        }
        
        private HazelcastInstance getRandomInstance(List<HazelcastInstance> instances) {
            return instances.get(new Random().nextInt(instances.size()));
        }

    }

To All Members

ON_ALL_MEMBERS {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to all members");
            Map<Member, Future<Integer>> results = 
                    spinner.submitToAllMembers(new MoveItMoveIt());
            for(Future<Integer> howMany: results.values()) {
                logger.info("It moved {} times", howMany.get());
            }
            done = true;
        }
    }

Using an ExecutionCallback

This example code contains two chunks of code to show a single callback and a multiple callback.

CALLBACK {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("example with a callback");
            spinner.submit(new MoveItMoveIt(), new ExecutionCallback<Integer>() {
                @Override
                public void onResponse(Integer response) {
                    logger.info("It moved {} times", response);
                    done = true;
                }

                @Override
                public void onFailure(Throwable thrwbl) {
                    logger.error("trouble in the callback", thrwbl);
                    done = true;
                }
            });
        }        
    },
    MULTIPLE_MEMBERS_WITH_CALLBACK {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("running on multiple members with callback");
            spinner.submitToAllMembers(new MoveItMoveIt(), new MultiExecutionCallback() {

                @Override
                public void onResponse(Member member, Object o) {
                    logger.info("member finished with {} moves", o);
                }

                @Override
                public void onComplete(Map<Member, Object> map) {
                    logger.info("All members completed");
                    for(Object value: map.values()) {
                        logger.info("It moved {} times", value);
                    }
                    done = true;
                }
            });
        }

Conclusion

It was good to publish my own code/ideas again on my blog. I took a quick look at the power of the IExecutorService by Hazelcast. My example code followed the DRY principle. The code in its entirety can be found here.

References

As always with my Hazelcast guides, my information comes from Hazelcast documentation that can be found at here.

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button