Enterprise Java

Gentle Introduction to Hystrix – Wrapup

This is a follow up to two other posts – Motivation for why something like Hystrix is needed in a distributed systems and a basic intro to Hystrix.

This will be a wrap of my Hystrix journey with details of various properties that can be tweaked to change the behavior of Hystrix and will touch on a few advanced concepts

Tweaking Hystrix Behavior

Hystrix configuration is explained in this wiki here, in brief two broad groups control the properties of Hystrix,

  1. Command Properties
  2. ThreadPool properties

The properties follow an order of precedence that is explained in the wiki, here I will concentrate on ones specified through a properties file.

For a sample Command defined the following way:

public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}

First behavior that can be tweaked is whether to execute the command in a thread pool or the same thread of execution as the caller(SEMAPHORE strategy type). If the execution is in a threadpool, then a timeout for the request can be set.

hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000

The second behavior is the Circuit breaker which works based on information collected during a rolling window of time, configured this way, say for 10 seconds:

hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000

In this window if a certain percent of failures(say 50%) happen for a threshold of requests(say 20 in 10 seconds) then the circuit is broken, with a configuration which looks like this:

hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50

Once a circuit is broken, it stays that way for a time set the following way, 5 seconds in this instance:

hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000

The threadpool settings are controlled using the Group Key that was specified, called default in this sample. A specific “Threadpool Key” could also have been specified as part of the constructor though.

hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5

Here 10 commands can potentially be run in parallel and another 5 held in a queue beyond which the requests will be rejected.

Request Collapsing

Tomaz Nurkiewicz in his blog site NoBlogDefFound has done an excellent job of explaining Request Collapsing . My example is a little simplistic, consider a case where a lot of requests are being made to retrieve a Person given an id, the following way:

public class PersonService {

    public Person findPerson(Integer id) {
        return new Person(id, "name : " + id);
    }

    public List<Person> findPeople(List<Integer> ids) {
        return ids
                .stream()
                .map(i -> new Person(i, "name : " + i))
                .collect(Collectors.toList());
    }
}

The service responds with a canned response but assume that the call was to a remote datastore. Also see that this service implements a batched method to retrieve a list of People given a list of id’s.

Request Collapsing is a feature which would batch multiple user requests occurring over a time period into a single such remote call and then fan out the response back to the user.

A hystrix command which takes the set of id’s and gets the response of people can be defined the following way:

public class PersonRequestCommand extends HystrixCommand<List<Person>>{

    private final List<Integer> ids;
    private final PersonService personService = new PersonService();
    private static final Logger logger = LoggerFactory.getLogger(PersonRequestCommand.class);

    public PersonRequestCommand(List<Integer> ids) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.ids = ids;
    }

    @Override
    protected List<Person> run() throws Exception {
        logger.info("Retrieving details for : " + this.ids);
        return personService.findPeople(this.ids);
    }
}

Fairly straightforward up to this point, the complicated logic is now in the RequestCollapser which looks like this:

package aggregate.commands.collapsed;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PersonRequestCollapser extends HystrixCollapser<List<Person>, Person, Integer> {

    private final Integer id;
    public PersonRequestCollapser(Integer id) {
        super(Setter.
                withCollapserKey(HystrixCollapserKey.Factory.asKey("personRequestCollapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000)));
        this.id = id;
    }

    @Override
    public Integer getRequestArgument() {
        return this.id;
    }

    @Override
    protected HystrixCommand<List<Person>> createCommand(Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        List<Integer> ids = collapsedRequests.stream().map(cr -> cr.getArgument()).collect(Collectors.toList());
        return new PersonRequestCommand(ids);
    }

    @Override
    protected void mapResponseToRequests(List<Person> batchResponse, Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        Map<Integer, Person> personMap = batchResponse.stream().collect(Collectors.toMap(Person::getId, Function.identity()));

        for (CollapsedRequest<Person, Integer> cr: collapsedRequests) {
            cr.setResponse(personMap.get(cr.getArgument()));
        }
    }
}

There are a few things going on here, first the types in the parameterized type signature indicates the type of response(List<Person>), the response type expected by the caller (Person) and the request type of the request(id of the person). Then there are two methods one to create a batch command and the second to map the responses back to the original requests.

Now given this from a users perspective nothing much changes, the call is made as if to a single command and Request Collapsing handles batching, dispatching and mapping back the responses. This is how a sample test looks like:

@Test
public void testCollapse() throws Exception {
    HystrixRequestContext requestContext = HystrixRequestContext.initializeContext();

    logger.info("About to execute Collapsed command");
    List<Observable<Person>> result = new ArrayList<>();
    CountDownLatch cl = new CountDownLatch(1);
    for (int i = 1; i <= 100; i++) {
        result.add(new PersonRequestCollapser(i).observe());
    }

    Observable.merge(result).subscribe(p -> logger.info(p.toString())
            , t -> logger.error(t.getMessage(), t)
            , () -> cl.countDown());
    cl.await();
    logger.info("Completed executing Collapsed Command");
    requestContext.shutdown();
}

Conclusion

There is far more to Hystrix than what I have covered here. It is truly an awesome library, essential in creating a resilient system and I have come to appreciate the amount of thought process that has gone into designing this excellent library.

Reference: Gentle Introduction to Hystrix – Wrapup from our JCG partner Biju Kunjummen at the all and sundry blog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button