Kafka Record Patterns for Data Replication

Imagine going down to your local milkshake bar and signing a contract with the owner so that you can purchase bespoke drinks at a set price. Let’s say you agreed on fresh milk with 3.5% fat and one tablespoon of chocolate powder per 500ml of milk. Putting that into a table might look like this:

PK  | contract_number | start      | fat_content | chocolate_powder
100 | 12345678        | 2021-01-01 | 3.5%        | 1 tbsp

After a few weeks, your taste buds become a little desensitised and you decide you want to add some more chocolate powder. The owner is agile, so he adjusts the contract, which means we need to add a few columns in order to track validity:

PK  | contract_number | contract_from | start      | end        | fat_content | chocolate_powder
100 | 12345678        | 2021-01-01    | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
101 | 12345678        | 2021-01-01    | 2021-02-01 | 9999-12-31 | 3.5%        | 2 tbsp

Note two things: 1) this table is not normalised and 2) I used a low date (year 0001) and high date (year 9999) for the start of the first row and the end of the last row.

In reality we would probably normalise this data. For the sake of this example I won’t, because keeping everything in one table makes it more readable as I add more information below.

The low and high dates are there so that I can always find data, regardless of the date I use. I don’t have to know the contract termination date, which is different for every contract, in order to ask what the latest recipe is for a given contract number:

select *
from contracts
where contract_number = '12345678' 
  and '9999-12-31' between start and end;
--> returns row with primary key 101

After a few more weeks, I realise that I need to reduce my calorific intake, but I’m a complete chocoholic. We agree to reduce the fat content:

PK  | contract_number | contract_from | start      | end        | fat_content | chocolate_powder
100 | 12345678        | 2021-01-01    | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
101 | 12345678        | 2021-01-01    | 2021-02-01 | 2021-02-28 | 3.5%        | 2 tbsp
102 | 12345678        | 2021-01-01    | 2021-03-01 | 9999-12-31 | 0.8%        | 2 tbsp

At some point I get bored of milkshakes and I terminate the contract, but because I never purchased a milkshake with 0.8% fat, the owner lets me terminate it with a date in the past, say 2021-02-14, so that we can delete the last row:

PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder
100 | 12345678        | 2021-01-01    | 2021-02-14  | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
101 | 12345678        | 2021-01-01    | 2021-02-14  | 2021-02-01 | 9999-12-31 | 3.5%        | 2 tbsp

Note that it is a design choice whether or not we “shorten” the end date. We might want to do that so that the data can no longer be found when querying with a date after the contract termination date. It depends on the requirements more than anything.

What has all this got to do with Kafka, and data replication?

Imagine a self-contained microservice which needs an up-to-date copy of this data in memory, in order to run lightning fast. Imagine you want that cache to be distributed across all of your service instances (Kubernetes pods). How about the following few lines of Kotlin code, which use the nifty Kafka Streams API:

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StoreQueryParameters.fromNameAndType
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.QueryableStoreTypes

val builder = StreamsBuilder()
val globalStore = Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>(globalStoreName)
// global, so that every pod has access to all data from all partitions:
builder.globalTable(CONTRACTS_TOPIC, globalStore)
val streams = KafkaStreams(builder.build(), props)
streams.start()
val globalBausteinView = streams.store(
    fromNameAndType(globalStoreName, QueryableStoreTypes.keyValueStore<String, String>()))

// REST Handler:
val contractJson = globalBausteinView.get(contractNumber)

We need to publish the contract data to the topic used as the input, but before we do that, let’s think about the keys we use, in order to have the data survive log compaction. It would be no good to publish three records, each using the contract number as the key, because as soon as the topic was compacted, only the data from the last row would survive, and any service replicating from scratch would have an incomplete dataset. The solution is to include the start date in the key, e.g. “12345678::2021-02-01”.
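As a concrete sketch, publishing one of those records might look like this (the broker address and serialisers are assumptions; CONTRACTS_TOPIC is the constant from the streams example above, and payloadJson stands for one of the JSON values shown in the tables below):

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

val producerProps = Properties().apply {
    put("bootstrap.servers", "localhost:9092") // assumption: a local broker
    put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
}
val producer = KafkaProducer<String, String>(producerProps)

// including the start date in the key means each validity window
// survives log compaction as its own record
val key = "12345678::2021-02-01"
producer.send(ProducerRecord(CONTRACTS_TOPIC, key, payloadJson))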

We have a number of options regarding the values (payload). Let’s work through the examples.

(Note: initially contracts are valid for 5 years, so the contract_to column always has a value)

1) Denormalised Table, Variation 1 – One Event per Attribute Combination

Use case: Contract Creation

PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder
100 | 12345678        | 2021-01-01    | 2025-12-31  | 0001-01-01 | 9999-12-31 | 3.5%        | 1 tbsp

Records emitted:

Key:   12345678::2021-01-01
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-01-01", end: "2025-12-31", fatContent: 3.5, choc: 1}

Use case: Change chocolate powder

PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder
101 | 12345678        | 2021-01-01    | 2025-12-31  | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
102 | 12345678        | 2021-01-01    | 2025-12-31  | 2021-02-01 | 9999-12-31 | 3.5%        | 2 tbsp

Records emitted:

Key:   12345678::2021-01-01
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-01-01", end: "2021-01-31", fatContent: 3.5, choc: 1}

Key:   12345678::2021-02-01
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-02-01", end: "2025-12-31", fatContent: 3.5, choc: 2}

Use case: Change fat content

PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder
101 | 12345678        | 2021-01-01    | 2025-12-31  | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
102 | 12345678        | 2021-01-01    | 2025-12-31  | 2021-02-01 | 2021-02-28 | 3.5%        | 2 tbsp
103 | 12345678        | 2021-01-01    | 2025-12-31  | 2021-03-01 | 9999-12-31 | 0.8%        | 2 tbsp

Records emitted (none for row 101, as no changes were made to it):

Key:   12345678::2021-02-01
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-02-01", end: "2021-02-28", fatContent: 3.5, choc: 2}

Key:   12345678::2021-03-01
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-03-01", end: "2025-12-31", fatContent: 0.8, choc: 2}

Use case: Contract Termination

PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder
101 | 12345678        | 2021-01-01    | 2021-02-14  | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp
102 | 12345678        | 2021-01-01    | 2021-02-14  | 2021-02-01 | 2021-02-14 | 3.5%        | 2 tbsp
103 | (deleted)       |               |             |            |            |             |

Records emitted:

Key:   12345678::2021-01-01
Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14", start: "2021-01-01", end: "2021-01-31", fatContent: 3.5, choc: 1}

Key:   12345678::2021-02-01
Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14", start: "2021-02-01", end: "2021-02-14", fatContent: 3.5, choc: 2}

Key:   12345678::2021-03-01
Value: null (tombstone record)

Note how the key and the start/end dates in the payload are not the ugly technical dates, but are limited to the actual contract validity. That is a design choice; I chose not to expose technical details.

In this variant, we publish a record for the “lowest common denominators” in terms of validity. There is an event for each time window in which the values are constant. Each change leads to a new record.

Imagine viewing the validities of the values separately, as they might be if we normalised the table:

Value                                    | January | February | March   | April…
Milk Fat Content                         | 3.5     | 3.5      | 0.8     | 0.8
Chocolate Powder                         | 1       | 2        | 2       | 2
Resulting time windows (constant values) | 3.5 & 1 | 3.5 & 2  | 0.8 & 2 | 0.8 & 2

Each change leads to a new row in the denormalised table and hence a new record in Kafka. The three events that are published are visible on that bottom row.
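If you need to derive those constant-value windows programmatically, a minimal sketch might look like this (the Window class and constantWindows function are my own illustration, not from a library):

import java.time.LocalDate

// one validity window for a single attribute value
data class Window(val start: LocalDate, val end: LocalDate, val value: Any)

// Split the timeline at every date on which any attribute changes, then
// evaluate every attribute within each resulting slice.
fun constantWindows(attributes: Map<String, List<Window>>): List<Triple<LocalDate, LocalDate, Map<String, Any>>> {
    val boundaries = attributes.values.flatten().map { it.start }.toSortedSet().toList()
    return boundaries.mapIndexed { i, start ->
        val end = if (i + 1 < boundaries.size) boundaries[i + 1].minusDays(1)
                  else LocalDate.of(9999, 12, 31)
        val values = attributes.mapValues { (_, windows) ->
            windows.first { start in it.start..it.end }.value
        }
        Triple(start, end, values)
    }
}

// For the timeline above, this yields exactly the three event windows:
// 2021-01-01..2021-01-31 -> {fatContent=3.5, choc=1}
// 2021-02-01..2021-02-28 -> {fatContent=3.5, choc=2}
// 2021-03-01..9999-12-31 -> {fatContent=0.8, choc=2}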

As an alternative, we could publish one event per contract, with validities inside the payload, as follows.

2) Denormalised Table, Variation 2 – One Event per Contract

Use case: Contract Creation

Key:   12345678
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31",
           fatContent: [ {start: "2021-01-01", end: "2025-12-31", value: 3.5} ],
           choc:       [ {start: "2021-01-01", end: "2025-12-31", value: 1} ]
       }

Use case: Change chocolate powder

Key:   12345678
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31",
           fatContent: [ {start: "2021-01-01", end: "2025-12-31", value: 3.5} ],
           choc:       [ {start: "2021-01-01", end: "2021-01-31", value: 1},
                         {start: "2021-02-01", end: "2025-12-31", value: 2} ]
       }

With this variation, we end up having to publish a list of values together with their validities.
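In Kotlin, the payload might be modelled along these lines (the class and field names are illustrative, chosen to match the JSON above):

data class AttributeValidity<T>(val start: String, val end: String, val value: T)

data class ContractEvent(
    val cn: String,
    val from: String,
    val to: String,
    val fatContent: List<AttributeValidity<Double>>, // grows with each fat content change
    val choc: List<AttributeValidity<Int>>           // grows with each chocolate powder change
)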

3) Normalised Table, Each Attribute on its own Topic

The next solution is to publish each attribute on its own topic.

Use case: Contract Creation

Topic: Contract
Key:   12345678
Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31"}

Topic: Fat Content
Key:   12345678::2021-01-01
Value: {start: "2021-01-01", end: "2025-12-31", value: 3.5}

Topic: Chocolate Powder
Key:   12345678::2021-01-01
Value: {start: "2021-01-01", end: "2025-12-31", value: 1}

Use case: Change chocolate powder

Topic: Chocolate Powder
Key:   12345678::2021-01-01
Value: {start: "2021-01-01", end: "2021-01-31", value: 1}
Key:   12345678::2021-02-01
Value: {start: "2021-02-01", end: "2025-12-31", value: 2}

Use case: Change fat content

Topic: Fat Content
Key:   12345678::2021-01-01
Value: {start: "2021-01-01", end: "2021-02-28", value: 3.5}
Key:   12345678::2021-03-01
Value: {start: "2021-03-01", end: "2025-12-31", value: 0.8}

Use case: Contract Termination

Topic: Contract
Key:   12345678
Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14"}

Topic: Fat Content
Key:   12345678::2021-01-01
Value: {start: "2021-01-01", end: "2021-02-14", value: 3.5}
Key:   12345678::2021-03-01
Value: null (tombstone record)

Topic: Chocolate Powder
Key:   12345678::2021-01-01 -> no change, so no record emitted
Key:   12345678::2021-02-01
Value: {start: "2021-02-01", end: "2021-02-14", value: 2}
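Note that a replicating service now has to consume three topics and reassemble a contract at read time. With the Kafka Streams approach shown earlier, that could mean one global store per topic, along these lines (topic and store names are illustrative):

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

val builder = StreamsBuilder()
// one global store per topic; the service joins them when serving a request
builder.globalTable("contract", Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("contract-store"))
builder.globalTable("fat-content", Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("fat-content-store"))
builder.globalTable("chocolate-powder", Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("chocolate-powder-store"))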

4) Verticalised Table, One Topic for all Attributes

The final solution is to store the data in a verticalised table (one row per attribute value, along the lines of an entity-attribute-value model). This has the advantage that you can dynamically add new attributes, and in fact each contract could have different attributes. This is akin to a schemaless document. The publication of records to Kafka becomes quite generic, as the sketch below shows.
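All four use cases below could flow through a single function. Here is a minimal sketch, where the function, topic name and JSON layout are my own illustration:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.LocalDate

fun publishAttribute(
    producer: KafkaProducer<String, String>,
    contractNumber: String,
    attribute: String,   // e.g. "fatContent" or "chocolatePowder"
    start: LocalDate,
    end: LocalDate,
    value: Any?          // null means the window was deleted -> emit a tombstone
) {
    // one generic key scheme for every attribute, known or yet to be added
    val key = "$contractNumber::$attribute::$start"
    val json = value?.let { """{"start": "$start", "end": "$end", "value": $it}""" }
    producer.send(ProducerRecord<String, String>("contract-attributes", key, json))
}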

Use case: Contract Creation

Key:   12345678::fatContent::2021-01-01
Value: {start: "2021-01-01", end: "2025-12-31", value: 3.5}

Key:   12345678::chocolatePowder::2021-01-01
Value: {start: "2021-01-01", end: "2025-12-31", value: 1}

Use case: Change chocolate powder

Key:   12345678::fatContent::2021-01-01 -> no change, so no record emitted

Key:   12345678::chocolatePowder::2021-01-01
Value: {start: "2021-01-01", end: "2021-01-31", value: 1}

Key:   12345678::chocolatePowder::2021-02-01
Value: {start: "2021-02-01", end: "2025-12-31", value: 2}

Use case: Change fat content

Key:   12345678::fatContent::2021-01-01
Value: {start: "2021-01-01", end: "2021-02-28", value: 3.5}

Key:   12345678::fatContent::2021-03-01
Value: {start: "2021-03-01", end: "2025-12-31", value: 0.8}

Key:   12345678::chocolatePowder::2021-01-01 -> no change, so no record emitted
Key:   12345678::chocolatePowder::2021-02-01 -> no change, so no record emitted

Use case: Contract Termination

Key:   12345678::fatContent::2021-01-01
Value: {start: "2021-01-01", end: "2021-02-14", value: 3.5}

Key:   12345678::fatContent::2021-03-01
Value: null (tombstone record)

Key:   12345678::chocolatePowder::2021-01-01 -> no change, so no record emitted

Key:   12345678::chocolatePowder::2021-02-01
Value: {start: "2021-02-01", end: "2021-02-14", value: 2}

My favourite is the first solution, as I find it to be the closest to the functional business requirements.

Another way to choose a solution might be to calculate the effect that each one has on data volume: storage in Kafka, transport through your landscape, and storage in the replicas.
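Counting the records emitted across the four use cases above gives a rough feel for this: variation 1 emits 8 records (each carrying a full row, including the tombstone), variation 2 emits one record per use case (4 here, but every payload carries the whole value history and so keeps growing), variation 3 emits 11 smaller records spread over three topics, and variation 4 emits 9. Which one wins depends on payload sizes and on how often individual attributes change.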

If you have other solutions, please get in touch.

Copyright ©2021, Ant Kutschera