WePay uses Apache Kafka as its real time message broker service to publish and consume realtime events. Messages published to Kafka topics can adhere to a specific schema. To manage the schemas in a centralized location, we use Confluent’s schema-registry, which provides a RESTful interface for storing and retrieving Apache Avro schemas. Having a schema registry is important because it provides a central location to store schemas across all of our applications, which makes it easier for systems to integrate with one another. The registry is also an instrument to enforce certain policies for the data, such as preventing newer schema versions in a topic from breaking compatibility with existing versions. In this post, we’ll describe how we worked with Confluent to integrate its Python client and schema-registry, so we (and others) can send and receive messages in Python using Avro.
Using a common shared Avro schema-registry is advantageous in many ways. It provides safe schema evolution, and helps with the organizational challenge of data management and data policy enforcement. But this introduces an additional step of fetching schemas from the registry, and encoding messages every time a message is published into Kafka stream (and decoding the message when consuming it). This becomes a common routine for every message published and consumed. The current Python Kafka client provided by Confluent did not have the built-in capability to deal with Avro schemas the way that the Kafka Java client supports pluggable serializers and a Confluent Avro implementation. WePay has introduced a wrapper in the Confluent client to solve this problem, and it’s been merged into the main Confluent Python client repository.
The Kafka Avro client is a Python package extending the basic capabilities of Confluent’s Kafka client. It is a wrapper on top of Confluent’s Kafka library. This provides objects to manage connectivity to the schema registry, which is heavily based off of Confluent’s schema registry Java API. The client communicates with the schema-registry server over the typical HTTP/HTTPS protocol. The wrapper also does the heavy lifting of message decoding and encoding with Avro schemas. Both AvroConsumer and AvroProducer wrappers are provided, and they both implement caches to keep schema_id to schema mappings.
The diagram below shows the message handling part of the Kafka Avro client.
The schema-registry server returns a unique schema_id when a new schema is registered. The Avro producer client takes a message and a schema as input. The client first checks the cache for schema_id for the corresponding schema. If the schema is already registered with schema-registry server, then that schema_id is picked from cache. The message is then encoded with the Avro schema and the schema_id is attached to it and published to Kafka topic. If the schema_id is not found in cache, then the schema is registered with schema-registry server and the schema_id is stored back in cache for future use.
The Avro consumer is responsible for pulling a message from Kafka and decoding the content. A message is polled from the Kafka and unpacked to get the schema_id for decoding. The Avro consumer also has a cache, which manages schema_id to schema mappings. The Avro consumer looks into the cache for the stored schema. If schema is not present, Avro consumer makes an API call to the schema-registry server to get the schema for the corresponding schema_id present in the message pack. Then it decodes the message against the Avro schema to get the original message structure.
In order to interoperate with other Kafka ecosystem consumers and producers that use the Schema registry, the standard message format payload is followed. Every message published to Kafka topic through this client will have three parts
- Magic byte: Used to store the version of the schema for the payload being sent
- Schema_id: Unique id associated with the Avro schema with which the message is been encoded
- Message: Actual bytes of the encoded data
The new Avro Kafka client frees the user from having to encode and decode messages with Avro schemas when publishing and receiving messages through Kafka. The client also manages interactions with the schema-registry. The code is available here, as part of the standard Confluent Python Kafka client.