Enterprise Java

Error handling in Storm Trident topologies

This post summarizes my current approach to error handling when designing Storm Trident topologies. I focus here on code design, not on deployment good practices like supervision nor redundancy.

Because of the real-time stream nature of Storm, when facing most kinds of error we’ll ultimately have to move on to the next piece of data. Error handling in that context boils down to reporting this error (or not) and retrying to process the failed input data later (or not). Part 1 of this post is about this aspect.

This implies that when processing a tuple, it is in general hard to be sure if it’s the first time we encounter it or if its content has already been partially applied to persistence. We therefore need to make our state update operations idempotent, which is the subject of the second part of this post.

Don’t be impressed by the size of this post, Storm is actually doing most of the work for us. All that is required really is understanding how it all fits togheter to plug things in a way that makes sense.

This post is based on Storm 0.9, Cassandra 2.0.4 and Kafka 0.7. I have put a toy project on github to illustrate several points discussed below. This project is actually adapted from the “room presence” example I introduced in a previous post.

Part 1: handling erroneous situation

Deciding when to ask to retry

A first simple error handling strategy is simply accepting the degradation of the computation quality caused by runtime errors. This could be the case for example if the topology is computing some real-time trend estimate on a sliding window over a very recent past, or if we are working on sampled data already, like the twitter public stream. If we chose to ignore such errors, the implementation is trivially simple, just wrap the topology logic with a big fat try/catch, report the errors somehow and do not let anything bubble up to Storm .

Most of the time however, we care about consistency and must make a careful decision about attempting to retry or not the failed data.

A typical example of runtime error is inbound data format issues. In that case retrying is of course pointless since it’s not going to get better the second time. We should log the faulting data instead and maybe ask some humans to investigate. Here is a simplistic example from the BytesToString Storm function from my toy project:

public class BytesToString extends BaseFunction {
@Override
     public void execute(TridentTuple tuple, TridentCollector tridentCollector) { 
     try { 
         String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8"); 
         tridentCollector.emit(new Values(asString)); 
     } catch (UnsupportedEncodingException e) { 
         logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e); 
     } 
 }

On the other side, if an error is related to some unreachable external data sources, caused for example by a network partition, we should trigger a retry as described in the next section.

There are many other kinds of errors than the two mentioned above but the point remains: it’s important to distinguish re-tryable from non-retryable errors and react accordingly.

As a final note, be very careful when you decide not to report an error that occurred within a multiget of IBackingMap, because that function must return a list of the same size as the input list of keys. So in case of non retried error, we must return some result one way or another. Most of the time, if we choose not to retry errors in that case, it’s because something is already broken in persistence due to some past error and it’s too late to correct it. In the example below, the error occurs due to failed parsing of some data read from DB and the code just returns null values instead, which is equivalent to considering there is nothing in persistence (nothing useful at least). See also part 3 below for a possible solution for that case.

@Override 
 public List<OpaqueValue> multiGet(List<List<Object>> keys) { 
    try { 
        return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class); 
    } catch (IOException e) { 
        logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e); 
        return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data! 
    } 
}

(well, this code from TimelineBackingMap actually replaces all data with nulls, which makes things worse, but it’s a toy project…)

Causing a Trident tuple to be replayed…

Once we have decided it makes sense to trigger a tuple replay, we just have to ask for it and Storm will do everything else (just plug the correct spout, cf next section). Technically, this is very simple: triggering a retry from within a Trident primitive like Function or Filter is as simple as throwing FailedException, like in the TimeLineBackingMap from my toy project, which includes example of retried and non-retried errors (Note that the code below from TimelineBackingMap assumes that any DB error is retryable, which is an over-simplification):

@Override 
 public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;
  List<OpaqueValue> jsonOpaqueTimelines; 
  try { 
      jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines); 
  } catch (IOException e) { 
      System.err.println("error while trying to serialize data to json => giving up (data is lost!)"); 
      return; 
  }
  if (jsonOpaqueTimelines != null) { 
      try { 
          DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines); 
      } catch (Exception e) { 
          logger.err("error while storing timelines to cassandra, triggering a retry...", e); 
          throw new FailedException("could not store data into Cassandra, triggering a retry...", e); 
      } 
   } 
};

Storm will then propagate the error back to the spout in order to force a tuple replay. If we want the error to be reported in Storm UI, we can throw ReportedFailedException instead.

Another way, which I strongly discourage, is to let any other kind of RuntimeException bubble up to Storm. This essentially achieves the same result at much higher performance cost: it will trigger a worker node crash with automatically restart by Nimbus and all spout will resume reading from the latest known successful index (spout implementation like Kafka spout store their latest successfully processed offset in zookeeper for that purpose). This fail-fast strategy is part of the design of Storm (see documentation on worker supervision and fault-tolerance). In essence this achieves the same consistency guarantees as letting the spout replay some tuples, but the performance impact is of course bigger since we have a full JVM restart with a reset of all currently running topology instances. So never do that on purpose. Still it’s reassuring to be aware that if our nodes crash, the data is not broken and the flow will naturally continue.

A third situation when Storm decides to replay tuples is if they fail to reach the end of the topology before a configured timeout. More precisely, this mechanism is actually triggered by the spout which emitted the tuple if no ACK is received on time, so those replays could also be triggered in case the tuples are processed successfully but the ACK fail to reach the spout due to some network partition. The Storm parameters to control this are topology.enable.message.timeouts and topology.message.timeout.secs, and their default value according to defaults.yaml are “true” and 30 seconds. This is just one more reason why idempotence in our topologies is so important.

… and actually replaying tuples

Once the failure notification reaches the spout (or is generated by it, in case of timeout), we need to make sure the failed tuples will be replayed. Unless you are developing your own spout, this just boils down to choosing the right spout flavor. This choice impacts the way tuples are replayed (or not), so it must be aligned with the strategy in place to handle the replayed tuples within the topology, which is the subject of the next section. There exists 3 kinds of spouts:

  • non transactional: no guarantee, but they can still be useful in some cases if the implementation you choose offers “at-least-once” guarantee
  • transactional: not recommended because they might block the topology in some partition cases
  • opaque: offers a weak-ish guarantee on replay in the sense that they reach tuple will be played at least once but in case of replay the emitted batches might not be identical. In practice all that matters when using those, which I recommend, is to make sure that topology is robust for this kind of flexible replays, which is discussed int he next section.

A final note on tuple and batch replay

I discuss above at tuple level because this makes design decisions simpler. In reality, asking Storm to replay a single tuple will trigger a replay of many other tuples contained in the same batch, some of them potentially error-free.

Part 2: idempotent handling of replayed tuples

The other side of the story is, now that we know tuples could possibly be processed several times, making sure the topology is idempotent, i.e. sending several times the same tuples at it won’t make the state inconsistent. The parts of a topology that have no side effect are of course not impacted by tuple replays.

Storm Trident documentation on state consistency is quite clear, so I’m just adding a bit of salt here.

In case our state update operations are already idempotent

If a state update operation is already idempotent by nature, then it’s already resilient to tuples replays and none of the Storm special mechanism is required.

This is the case of any “store by id” operation if the id value is fully based on inbound tuple content. For example, in my toy project I store occupancy sessions whose primary key is derived from a correlation id found in the inbound events, so in that case the write operation is already replay-ready because any replay will just overwrite an existing data with the same information without any data destruction (assuming we have ordering guarantee, which is true in this case).

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) { 
    DB.upsertPeriods(newOrUpdatedPeriods); 
}

and in CassandraDB.java:

try { 
    PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)"); 
    execute(new BoundStatement(statement).bind(rpp.getId(), periodJson)); 
 } catch (Exception e) { 
    logger.error("error while contacting Cassandra, triggering a retry...", e); 
    new FailedException("error while trying to record room presence in Cassandra ", e); 
 }

Making read-update-write operation idempotent as well

I described in a previous blog post how Storm enables us to implement operations that performs the following without requiring DB-locks and still avoid race conditions:

  • read previous state from DB,
  • update state in memory according to new tuples data,
  • save new state to DB

One beauty of storm is that in order to handle replayed tuples without destroying state we only have to adapt steps 1 and 3. This is super important: we can now implement all our processing logic in step 2 as if each tuple were played only once and not caring at all about replays (as long as we are “pure”, see remark below…). This is what they mean by “storm has exactly once semantic”.

Moreover, if we have an in-house implementation of 1 and 3, making them replay-ready is just a matter of wrapping them with existing Storm classes. The most robust way of doing so is using the Opaque logic, at the cost of storing twice each state, as explained in Trident documentation on transaction spout.

Even better, there already exists plenty of Opaque BackingMap implementation for many backends like Cassandra or Mysql in storm-contrib, so most of the time there is really nothing to do apart from choosing the right one.

The most important point to take away is that in order to use an Opaque BackingMap that handles replayed-tuples, we must use a spout that respects opaque pre-conditions, as summarized in this matrix.

In case we need to implement our own BackingMap for some reason, the only thing we have to do is making it store a current and previous version of the data and a transaction id. Here is a simplistic example from my toy project (but really, consider storm-contrib before coding something like that):

public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;
    // this should be optimized with C* batches... 
    for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) { 
         PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table)); 
         OpaqueValue opaqueVal = keyValue.getValue(); 
         execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev())); 
     } 
}
public List<OpaqueValue> get(String table, List<String> keys) {;
    List<OpaqueValue> vals = new ArrayList<>(keys.size()); 
    ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) )); 
    Map<String, OpaqueValue> data = toMapOfOpaque(rs); 
    for (String key: keys){ 
        vals.add(data.get(key)); 
    }
    return vals; 
 }

Then the only thing to do to actually obtain the exactly-once semantic of Trident is to wrap it within an OpaqueMap, like this:

public static StateFactory FACTORY = new StateFactory() { 
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 
         return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf))); 
    } 
 }

What’s happening behind the scene is that the OpaqueMap will choose which previously stored state (“curr” or “prev”) to expose to our update logic depending on the transaction id associated with the current batch of tuples and the one found in persistence. This transaction ID is provided by the spout, so this is the reason why keeping spout and state choices aligned is so important: state make assumption about the meaning of each transaction ids.

Do not break the previous instance!

Let’s come back a minute to step 2 of the read-update-write sequence mentioned above. Now that we know that an Opaque logic needs to store both the new and old version of any state, look at the following Reducer code and try to determine why it’s broken:

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { 
     LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;
     if (ENTER == event.getEventType()) { 
         curr.setStartTime(event.getTime());            // buggy code 
     } else { 
         curr.setEndTme(event.getTime());              // buggy code 
     } 
     return curr; 
 }

Adepts of functional programming call this an “impure” method because it modifies its input parameters. The reason it breaks Storm opaque logic is that now both the “current” and “previous” java references actually refer to the same instance in memory. So that when the opaque logic is persisting both the previous and current version of some piece of state, really it’s saving twice the new version, and the previous version is therefore lost.

A better implementation could be something like this

public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) { 
     LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;
     RoomPresencePeriod updated = new RoomPresencePeriod(curr);  // copy constructor 
     if (ENTER == event.getEventType()) { 
        updated.setStartTime(event.getTime()); 
     } else { 
         updated.setEndTme(event.getTime()); 
     } 
     return updated; 
 }

Part 3: human errors: replay all

As a final note, one has to realize with humility that no matter how much efforts and safeguards like those described above we put in place, we will still deploy bugs in production (and be sorry about that, I swear!). In case of a data processing platform, bugs potentially mean data-destroying bugs, which is bad when data is our business. In some cases, we’ll only discover data is broken after the fact, like explained on the note on multiget above.

In his Big Data book, Nathan Marz describes a simple replay-all idea based on the lambda architecture to work around that idea. A short summary of that book is also available here.
 

Reference: Error handling in Storm Trident topologies from our JCG partner Svend Vanderveken at the Svend blog blog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button