Easy event processing with var, Lombok and Fluxtion

Introduction

In this article I combine two products, Lombok and Fluxtion, to demonstrate how tools can reduce both the code written and the time to delivery while improving the readability of the code. The use of var from Java 10 improves the situation even further. Both products and var use inference at build time to accelerate development.

Fluxtion’s ethos is to minimise waste. Our goal here is to remove boilerplate code, reduce code noise, and simplify integration tasks. We want to expend as little development time as possible while still delivering an efficient, high-performance solution capable of processing millions of messages per second.

Using the techniques described, I compare a Fluxtion/Lombok implementation to a Scala example using Akka streams. The Java version requires less code and is simpler to build.

Housekeeping: apologies for not acknowledging Richard Warburton of Opsian in my first blog.

Code Signal to Noise ratio

When we code we address two main tasks:

  • Translating business requirements into programmatic logic
  • Interfacing the logic with the deployment environment

Ideally we would like to spend all our time on the first and none on the second. The total volume of code written should also be reduced. Balancing the abstraction while still empowering the developer is not easy: too great an abstraction and we remove expressive power. I hope to strike a good balance with the approach taken in this article.

Imagine writing some tax calculation logic that takes 50 lines, while the code for databases, web servers, marshalling, logging and so on requires 1000 lines. Although this is a demonstration of technical capability, there is no business value in the purely technical implementation details. Viewed from another angle, the business logic is the signal and the infrastructure code is the noise. The solutions we write can be measured by their signal to noise ratio with respect to useful business logic.

Wikipedia defines signal to noise ratio as:

Signal-to-noise ratio (abbreviated SNR or S/N) is a measure used in science and engineering that compares the level of a desired signal to the level of background noise. SNR is defined as the ratio of signal power to the noise power, often expressed in decibels. A ratio higher than 1:1 (greater than 0 dB) indicates more signal than noise.
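To put a number on the tax example, the sketch below treats lines of code as a stand-in for signal and noise power. That substitution is only an analogy, and the class and method names are hypothetical, but the decibel formula is the standard 10 · log10(ratio).

```java
import java.util.Locale;

class CodeSnr {

    // Ratio of business-logic lines (signal) to infrastructure lines (noise).
    static double ratio(int signalLines, int noiseLines) {
        return (double) signalLines / noiseLines;
    }

    // Decibel form of a power ratio: 10 * log10(ratio).
    static double decibels(double ratio) {
        return 10.0 * Math.log10(ratio);
    }

    public static void main(String[] args) {
        // 50 lines of tax logic against 1000 lines of infrastructure code.
        double r = ratio(50, 1000);
        System.out.println(String.format(Locale.ROOT, "ratio=%.2f dB=%.1f", r, decibels(r)));
    }
}
```

For 50 lines of signal against 1000 lines of noise the ratio is 0.05, roughly -13 dB, well below the 0 dB break-even point in the definition.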

It is desirable to aim for a high SNR in most systems. In programming terms, some of the advantages of a high SNR are:

  • Less code to write
  • Easier business logic to understand and maintain
  • Shorter learning curve
  • Simpler debugging/fault finding, fewer things to get wrong
  • More efficient development

In Java we have felt this pressure for better code SNR over the years, moving from heavyweight J2EE containers to simpler frameworks like Spark and Spring Boot. The language itself has accommodated this shift by introducing changes such as lambdas, streams, method references and var variable declarations.
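As a small illustration of the last of those language changes, the sketch below writes the same logic twice, once with explicit types and once with var. The class and method names are hypothetical and exist only for this comparison.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VarSketch {

    // Explicit types: the generic signature is repeated on both sides.
    static Map<String, List<Integer>> explicitTypes() {
        Map<String, List<Integer>> delays = new HashMap<String, List<Integer>>();
        List<Integer> forCarrier = delays.computeIfAbsent("AA", k -> new ArrayList<Integer>());
        forCarrier.add(12);
        return delays;
    }

    // With var (Java 10+): the compiler infers the same static types.
    static Map<String, List<Integer>> withVar() {
        var delays = new HashMap<String, List<Integer>>();
        var forCarrier = delays.computeIfAbsent("AA", k -> new ArrayList<>());
        forCarrier.add(12);
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(explicitTypes().equals(withVar()));
    }
}
```

Both methods build the same map. var changes only how much type information the developer has to spell out, not the types themselves.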

Combining Fluxtion and Lombok

Before the example a quick primer on Fluxtion and Lombok.

Fluxtion primer

Fluxtion is an embeddable streaming event processing engine written in Java. The developer describes the processing in a mixture of declarative and imperative forms so Fluxtion can generate a decision engine. The engine is serialised as Java code and can be embedded in any Java application. The application feeds events into the engine for stream processing.

Engine generation can happen inline in the application or as part of the build process with a Maven plugin.

Lombok primer

Lombok is a utility that automatically writes boilerplate code for Java classes, saving developers time and reducing code noise. Executing as an annotation processing tool, Lombok generates byte code representing the boilerplate code for annotated classes. A non-exhaustive list of Lombok features includes:

  • Automatic bean style getter and setter for properties
  • Hash code and equals generated for properties
  • Automatic toString method
  • Automatic constructor for all class properties

Just add Lombok to your Maven build and your IDE should just work; it does with NetBeans and IntelliJ.
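To make the saving concrete, here is a hand-written approximation of the members in the feature list above for a class with a single double field. The class name HandWrittenTemp is hypothetical, and the code Lombok actually generates differs in detail; treat this as a sketch of the boilerplate being avoided, not as Lombok's output.

```java
class HandWrittenTemp {

    private double temp;

    // Constructor for all properties.
    public HandWrittenTemp(double temp) {
        this.temp = temp;
    }

    // Bean style getter and setter.
    public double getTemp() {
        return temp;
    }

    public void setTemp(double temp) {
        this.temp = temp;
    }

    // equals and hashCode based on the property values.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof HandWrittenTemp)) return false;
        return Double.compare(temp, ((HandWrittenTemp) o).temp) == 0;
    }

    @Override
    public int hashCode() {
        return Double.hashCode(temp);
    }

    // toString listing the class name and properties.
    @Override
    public String toString() {
        return "HandWrittenTemp(temp=" + temp + ")";
    }

    public static void main(String[] args) {
        System.out.println(new HandWrittenTemp(21.5));
    }
}
```

Around thirty lines of noise surround a one-line field declaration, and every additional property adds more.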

Streaming max temp example

Let’s look at a common Fluxtion usage pattern: subscribe to a stream of events, extract a value from an event, perform a calculation on the value, filter, and push a result into a user object. In this simple example we have the following requirements to meet:

  • Listen to temperature events
  • Extract the temperature
  • Maintain the maximum temperature
  • Push the temperature into a user defined instance when there is a new maximum

Clone the repo from GitHub and use this article’s tagged version. The project is here.

git clone --branch article_lombok_july2019 https://github.com/gregv12/articles.git

cd articles/2019/june/lombok/

mvn clean install

The Fluxtion code to deliver the processing requirements:

select(TempEvent::getTemp)
  .map(max()).notifyOnChange(true)
  .push(new MyTempProcessor()::setMaxTemp);

This gives a high code SNR and a low line count; all the code is business logic focused. To achieve this Fluxtion makes use of method references and type inference. The method references allow Fluxtion to infer the desired behaviour: what functions to build, the source and target types, and how to pass data from one node to another in the execution graph. The method references give us a pleasant, type safe way to express arbitrary logic. It is the inference employed by the tool that removes the load from the developer to explicitly express every processing step, giving us a low-code environment to work in.

After Fluxtion generation the serialised streaming event processor is here, represented as Java code. A test for the example is here.

@Test
public void testTemp() throws Exception {
    EventHandler handler = new InlineLombok().handler();
    ((Lifecycle) handler).init();
    handler.onEvent(new InlineLombok.TempEvent(10));
    handler.onEvent(new InlineLombok.TempEvent(9));
    handler.onEvent(new InlineLombok.TempEvent(17));
    handler.onEvent(new InlineLombok.TempEvent(16));
    handler.onEvent(new InlineLombok.TempEvent(14));
    handler.onEvent(new InlineLombok.TempEvent(24));
    // three new maxima are expected: 10, 17 and 24
    Assert.assertEquals(3, MyTempProcessor.count);
}

output:

08:08:42.921 [main] INFO  c.f.generator.compiler.SepCompiler - generated sep: D:\projects\fluxtion\articles\2019\june\lombok\target\generated-sources\fluxtion\com\fluxtion\articles\lombok\temperature\generated\lombok\TempMonitor.java
new max temp:10.0
new max temp:17.0
new max temp:24.0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.79 sec

Processing graph image:

Looking closer at the first line in the example above, select(TempEvent::getTemp), we can examine the inference that Fluxtion is making. The logic implied here is:

  • Create a subscription for events of type TempEvent
  • Add a node that extracts the value of getTemp from the incoming event
  • Make the temp value available as a Number property of a node
  • Notify children of a change to the temp value when an incoming temperature event is received

The map, notifyOnChange and push functions are steps added to the execution chain. See the Wrapper interface of the Fluxtion streaming module for details. Due to the high SNR it is easy to understand their purpose and effect, but for completeness:

  • map(max()) Extracts the Number property from the previous node (the temperature) and applies the value to a stateful max function when a new value is received. Stores the current max value in a node with a Number property and notifies any child nodes of the current max when an event is received.
  • notifyOnChange A stateful function that triggers when the monitored value has updated and is different from the previous value. Only new max values are propagated to child nodes.
  • push(new MyTempProcessor()::setMaxTemp) Adds a user node, MyTempProcessor, into the execution chain. When triggered by a new max temp it pushes the value of the node into setMaxTemp of MyTempProcessor. Carries out all type conversions for primitive types without generating garbage.
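For comparison, the steps above can be written out by hand in plain Java. The sketch below is not the code Fluxtion generates and uses none of its API. TempEvent here is a hypothetical stand-in for the article's event class, and MaxPipeline is an invented name; it only shows the behaviour that the declarative chain implies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleConsumer;
import java.util.function.ToDoubleFunction;

class MaxTempSketch {

    /** Hypothetical stand-in for the article's TempEvent. */
    static final class TempEvent {
        private final double temp;

        TempEvent(double temp) {
            this.temp = temp;
        }

        double getTemp() {
            return temp;
        }
    }

    /** Hand-coded chain: extract a value, track the max, forward only new maxima. */
    static final class MaxPipeline<E> {
        private final ToDoubleFunction<E> extractor;
        private final DoubleConsumer sink;
        private double max = Double.NEGATIVE_INFINITY;

        MaxPipeline(ToDoubleFunction<E> extractor, DoubleConsumer sink) {
            this.extractor = extractor;
            this.sink = sink;
        }

        void onEvent(E event) {
            double value = extractor.applyAsDouble(event); // select(TempEvent::getTemp)
            double candidate = Math.max(max, value);       // map(max())
            if (candidate != max) {                        // notifyOnChange(true)
                max = candidate;
                sink.accept(max);                          // push(...::setMaxTemp)
            }
        }
    }

    /** Feeds the temperatures through the pipeline and collects every new maximum. */
    static List<Double> run(double... temps) {
        List<Double> maxima = new ArrayList<>();
        MaxPipeline<TempEvent> pipeline =
                new MaxPipeline<>(TempEvent::getTemp, value -> maxima.add(value));
        for (double t : temps) {
            pipeline.onEvent(new TempEvent(t));
        }
        return maxima;
    }

    public static void main(String[] args) {
        // Same input sequence as the unit test above.
        System.out.println(run(10, 9, 17, 16, 14, 24)); // prints [10.0, 17.0, 24.0]
    }
}
```

The three collected values correspond to the three "new max temp" lines in the test output. Everything apart from the four commented lines in onEvent is wiring that the three-line declarative version leaves to the tool.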

To use method references on the TempEvent we first need to define a getter/setter style accessor method pair. Of course IDEs can generate the required methods, but the SNR will still drop after the generation. Expand this to a larger domain and the problem multiplies. Lombok can come to our rescue here, removing unnecessary code and restoring our SNR.

Before Lombok:

public class InlineNoLombok {
    
    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");
    }
    

    public static class TempEvent extends Event {

        private double temp;

        public TempEvent(double temp) {
            this.temp = temp;
        }

        public double getTemp() {
            return temp;
        }

        public void setTemp(double temp) {
            this.temp = temp;
        }
        
    }

}

After Lombok: 

Adding a single @Data annotation removes the getter/setter and the @AllArgsConstructor removes the constructor:

public class InlineLombok {

    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.lombok", "TempMonitor");
    }

    @Data
    @AllArgsConstructor
    public static class TempEvent extends Event {
        private double temp;
    }
}

Even with this smallest of examples using Lombok and Fluxtion together the actual business logic is much easier to read. A better code SNR makes the application more efficient to build and easier to understand.

Flight data example

Let’s extend this to a more complex example where the value of a high SNR becomes apparent. In this example we are processing flight data for a whole year. The example was inspired by this blog, and the code for the Akka streaming solution is here. The summary of requirements:

Process a year’s worth of all US flight landing records stored in CSV format here.

  • Group the carriers by name
  • Filter records that have a delay > 0
  • Carrier name: column 8, delay: column 14
  • For a carrier grouping calculate:
    • Cumulative sum of total delay
    • Total number of delayed flights
    • Average delay for a flight if it is late
  • Calculate the total count of flights regardless of delay

We need to define data types and processing logic to solve the problem. It would be easy to be overwhelmed by the noise in the solution. But Fluxtion allows us to concentrate on the business logic and Lombok makes the data types easy to work with, both tools using inference to reduce the code to write:

public class FlightAnalyser {

  @SepBuilder(
          name = "FlightDelayAnalyser",
          packageName = "com.fluxtion.articles.lombok.flight.generated"
  )
  public void buildFlightProcessor(SEPConfig cfg) {
    var flightDetails = csvMarshaller(FlightDetails.class, 1)
            .map(14, FlightDetails::setDelay).converter(14, defaultInt(-1))
            .map(8, FlightDetails::setCarrier).converter(8, Converters::intern).build();
    //filter and group by
    var delayedFlight = flightDetails.filter(FlightDetails::getDelay, positive());
    var carrierDelay = groupBy(delayedFlight, FlightDetails::getCarrier, CarrierDelay.class);
    //derived values for a group
    carrierDelay.init(FlightDetails::getCarrier, CarrierDelay::setCarrierId);
    carrierDelay.avg(FlightDetails::getDelay, CarrierDelay::setAvgDelay);
    carrierDelay.count(CarrierDelay::setTotalFlights);
    carrierDelay.sum(FlightDetails::getDelay, CarrierDelay::setTotalDelayMins);
    //make public for testing
    var delayByGroup = cfg.addPublicNode(carrierDelay.build(), "delayMap");
    //dump to console, triggers on EofEvent
    printValues("\nFlight delay analysis\n========================",
            delayByGroup, eofTrigger());
  }

  @Data //input data from CSV
  public static class FlightDetails {
    private String carrier;
    private int delay;
  }

  @Data //derived data
  public static class CarrierDelay {
    private String carrierId;
    private int avgDelay;
    private int totalFlights;
    private int totalDelayMins;
  }

}

Implementation analysis

Lombok allows us to deal with data classes and field types, ignoring the scaffolding of getters/setters. We define an input record, FlightDetails, and the grouping summary record, CarrierDelay.

The use of the var keyword for intermediate instance assignment simplifies reading and writing the code.

  • line 8: Fluxtion maps the CSV to the FlightDetails type; the 1 indicates an initial header line to ignore.
  • line 9: maps column 14 to the delay value. An optional converter function maps a missing or non-numeric delay to the value -1. Type inference by Fluxtion ensures a char to int conversion with zero GC.
  • line 10: maps column 8 to the carrier name. The carrier name is interned to reduce unnecessary allocation of String objects, as we expect the same carrier names to appear many times. Bearing in mind there are 7 million records, this will reduce GC pressure massively.
  • line 12: the filter function positive() is applied to the field FlightDetails::getDelay. Only delayed flights are processed by child nodes.
  • line 13: filtered records, delayedFlight, are grouped by the key FlightDetails::getCarrier; the target of the group is CarrierDelay.
  • line 15: defines the initialisation function for a new carrier entry in the group, only called when a new key is allocated in the group.
  • line 16: applies the average function to delay and sets the value via CarrierDelay::setAvgDelay.
  • line 17: counts the records in the group and sets the value via CarrierDelay::setTotalFlights.
  • line 18: applies the sum function to delay and sets the value via CarrierDelay::setTotalDelayMins.

The calculations are stateful and have unique values for each carrier. Every time a FlightDetails record is received the calculations update for the relevant carrier.

  • line 20: assigns delayMap as a public final variable to assist testing.
  • line 22: prints the map values when an end of file event is received.
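The same filter, group and aggregate steps can be spelled out in plain Java to show what the declarative definition saves us from writing. This is a hand-written sketch, not Fluxtion's generated code. It assumes zero-based column indices, uses a hypothetical row helper to build sample data, and makes no attempt at the zero GC behaviour described above (String.split allocates freely).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CarrierDelaySketch {

    /** Per-carrier running totals, mirroring the fields of CarrierDelay. */
    static final class CarrierDelay {
        final String carrierId;
        int totalFlights;
        int totalDelayMins;

        CarrierDelay(String carrierId) {
            this.carrierId = carrierId;
        }

        int avgDelay() {
            return totalDelayMins / totalFlights; // integer division, as the fields are ints
        }
    }

    /** Parses an int, falling back to a default for missing or non-numeric text. */
    static int parseIntOrDefault(String field, int defaultValue) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Skips the header row, then filters, groups and aggregates the remaining rows. */
    static Map<String, CarrierDelay> analyse(List<String> csvLines) {
        Map<String, CarrierDelay> byCarrier = new LinkedHashMap<>();
        for (int i = 1; i < csvLines.size(); i++) {           // row 0 is the header
            String[] cols = csvLines.get(i).split(",", -1);
            String carrier = cols[8].intern();                 // reuse one String per carrier
            int delay = parseIntOrDefault(cols[14], -1);       // missing delay becomes -1
            if (delay <= 0) {
                continue;                                      // keep only delayed flights
            }
            CarrierDelay cd = byCarrier.computeIfAbsent(carrier, CarrierDelay::new);
            cd.totalFlights++;                                 // count
            cd.totalDelayMins += delay;                        // sum
        }
        return byCarrier;
    }

    /** Hypothetical helper: a 15-column row with only carrier and delay filled in. */
    static String row(String carrier, String delay) {
        String[] cols = new String[15];
        Arrays.fill(cols, "");
        cols[8] = carrier;
        cols[14] = delay;
        return String.join(",", cols);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                row("carrier", "delay"), // header
                row("AA", "10"), row("AA", "30"), row("WN", "NA"),
                row("WN", "20"), row("AA", "-5"));
        analyse(lines).values().forEach(cd -> System.out.println(
                cd.carrierId + " avg=" + cd.avgDelay()
                        + " flights=" + cd.totalFlights + " total=" + cd.totalDelayMins));
    }
}
```

Even in this compact form the parsing, looping and map management outweigh the three lines that carry the business rules, which is the imbalance the Fluxtion version removes.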

Performance 

To execute the flight analysis for 2008, unzip the flight CSV data and pass the file location to the executable jar in the distribution.

java.exe -jar dist\flightanalyser.jar [FLIGHT_CSV_DATA]
Flight delay analysis
========================
FlightAnalyser.CarrierDelay(carrierId=OO, avgDelay=31, totalFlights=219367, totalDelayMins=6884487)
FlightAnalyser.CarrierDelay(carrierId=AA, avgDelay=35, totalFlights=293277, totalDelayMins=10414936)
FlightAnalyser.CarrierDelay(carrierId=MQ, avgDelay=35, totalFlights=205765, totalDelayMins=7255602)
FlightAnalyser.CarrierDelay(carrierId=FL, avgDelay=31, totalFlights=117632, totalDelayMins=3661868)
FlightAnalyser.CarrierDelay(carrierId=DL, avgDelay=27, totalFlights=209018, totalDelayMins=5839658)
FlightAnalyser.CarrierDelay(carrierId=NW, avgDelay=28, totalFlights=158797, totalDelayMins=4482112)
FlightAnalyser.CarrierDelay(carrierId=UA, avgDelay=38, totalFlights=200470, totalDelayMins=7763908)
FlightAnalyser.CarrierDelay(carrierId=9E, avgDelay=32, totalFlights=90601, totalDelayMins=2907848)
FlightAnalyser.CarrierDelay(carrierId=CO, avgDelay=34, totalFlights=141680, totalDelayMins=4818397)
FlightAnalyser.CarrierDelay(carrierId=XE, avgDelay=36, totalFlights=162602, totalDelayMins=5989016)
FlightAnalyser.CarrierDelay(carrierId=AQ, avgDelay=12, totalFlights=1908, totalDelayMins=23174)
FlightAnalyser.CarrierDelay(carrierId=EV, avgDelay=35, totalFlights=122751, totalDelayMins=4402397)
FlightAnalyser.CarrierDelay(carrierId=AS, avgDelay=27, totalFlights=62241, totalDelayMins=1714954)
FlightAnalyser.CarrierDelay(carrierId=F9, avgDelay=21, totalFlights=46836, totalDelayMins=992044)
FlightAnalyser.CarrierDelay(carrierId=B6, avgDelay=42, totalFlights=83202, totalDelayMins=3559212)
FlightAnalyser.CarrierDelay(carrierId=WN, avgDelay=26, totalFlights=469518, totalDelayMins=12633319)
FlightAnalyser.CarrierDelay(carrierId=OH, avgDelay=34, totalFlights=96154, totalDelayMins=3291908)
FlightAnalyser.CarrierDelay(carrierId=HA, avgDelay=18, totalFlights=18736, totalDelayMins=342715)
FlightAnalyser.CarrierDelay(carrierId=YV, avgDelay=37, totalFlights=111004, totalDelayMins=4159465)
FlightAnalyser.CarrierDelay(carrierId=US, avgDelay=28, totalFlights=167945, totalDelayMins=4715728)

millis:2682

Processing performance analysis:

file size           = 673 MB
record count        = 7,009,728
processing time     = 2.689 seconds
bytes process rate  = 250 MB per second
record process time = 383 nanos per record
record process rate = 2.6 million records per second
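The derived rates follow directly from the file size, record count and processing time. A minimal sketch of the arithmetic, with hypothetical method names:

```java
import java.util.Locale;

class ThroughputSketch {

    static double megabytesPerSecond(double megabytes, double seconds) {
        return megabytes / seconds;
    }

    static double nanosPerRecord(long records, double seconds) {
        return seconds * 1e9 / records;
    }

    static double recordsPerSecond(long records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        double fileSizeMb = 673;
        long records = 7_009_728L;
        double seconds = 2.689;
        System.out.println(String.format(Locale.ROOT,
                "%.0f MB/s, %.1f ns/record, %.2f million records/s",
                megabytesPerSecond(fileSizeMb, seconds),
                nanosPerRecord(records, seconds),
                recordsPerSecond(records, seconds) / 1e6));
    }
}
```

The results come out at about 250 MB per second, a little under 384 nanoseconds per record and about 2.6 million records per second, in line with the figures quoted.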

Comparing the two solutions we observe the following:

  • The Java version uses less code than the Scala version
  • Fluxtion removes the need to define a graph, just business logic
  • Building a graph manually is a source of errors
  • Lombok makes data types as terse as Scala case classes
  • var reduces code bloat
  • The signal to noise ratio is high, making the code easier to maintain and understand
  • Fluxtion is much easier to run: it requires no server setup, just compile and go

It is difficult to compare performance numbers. The Akka version talks about a minute to run the example, but I do not have sufficient Akka experience to validate this. Additionally, it is an old blog, so the situation has probably moved on.

Conclusion

We set out to demonstrate that Java can be a terse language for event streaming if we select a good set of tools to use. Lombok and Fluxtion combine elegantly, allowing declarative definition of processing logic to be both simple and type safe. The use of var makes the code even more readable and easier to write. The key to all of this is inference: each tool infers a different type of behaviour, and all of them save the coder from having to explicitly specify it:

  • var – type inference
  • Lombok – infers the boilerplate implementation
  • Fluxtion – infers the processing graph

In contrast to Fluxtion, the Akka version requires a processing graph to be explicitly defined by the developer. This does not scale for larger, more complex situations and will be a source of errors. Even worse, the business logic is obscured by technical infrastructure, making maintenance even more costly in the future.

As a final note, the performance of the solution is excellent, processing 2.6 million records per second with zero GC. I hope you enjoyed the blog and will be tempted to try Fluxtion and Lombok out.

Acknowledgements 

AllSimon on GitHub: his comments while contributing to Fluxtion led me to experiment with Lombok.

Published on Java Code Geeks with permission by Greg Higgins, partner at our JCG program. See the original article here: Easy event processing with var, Lombok and Fluxtion

Opinions expressed by Java Code Geeks contributors are their own.

Greg Higgins

Originally a mechanical engineer, Greg has been working in IT for the last 18 years. Greg has held a variety of roles over the years, but in the last 9 years he has concentrated on writing systems in the High Frequency Trading space. He is the creator and maintainer of the Fluxtion event processing framework (https://github.com/v12technology/fluxtion).