Enterprise Java

Kafka Record Patterns for Data Replication

Imagine going down to your local milkshake bar and signing a contract with the owner so that you could purchase bespoke drinks at a set price. Let’s say you agreed on fresh milk with 3.5% fat and one tablespoon of chocolate powder, per 500ml of milk.  Putting that into a table might look like this:

PKcontract_numberstartfat_contentchocolate_powder
100123456782021-01-013.5%1 tbsp

After a few weeks, your tastebuds become a little desensitised and you decide you want to add some more chocolate powder. The owner is agile, so he adjusts the contract, meaning we need to add a few columns in order to track validity:

PKcontract_numbercontract_fromstartendfat_contentchocolate_powder
100123456782021-01-010001-01-012021-01-313.5%1 tbsp
101123456782021-01-012021-02-019999-12-313.5%2 tbsp

Note two things: 1) this table is not normalised and 2) I used a low date (year 0001) and high date (year 9999) for the start of the first row and the end of the last row.

In reality we would probably normalise this data. For the sake of this example, I won’t because it will make it more readable as I add more information below.

The low and high dates are there, so that I can always find data, regardless of the date I use – I don’t have to know the contract termination date which is different for every contract, in order to be able to simply ask what the latest recipe is, for a given contract number:

1
2
3
4
5
select *
from contracts
where contract_number = '12345678'
  and '9999-12-31' between start and end;
--> returns row with primary key 101

After a few more weeks, I realise that I need to reduce my calorific intake, but I’m a complete chocoholic. We agree to reduce the fat content:

PKcontract_numbercontract_fromstartendfat_contentchocolate_powder
100123456782021-01-010001-01-012021-01-313.5%1 tbsp
101123456782021-01-012021-02-012021-02-283.5%2 tbsp
102123456782021-01-012021-03-019999-12-310.8%2 tbsp

At some point I get bored of milkshakes and I terminate the contract, but because I never purchased a milkshake with 0.8% fat, the owner lets me terminate it with a date in the past, say 2021-02-14, so that we can delete the last row:

PKcontract_numbercontract_fromcontract_tostartendfat_contentchocolate_powder
100123456782021-01-012021-02-140001-01-012021-01-313.5%1 tbsp
101123456782021-01-012021-02-142021-02-019999-12-313.5%2 tbsp

Note that it is a design choice whether or not we “shorten” the end date. We might want to do that in order to make such data not be found after the contract termination date. It depends on requirements more than anything.

What has all this got to do with Kafka, and data replication?

Imagine a self-contained microservice which needs to have an up to date copy of this data, in memory, in order to run lightning fast. Imagine you want that cache to be distributed across all of your service instances (Kubernetes pods). How about the following 7 lines of Kotlin code that use the nifty Kafka Streams API:

01
02
03
04
05
06
07
08
09
10
val builder = StreamsBuilder()
val globalStore = Materialized.`as`(globalStoreName)
// global, so that every pod has access to all data from all partitions:
builder.globalTable(CONTRACTS_TOPIC, globalStore)
val streams = KafkaStreams(builder.build(), props)
streams.start()
val globalBausteinView = streams.store(fromNameAndType(globalStoreName, ...)
 
// REST Handler:
val contractJson = globalBausteinView.get(contractNumber)

We need to publish the contract data to the topic used as the input, but before we do that, let’s think about the keys we use, in order to have the data survive log compaction. It would be no good to publish three records, each using the contract number as the key, because as soon as the topic were compacted, only the data from the last row would survive, and any service replicating from scratch would have an incomplete dataset. The solution is to include the start date in the key, e.g. “12345678::2021-02-01”.

We have a number of options regarding the values (payload). Let’s work through the examples.

(Note: initially contracts are valid for 5 years, so the contract_to column always has a value)

1) Denormalised Table, Variation 1 – One Event per Attribute Combination

Use CasePKcontract_numbercontract_fromcontract_tostartendfat

 

content

chocolate

 

powder

records emitted
Contract Creation100123456782021-01-012025-12-310001-01-019999-12-313.5%1 tbspKey:  12345678::2021-01-01

 

Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”, start: “2021-01-01”, end: “2025-12-31”, fatContent: 3.5, choc: 1}

          
Change choc powder101123456782021-01-012025-12-310001-01-012021-01-313.5%1 tbspKey:  12345678::2021-01-01

 

Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”, start: “2021-01-01”, end: “2021-01-31”, fatContent: 3.5, choc: 1}

102123456782025-12-312025-12-312021-02-019999-12-313.5%2 tbspKey:  12345678::2021-02-01
Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”, start: “2021-02-01”, end: “2025-12-31”, fatContent: 3.5, choc: 2}
          
Change fat content101123456782021-01-012025-12-310001-01-012021-01-313.5%1 tbspnone – no changes made
102123456782021-01-012025-12-312021-02-012021-02-283.5%2 tbspKey:  12345678::2021-02-01
Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”, start: “2021-02-01”, end: “2021-02-28”, fatContent: 3.5, choc: 2}
103123456782021-01-012025-12-312021-03-019999-12-310.8%2 tbspKey:  12345678::2021-03-01
Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”, start: “2021-03-01”, end: “2025-12-31”, fatContent: 0.8, choc: 2}
          
Contract Termination101123456782021-01-012021-02-140001-01-012021-01-313.5%1 tbspKey:  12345678::2021-01-01

 

Value: {cn: 12345678, from: “2021-01-01”, to: “2021-02-14”, start: “2021-01-01”, end: “2021-01-31”, fatContent: 3.5, choc: 1}

102123456782021-01-012021-02-142021-02-012021-02-143.5%2 tbspKey:  12345678::2021-02-01
Value: {cn: 12345678, from: “2021-01-01”, to: “2021-02-14”, start: “2021-02-01”, end: “2021-02-14”, fatContent: 3.5, choc: 2}
103deletedKey: 12345678:2021-03-01

 

Value: null (tombstone record)

Note how the key and start/end dates are not the ugly technical dates but limited to the atual contract validity. That is a design choice where I chose not to expose technical details.

In this variant, we publish a record for the “lowest common denominators” in terms of validity. There is an event for each time window in which values are constant. Each change, leads to a new record.

Imagine viewing the validities of the values seperately, as they might be if we normalised the table:

ValueJanuaryFebruaryMarchApril…
Milk Fat Content3.50.8
Chocolate Powder12
Resulting Time Windows with constant values3.5 & 13.5 & 20.8 & 2

Each change leads to a new row in the denormalised table and hence a new record in Kafka. The three events that are published are visible on that bottom row.

As an alternative, we could publish one event per contract, with validities inside the payload, as follows.

2) Denormalised Table, Variation 2 – One Event per Contract

Use Caserecords emitted
Contract CreationKey:  12345678

 

Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”,

    fatContent: [ {start: “2021-01-01”, end: “2025-12-31”, value: 3.5} ],

    choc: [ {start: “2021-01-01”, end: “2025-12-31”, value: 1} ]

}

Change chocolate powderKey:  12345678
Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”,

 

    fatContent: [ {start: “2021-01-01”, end: “2025-12-31”, value: 3.5} ],

    choc: [ {start: “2021-01-01”, end: “2021-01-31”, value: 1},

                 {start: “2021-02-01”, end: “2025-12-31”, value: 2} ]

}

With this variation, we end up having to publish a list of values together with their validities.

3) Normalised Table, Each Attribute on its own Topic

The next solution is to publish each attribute on its own topic.

Use Caserecords emitted
Contract CreationTopic: Contract

 

Key:  12345678

Value: {cn: 12345678, from: “2021-01-01”, to: “2025-12-31”}

Topic: Fat Content

Key: 12345678::2021-01-01

Value: {start: “2021-01-01”, end: “2025-12-31”, value: 3.5}

Topic: Chocolate Powder

Key: 12345678::2021-01-01

Value: {start: “2021-01-01”, end: “2025-12-31”, value: 1}

  
Change choc powderTopic: Chocolate Powder

 

Key: 12345678::2021-01-01

Value: {start: “2021-01-01”, end: “2021-01-31”, value: 1}

Key: 12345678::2021-02-01

Value: {start: “2021-02-01”, end: “2025-12-31”, value: 2}

  
Change fat contentTopic: Fat Content

 

Key: 12345678::2021-01-01

Value: {start: “2021-01-01”, end: “2021-02-28”, value: 3.5}

Key: 12345678::2021-03-01

Value: {start: “2021-03-01”, end: “2025-12-31”, value: 0.8}

  
Contract TerminationTopic: Contract

 

Key:  12345678

Value: {cn: 12345678, from: “2021-01-01”, to: “2021-02-14”}

Topic: Fat Content

Key: 12345678::2021-01-01

Value: {start: “2021-01-01”, end: “2021-02-14”, value: 3.5}

Key: 12345678::2021-03-01

Value: null (tombstone record)

Topic: Chocolate Powder

Key: 12345678::2021-01-01 –> no change, so no record emitted

Key: 12345678::2021-02-01

Value: {start: “2021-02-01”, end: “2021-02-14”, value: 2}

4) Verticalised Table, One Topic for all Attributes

The final solution is to use a verticalised table in order to store the data. This has the advantage that you can dynamically add new attributes, and in fact each contract could have different attributes. This is akin to a schemaless document. The publication of records in Kafka becomes quite generic.

Use Caserecords emitted
Contract CreationKey:  12345678::fatContent::2021-01-01

 

Value: {start: “2021-01-01”, end: “2025-12-31”, value: 3.5}

Key: 12345678::chocolatePowder::2021-01-01

Value: {start: “2021-01-01”, end: “2025-12-31”, value: 1}

  
Change choc powderKey:  12345678::fatContent::2021-01-01 –> no change, no event emitted

 

Key: 12345678::chocolatePowder::2021-01-01

Value: {start: “2021-01-01”, end: “2021-01-31”, value: 1}

Key: 12345678::chocolatePowder::2021-02-01

Value: {start: “2021-02-01”, end: “2025-12-31”, value: 2}

  
Change fat contentKey:  12345678::fatContent::2021-01-01

 

Value: {start: “2021-01-01”, end: “2021-02-28”, value: 3.5}

Key:  12345678::fatContent::2021-03-01

Value: {start: “2021-03-01”, end: “2021-02-28”, value: 0.8}

Key: 12345678::chocolatePowder::2021-01-01 –> no change, no event emitted

Key: 12345678::chocolatePowder::2021-02-01 –> no change, no event emitted

  
Contract TerminationKey:  12345678::fatContent::2021-01-01

 

Value: {start: “2021-01-01”, end: “2021-02-14”, value: 3.5}

Key:  12345678::fatContent::2021-03-01

Value: null (tombstone record)

Key: 12345678::chocolatePowder::2021-01-01 –> no change, no event emitted

Key: 12345678::chocolatePowder::2021-02-01

Value: {start: “2021-02-01”, end: “2021-02-14”, value: 2}

My favourite is the first solution, as I find it to be the closest to the functional business requirements.

Another way to choose which solution to use might be to calculate the effect that the solution has on data volume (storage in Kafka; transport through your landscape; storage in replicates).

If you have other solutions, please get in touch.

Published on Java Code Geeks with permission by Ant Kutschera, partner at our JCG program. See the original article here: Kafka Record Patterns for Data Replication

Opinions expressed by Java Code Geeks contributors are their own.

Ant Kutschera

Ant is a freelance Java architect and developer. He has been writing a blog and white papers since 2004 and writes about anything he finds interesting, related to Java or software. Most recently he has been working on enterprise systems involving Eclipse RCP, Google GWT, Hibernate, Spring and J2ME. He believes very strongly in being involved in all parts of the software life cycle.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button