Schema Registry for Kafka: Evolution of Messaging

Many of us have worked in dynamic environments where specifications sometimes change daily. This becomes harder to manage when team members depend on each other's work, and even more complex when multiple teams are involved.

That’s why structuring and validating the messages sent via Kafka is essential, especially since decoupling services through messaging is one of the most common Enterprise Integration Patterns (EIP).

By using a Schema Registry we can achieve several things, but these two are of the greatest importance:

  • knowing what the data sent to topics should look like (schema definition)
  • enforcing a data evolution strategy (schema evolution).

We’ll review how Schema Registry can help you structure data, future-proof it, and avoid pitfalls in such an environment.

Serialization in Kafka

There are many ways to format a message. A delimiter can separate the values, creating a sort of CSV. Although this might be somewhat efficient, it’s not practical. Usually, serialization is a better solution.

The simplest and most commonly used serializer with Kafka is StringSerializer. Out of the box, Kafka provides serializers/deserializers for primitive types, but they aren’t very practical because each message carries only a single value.
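To make this concrete, here is a minimal sketch of a producer configured with StringSerializer (the broker address and the "sensor-data" topic name are assumptions for illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// both key and value are sent as plain strings
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // the whole payload is one value - fine for demos, limiting for real data
    producer.send(new ProducerRecord<>("sensor-data", "sensor-1", "255"));
}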

Most of the data we send has some structure, and applications work with it as objects. That is the reason behind using object serialization.

public class SensorData {
	private String sensorId;
	private String deviceId;
	private Instant measurementTime;
	private Integer value;
}
{
  "sensorId": "c08704f4-f1ed-47a3-9aa1-76fff50b4581",
  "deviceId": "949c7558-6f9b-42bb-9546-2f8535c48583",
  "measurementTime": "2023-05-09T18:24:39.207164357Z",
  "value": 255
}

When we serialize an instance of this Java class to JSON, we can see that the message has meaning, a context: it transfers the measurement of a sensor attached to a device. This is a simple example; real-world messages will carry much more data.

Schema

If we look at the SensorData example, we can see that the data has some form and structure. What we still need to define are the rules this structure must adhere to. Below, you can see how to enrich the Java class.

public class SensorData {
	@NotNull
	private String sensorId;
	@NotNull
	private String deviceId;
	@NotNull
	private Instant measurementTime;
	@NotNull
	private Integer value;
}

By adding the @NotNull rule, we tell the validator how to validate the input. Without it, objects with missing values could be serialized and sent, resulting in unusable or incomprehensible data.
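A minimal sketch of running that validation with the Bean Validation API (this assumes the jakarta namespace and a provider such as Hibernate Validator on the classpath; older stacks use javax instead):

import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import java.util.Set;

Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
Set<ConstraintViolation<SensorData>> violations = validator.validate(sensorData);
if (!violations.isEmpty()) {
    // reject the object before it is ever serialized and sent to Kafka
    throw new IllegalArgumentException("Invalid SensorData: " + violations);
}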

JSON Schema

Doing this in Java or similar languages is relatively easy, but if we want to do it for something like JSON, we need to define a schema.

JSON Schema is a vocabulary that allows you to annotate and validate JSON data structures. It is a JSON-based format for defining the structure, constraints, and rules JSON data should follow. 

It provides a standardized way to describe JSON data’s expected shape and content, making it easier to ensure data integrity and interoperability in applications that use JSON as their data interchange format.

SensorData class defined using JSON Schema:

{
  "$id": "https://example.com/address.schema.json",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "description": "Value measured by sensor",
  "type": "object",
  "properties": {
    "sensorId": {
      "type": "string"
    },
    "deviceId": {
      "type": "string"
    },
    "measurementTime": {
      "type": "string",
      "format": "date-time"
    },
    "value": {
      "type": "integer"
    }
  },
  "required": [
    "sensorId",
    "deviceId",
    "measurementTime",
    "value"
  ]
}

Now that we have defined the schema, we know precisely how a JSON object is validated.
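As a sketch of how that validation might look from Java, assuming the networknt json-schema-validator library and Jackson (the variable names schemaJson and messageJson are illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import java.util.Set;

ObjectMapper mapper = new ObjectMapper();
// schemaJson holds the JSON Schema above as a String, messageJson the incoming payload
JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
JsonSchema schema = factory.getSchema(schemaJson);
JsonNode payload = mapper.readTree(messageJson);
Set<ValidationMessage> errors = schema.validate(payload);   // an empty set means the payload is valid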

Protocol Buffers (a.k.a. protobuf)

Protocol Buffers, often called “protobufs,” is a method for serializing structured data, similar to XML or JSON, but designed to be smaller, faster, and simpler.

Google developed it as an open-source project that offers a language-agnostic way to define data structures and then serialize data in a compact, binary format.

SensorData class defined using Protocol Buffers:

syntax = "proto3";
message SensorData {
  string sensorId = 1;
  string deviceId = 2;
  google.protobuf.Timestamp measurementTime = 3;
  int32 value = 4;
}Code language: JavaScript (javascript)

The Protocol Buffers message definition is much more condensed than the JSON Schema one.
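A short sketch of how the class generated by protoc from this definition is typically used (the builder and parseFrom calls are the standard protobuf-java API; the values are the ones from the earlier example):

import com.google.protobuf.Timestamp;

SensorData data = SensorData.newBuilder()
        .setSensorId("c08704f4-f1ed-47a3-9aa1-76fff50b4581")
        .setDeviceId("949c7558-6f9b-42bb-9546-2f8535c48583")
        .setMeasurementTime(Timestamp.newBuilder().setSeconds(1683657104).build())
        .setValue(255)
        .build();

byte[] wireFormat = data.toByteArray();               // compact binary encoding
SensorData parsed = SensorData.parseFrom(wireFormat); // parseFrom throws InvalidProtocolBufferException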

Apache Avro & Kafka

Since the Apache Software Foundation maintains Kafka, we must mention Apache Avro, their open-source project, usually paired with Kafka to serialize data. 

Apache Avro is a data serialization framework and data exchange system and a part of the Apache Hadoop ecosystem. Avro was designed to provide a compact and efficient way to serialize structured data, making it suitable for big data processing, storage, and exchange scenarios.

SensorData class defined using Apache Avro:

{
  "namespace": "org.example",
  "type": "record",
  "name": "SensorData",
  "fields": [
    {"name": "sensorId", "type": "string"},
    {"name": "deviceId", "type": "string"},
    {"name": "measurementTime",
     "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "value", "type": "int"}
  ]
}

In all these cases, our Java class SensorData differs from what Kafka actually receives on the wire. All three formats have code generators that will build the classes for you, and all three are widely used and supported in various languages, such as Java, Python, C, C++, C#, PHP, and many more.
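Avro can also be used without code generation, via GenericRecord. Here is a minimal sketch, assuming avroSchemaJson holds the schema above as a String:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

Schema schema = new Schema.Parser().parse(avroSchemaJson);
GenericRecord record = new GenericData.Record(schema);
record.put("sensorId", "c08704f4-f1ed-47a3-9aa1-76fff50b4581");
record.put("deviceId", "949c7558-6f9b-42bb-9546-2f8535c48583");
record.put("measurementTime", 1683657104000000L); // timestamp-micros stored as a long
record.put("value", 255);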

There is a binary variant of JSON called BSON, but for all intents and purposes, we will consider a serialized JSON object to be a string (with spaces, tabs, and newlines removed). Protocol Buffers and Apache Avro, on the other hand, serialize data in a binary format, which is much more efficient than a string.

Schema Evolution

By defining the schema, we now have structure, and anyone can take that schema and use it to serialize and deserialize the data. But what happens if the schema changes? Consider the following evolution.

{
  "sensorId": "c08704f4-f1ed-47a3-9aa1-76fff50b4581",
  "deviceId": "949c7558-6f9b-42bb-9546-2f8535c48583",
  "measurementTime": "2023-05-09T18:24:39.207164357Z",
  "value": 255
}
{
  "id": "c08704f4-f1ed-47a3-9aa1-76fff50b4581",
  "deviceId": "949c7558-6f9b-42bb-9546-2f8535c48583",
  "measurementTime": "2023-05-09T18:24:39.207164357Z",
  "value": 255
}
{
  "id": "c08704f4-f1ed-47a3-9aa1-76fff50b4581",
  "deviceId": "949c7558-6f9b-42bb-9546-2f8535c48583",
  "timestamp": 1683657104000,
  "value": "255",
  "unit": "g"
}
{
  "id": "c08704f4-f1ed-47a3-9aa1-76fff50b4581",
  "timestamp": 1683657104000,
  "value": "255",
  "unit": "g"
}

A consumer built against the original message would be unable to deserialize any of the later ones. Even if deserialization is simple (e.g., JSON to a Map) and we treat the data as key-value pairs, both the field names and the types change. That undoubtedly results in errors, unprocessed messages, or inconsistent data on the receiving end.

Schema Registry

Since the schema can change, we need a way to track changes, primarily the latest version. We also need a way to enforce how the schema is allowed to change.

The first step in this process is the Schema Registry. It provides a centralized repository for managing and validating schemas. It also brings a few more benefits, including:

  • Data Validation – data is checked against the schema
  • Compatibility Checking – when modifying a schema, the registry checks whether the modification is allowed
  • Versioning – since schemas can be modified, the registry tracks previous versions
  • Evolution – after we define the initial schema, applications may need to evolve it over time

I wrote most of this blog with the Confluent Schema Registry in mind, but other solutions, like Karapace, Apicurio, and many more, exist.

[Image: Confluent Schema Registry and Kafka messaging flow]

To understand the complete process of Schema Registry usage, let’s walk through the flow in the image above.

  1. The schema creator registers a schema (the schema creator can also evolve it later)
  2. The producer service gets the schema from the Schema Registry, then creates and serializes the message
  3. The producer service sends the serialized message through Kafka
  4. The consumer service gets the schema from the Schema Registry
  5. The consumer service receives the message from Kafka
  6. The consumer service deserializes the message according to the schema
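Steps 2 and 3 boil down to pointing the producer’s serializer at the registry. A minimal sketch with Confluent’s Avro serializer (the broker and registry URLs, the topic name, and a SensorData class generated from the Avro schema are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// the serializer talks to the registry here before serializing
props.put("schema.registry.url", "http://localhost:8081");

try (KafkaProducer<String, SensorData> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("sensor-data", data.getSensorId(), data));
}

The consumer mirrors this with KafkaAvroDeserializer and the same schema.registry.url, which covers steps 4 to 6.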

Schema Compatibility

To define how we can change a schema, we first need to define schema compatibility. The following matrix shows what happens during the validation of a new schema against the previous one.

[Image: Kafka schema compatibility check matrix]

The Schema Registry is not a magic bullet. It does not magically solve schema upgrade issues; it simply enforces compatibility. For instance, the BACKWARD compatibility setting allows us to delete any field but only add optional fields. The reasoning is that if we added a mandatory field, anything sent by previous producer versions would fail to deserialize.
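For example, under BACKWARD compatibility the Avro schema above could evolve by adding a unit field, but only with a default value; the field and its default here are purely illustrative:

{
  "namespace": "org.example",
  "type": "record",
  "name": "SensorData",
  "fields": [
    {"name": "sensorId", "type": "string"},
    {"name": "deviceId", "type": "string"},
    {"name": "measurementTime",
     "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "value", "type": "int"},
    {"name": "unit", "type": "string", "default": "g"}
  ]
}

Old messages without the unit field can still be read, because the reader falls back to the default.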

Similar logic applies to the FORWARD and FULL compatibility checks. One more variant we need to consider is TRANSITIVE, which means the check runs against all previous versions instead of only the latest one.

When you try to register a new schema that does not adhere to the configured compatibility type, the Schema Registry will reject it. That way, the developer gets a clear message that the schema must be corrected before it can be used.
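You can also run that check yourself before deploying. A sketch using Confluent’s Java client (the registry URL and subject name are assumptions; testCompatibility checks a candidate schema against the configured compatibility rule):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
AvroSchema candidate = new AvroSchema(newSchemaJson); // newSchemaJson: the evolved schema as a String
boolean compatible = client.testCompatibility("sensor-data-value", candidate);
if (compatible) {
    // register only if the registry would accept it
    client.register("sensor-data-value", candidate);
}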

Subject Name Strategy

If you’re using the Confluent Schema Registry, the serializer registers the schema under a subject name. That name defines a namespace in the registry.

All versioning and compatibility checks are done per subject. If the schema evolves, it stays associated with the same subject but gets a new schema ID and an incremented version.

There are three subject-naming strategies:

  • TopicNameStrategy – the default naming strategy, in which all messages on a topic conform to the same schema
  • RecordNameStrategy – the subject is the fully qualified record name (e.g., the Avro type), so a record type shares one subject across all topics. It allows grouping logically related events with different data structures under one topic.
  • TopicRecordNameStrategy – a combination of the previous two: a subject per record type, but scoped to a specific topic

Generally, we use RecordNameStrategy and TopicRecordNameStrategy when a single topic contains multiple schemas and grouping by topic isn’t optimal.
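Switching strategies is a serializer configuration. A sketch of the relevant producer properties, assuming Confluent’s serializers (the property key and class names below are the standard Confluent ones):

// default: TopicNameStrategy, i.e. subjects like "sensor-data-value"
props.put("value.subject.name.strategy",
          "io.confluent.kafka.serializers.subject.RecordNameStrategy");
// or, to scope the record name to a specific topic:
// props.put("value.subject.name.strategy",
//           "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");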

You can set schema compatibility requirements globally or on a per-subject basis. You can also update them, but you need to be careful, since changes can have breaking effects.

A solution for distributed teams

While reading about what a Schema Registry is, it might seem overly complicated and impractical. However, it starts making sense if we look at it the way we look at documenting REST services (OpenAPI), and even more so when we consider huge organizations with many geographically distributed teams.

By using the Schema Registry, we can achieve several things, but as I mentioned in the intro, the two of greatest importance are knowing what the data sent to topics should look like (schema definition) and enforcing a data evolution strategy (schema evolution).
