Core concepts
Schema Registry
Tansu has an embedded schema registry that supports JSON schema and Protocol Buffers. The schema registry is enabled using the --schema-registry command-line parameter to the server. A schema registry is a file or s3 URL containing schemas that Tansu uses to validate messages. When a message is produced to a topic with an associated schema, the message is validated against that schema. A message that does not conform to its schema is rejected with an INVALID_RECORD error.
JSON schema
An example JSON schema for the "person" topic, stored in the root directory of the registry as person.json:
{
  "title": "Person",
  "type": "object",
  "properties": {
    "key": {
      "type": "string",
      "pattern": "^[A-Z]{3}-\\d{3}$"
    },
    "value": {
      "type": "object",
      "properties": {
        "firstName": {
          "type": "string",
          "description": "The person's first name."
        },
        "lastName": {
          "type": "string",
          "description": "The person's last name."
        },
        "age": {
          "description": "Age in years which must be equal to or greater than zero.",
          "type": "integer",
          "minimum": 0
        }
      }
    }
  }
}
The schema must be an object, with properties for the message "key" and/or "value".
A schema that covers the message key, but allows any message value could look like:
{
  "title": "Person",
  "type": "object",
  "properties": {
    "key": {
      "type": "string",
      "pattern": "^[A-Z]{3}-\\d{3}$"
    }
  }
}
A schema that allows any message key, but restricts the message value could look like:
{
  "title": "Person",
  "type": "object",
  "properties": {
    "value": {
      "type": "object",
      "properties": {
        "firstName": {
          "type": "string",
          "description": "The person's first name."
        },
        "lastName": {
          "type": "string",
          "description": "The person's last name."
        },
        "age": {
          "description": "Age in years which must be equal to or greater than zero.",
          "type": "integer",
          "minimum": 0
        }
      }
    }
  }
}
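To make the effect of the person schema concrete, the following is a minimal Python sketch of which messages it accepts and rejects. It is not how Tansu validates messages: the checks are hand-written to mirror the constraints in the schema above rather than using a JSON Schema validator, and the function names (key_errors, value_errors, check) are hypothetical. It also assumes that the message key and value are each parsed as JSON before being checked.

```python
import json
import re

# Same constraint as "^[A-Z]{3}-\\d{3}$" in the schema; fullmatch() anchors both ends.
KEY_PATTERN = re.compile(r"[A-Z]{3}-[0-9]{3}")


def _is_integer(x):
    # JSON Schema accepts 21.0 as an integer; a JSON boolean is not a number.
    if isinstance(x, bool):
        return False
    return isinstance(x, int) or (isinstance(x, float) and x.is_integer())


def key_errors(key):
    """Return the problems found in a decoded message key (empty when valid)."""
    if not isinstance(key, str):
        return ["key: expected a string"]
    if not KEY_PATTERN.fullmatch(key):
        return ["key: does not match the pattern"]
    return []


def value_errors(value):
    """Return the problems found in a decoded message value (empty when valid)."""
    if not isinstance(value, dict):
        return ["value: expected an object"]
    errors = []
    # "properties" only constrains fields that are present; none are required.
    for name in ("firstName", "lastName"):
        if name in value and not isinstance(value[name], str):
            errors.append(f"/{name}: expected a string")
    if "age" in value:
        age = value["age"]
        if not _is_integer(age):
            errors.append("/age: expected an integer")
        elif age < 0:
            errors.append("/age: below minimum 0")
    return errors


def check(raw_key, raw_value):
    """Check a JSON-encoded key and value against the person constraints."""
    return key_errors(json.loads(raw_key)) + value_errors(json.loads(raw_value))


print(check('"ABC-123"', '{"firstName": "John", "lastName": "Doe", "age": 21}'))
# → []
print(check('"ABC-123"', '{"firstName": "John", "lastName": "Doe", "age": -1}'))
# → ['/age: below minimum 0']
```

For a key-only or value-only schema, the corresponding half of check would simply be skipped, since the part without a schema accepts anything.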
Protocol Buffers
An example protocol buffer schema for the "employee" topic, stored in the root directory of the registry as employee.proto:
syntax = 'proto3';

message Key {
  int32 id = 1;
}

message Value {
  string name = 1;
  string email = 2;
}
The schema should contain message definitions for the Key and/or Value.
A schema that covers the message key, but allows any message value could look like:
syntax = 'proto3';

message Key {
  int32 id = 1;
}
A schema that allows any message key, but restricts the message value could look like:
syntax = 'proto3';

message Value {
  string name = 1;
  string email = 2;
}
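Both examples above store the schema in the registry root under the topic name (person.json, employee.proto). The short Python sketch below spells out that naming convention. It assumes the pattern generalises to any topic as the topic name plus a .json or .proto extension; the helper candidate_schema_urls is hypothetical, and the sketch says nothing about which file Tansu would prefer if both existed.

```python
# Extensions taken from the two examples above (person.json, employee.proto).
SCHEMA_EXTENSIONS = (".json", ".proto")


def candidate_schema_urls(registry_url, topic):
    """List where a schema for `topic` might live in the registry root.

    Assumption: the file is named after the topic, as in the examples.
    """
    root = registry_url.rstrip("/")
    return [f"{root}/{topic}{ext}" for ext in SCHEMA_EXTENSIONS]


print(candidate_schema_urls("file://./etc/schemas", "person"))
# → ['file://./etc/schemas/person.json', 'file://./etc/schemas/person.proto']
```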
Example
This example uses JSON schema as it is simpler to use with the Apache Kafka command line tools.
The person schema can be found in the etc/schema directory of the Tansu GitHub repository. This directory is also used when starting Tansu using the just tansu-server recipe or Docker compose.
Starting Tansu with schema validation enabled:
❯ just tansu-server
./target/debug/tansu-server --kafka-cluster-id ${CLUSTER_ID} \
    --kafka-advertised-listener-url tcp://${ADVERTISED_LISTENER} \
    --schema-registry file://./etc/schemas \
    --storage-engine ${STORAGE_ENGINE} 2>&1 | tee tansu.log
Create the person topic:
❯ just person-topic-create
kafka-topics --bootstrap-server localhost:9092 \
    --partitions=3 \
    --replication-factor=1 \
    --create \
    --topic person
Created topic person.
Produce a message that is valid for the person schema:
❯ just person-topic-produce-valid
echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
Produce a message that is invalid for the person schema (the age must be greater than or equal to 0):
❯ just person-topic-produce-invalid
echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
[2024-12-19 11:51:28,412] ERROR Error when sending message to topic person with key: 19 bytes, value: 51 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.
The server log contains the reason for the message being rejected:
2024-12-19T11:51:28.407467Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 47: instance=Object {"code": String("ABC-123")}
2024-12-19T11:51:28.407524Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 56: r=()
2024-12-19T11:51:28.407546Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 40: validator=Some(Validator { root: SchemaNode { validators: Keyword, location: Location(""), absolute_path: Some(Uri { scheme: "https", authority: Some(Authority { userinfo: None, host: "example.com", host_parsed: RegName("example.com"), port: None }), path: "/person.schema.json", query: None, fragment: None }) }, config: CompilationConfig { draft: None, content_media_type: [], content_encoding: [] } }) encoded=Some(b"{\"firstName\": \"John\", \"lastName\": \"Doe\", \"age\": -1}")
2024-12-19T11:51:28.407589Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 47: instance=Object {"age": Number(-1), "firstName": String("John"), "lastName": String("Doe")}
2024-12-19T11:51:28.407626Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 51: err=ValidationError { instance: Number(-1), kind: Minimum { limit: Number(0) }, instance_path: Location("/age"), schema_path: Location("/properties/age/minimum") }
2024-12-19T11:51:28.407652Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 57: err=Api(InvalidRecord)
2024-12-19T11:51:28.407724Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_server::broker::produce: 75: err=Storage(Api(InvalidRecord))
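Most of these lines are DEBUG noise; the useful one is the WARN line carrying the ValidationError. As a minimal sketch, the Python below picks those lines out of a log. The line layout it expects (timestamp, level, then the rest) is inferred from the excerpt above rather than from a documented format, and the function name validation_errors is hypothetical.

```python
import re

# "<timestamp> <LEVEL> <rest>", as seen in the log excerpt above.
LINE = re.compile(r"^(?P<ts>\S+)\s+(?P<level>[A-Z]+)\s+(?P<rest>.*)$")

# One line copied from the excerpt above, used here as sample input.
SAMPLE = (
    '2024-12-19T11:51:28.407626Z WARN peer{addr=127.0.0.1:60095}:'
    'produce{api_key=0 api_version=11 correlation_id=5}: '
    'tansu_schema_registry::json: 51: err=ValidationError { instance: Number(-1), '
    'kind: Minimum { limit: Number(0) }, instance_path: Location("/age"), '
    'schema_path: Location("/properties/age/minimum") }'
)


def validation_errors(lines):
    """Yield (timestamp, error text) for WARN lines that carry a ValidationError."""
    for line in lines:
        m = LINE.match(line)
        if not m or m["level"] != "WARN":
            continue
        # Everything after the first "err=" is the error payload.
        _, sep, err = m["rest"].partition("err=")
        if sep and err.startswith("ValidationError"):
            yield m["ts"], err.strip()


for ts, err in validation_errors([SAMPLE]):
    print(ts, err)
```

To scan a real log, pass an open file instead of the sample list, for example the tansu.log written by the just tansu-server recipe.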