Azure Schema Registry with Event Hubs and Kafka

The Azure Schema Registry provides a central repository for developers who want to define, store and enforce schemas in their distributed applications and services. This post will explore how to use the new Schema Registry with Azure Event Hubs and the supported Kafka API.

All the source code can be found at dbarkol/azure-schema-registry-samples (github.com).

A little about schemas

A schema is often used as a data format, or contract, between services to help improve consistency, governance and data quality.

Producers use a schema to serialize data before sending it to a service such as Apache Kafka or Pulsar. Consumers then use the same schema to deserialize the payload and reliably process the data.

Since both the producer and consumer adhere to a shared schema, they get the benefits of a clearly defined data structure that is documented, typed and can even be versioned. This contract between systems adds resiliency and addresses many of the common challenges in distributed architectures.

Why use a schema registry

It’s easy to see how disparate systems can benefit from the use of schemas. However, a schema by itself isn’t enough. A centralized repository for schemas and their metadata is the piece of the puzzle that completes this story.

A registry provides several benefits and addresses some key considerations:

  • Smaller data payloads. By referencing a schema in a registry, we can avoid including the actual schema in the payload.
  • Security and access control. In addition to providing a managed service that can catalog and organize schemas, a registry can support the much-needed security constructs to ensure that schema operations are limited to only those services that have been granted permission.
  • Schema evolution. Schemas and contracts evolve over time – that’s expected. With the support of a registry, the services that rely on that data can lean on the schema registry to handle the transition from one schema to another. Support for compatibility modes and versioning are some of the core tenets of a schema registry that help manage this evolution; a backward-compatible change is sketched just below.
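To make that concrete, here is a small, hypothetical example of a backward-compatible change in Avro: version 2 adds a field with a default value, so a consumer reading with the new schema can still process records written with version 1 (the missing field simply takes its default). The Example record and its fields are made up purely for illustration.

{
   "type": "record",
   "name": "Example",
   "namespace": "demo.events",
   "fields": [
     { "name": "Id", "type": "int" }
   ]
}

Version 2 (adds an optional field with a default, which is backward compatible):

{
   "type": "record",
   "name": "Example",
   "namespace": "demo.events",
   "fields": [
     { "name": "Id", "type": "int" },
     { "name": "Notes", "type": "string", "default": "" }
   ]
}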

Using the Azure Schema Registry

The Schema Registry is a new feature in Event Hubs that is only available in the Standard tier or higher. For this post, I’ll be using Event Hubs and some Kafka libraries for the producers and consumers. That isn’t a requirement, though; you can also use the registry with another messaging service, such as Service Bus.

Start with a schema group

A schema group is a mechanism that allows you to associate and manage a collection of schemas. Their relationship might be based on a customer or perhaps a common set of services within your organization – whatever makes sense to you.

In the Azure portal, you’ll find the Schema Registry in your Event Hubs namespace, under Entities.

Create a group by providing a name, serialization type and compatibility mode. At this time, Avro is the only supported serialization type.
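If you prefer scripting to the portal, a schema group can also be created with the Azure CLI. The command below is only a sketch: the resource group, namespace and group names are placeholders, and the exact flag names of the az eventhubs namespace schema-registry command group may vary between CLI versions, so verify them with --help before relying on it.

# Sketch only: flag names may differ across Azure CLI versions.
# Verify with: az eventhubs namespace schema-registry create --help
az eventhubs namespace schema-registry create \
  --resource-group my-resource-group \
  --namespace-name my-eventhubs-namespace \
  --schema-group-name loyalty-events \
  --schema-type Avro \
  --schema-compatibility Backward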

Configure access control

The next step is to grant the producer and consumer applications access so they can communicate with the registry in a secure manner. In Azure, the best way to do this is with role-based access control (RBAC). The following steps will help you accomplish this:

  1. Register an application with Azure Active Directory. Save the tenant ID, client ID and client secret for each registered application.
  2. Assign the appropriate Schema Registry role for each application based on this list: Azure Schema Registry in Event Hubs (Preview) – Azure Event Hubs | Microsoft Docs. I chose to use the Schema Registry Contributor role since I will be adding the schema programmatically to the registry if it does not exist. A short sketch of how these credentials are used follows this list.
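To give a sense of how the saved values come into play, here is a minimal sketch that builds a credential from the tenant ID, client ID and client secret and passes it to a SchemaRegistryClient from the Azure.Data.SchemaRegistry package. The placeholder values are assumptions; the full producer and consumer samples later in this post read them from configuration instead.

using Azure.Identity;
using Azure.Data.SchemaRegistry;

// Credential for the application registered in Azure Active Directory (placeholder values).
var credential = new ClientSecretCredential(
    "<tenant-id>",
    "<client-id>",
    "<client-secret>");

// The registry endpoint is the Event Hubs namespace host name.
var client = new SchemaRegistryClient(
    "<eventhubs-namespace-name>.servicebus.windows.net",
    credential);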

Schemas in action

With the application permissions in place and a registry ready to go, we can now move forward with some code.

Define a schema

Let’s get to work and define a schema. Imagine that we have events for a customer loyalty program. Updates are published whenever points are added to a customer’s account. Here is the schema for the loyalty event in the Avro format:

{
   "type": "record",
   "name": "CustomerLoyalty",
   "namespace": "zohan.schemaregistry.events",
   "fields": [
     {
         "name": "CustomerId",
         "type": "int"
     },
     {
         "name": "PointsAdded",
         "type": "int"
     },
     {
         "name": "Description",
         "type": "string"
     }
   ]
}
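For illustration, a single loyalty event that conforms to this schema might carry values like the ones below (shown as JSON for readability, with made-up values; on the wire the record is Avro-encoded):

{
   "CustomerId": 1,
   "PointsAdded": 100,
   "Description": "Points added: 100"
}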

Generating a strongly typed class in .NET

The Avro format is what will be used to store the schema in the registry. Since this example will be using .NET, generating a strongly typed class in C# will provide an opportunity for some optimization when publishing and consuming events.

I’m going to use a NuGet package called Apache.Avro.Tools to generate the C# class from the Avro file that contains the schema. To install the tool from a command line, run:

dotnet tool install --global Apache.Avro.Tools --version 1.10.1

To generate the C# class from the Avro file (saved here as CustomerLoyalty.avsc) into the current directory:

avrogen -s CustomerLoyalty.avsc .

The output will be a class like this:

// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.10.0.0
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace zohan.schemaregistry.events
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using Avro;
    using Avro.Specific;

    public partial class CustomerLoyalty : ISpecificRecord
    {
        public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"CustomerLoyalty\",\"namespace\":\"zohan.schemaregistry.event" +
            "s\",\"fields\":[{\"name\":\"CustomerId\",\"type\":\"int\"},{\"name\":\"PointsAdded\",\"type\":\"in" +
            "t\"},{\"name\":\"Description\",\"type\":\"string\"}]}");
        private int _CustomerId;
        private int _PointsAdded;
        private string _Description;

        public virtual Schema Schema
        {
            get
            {
                return CustomerLoyalty._SCHEMA;
            }
        }

        public int CustomerId
        {
            get
            {
                return this._CustomerId;
            }
            set
            {
                this._CustomerId = value;
            }
        }

        public int PointsAdded
        {
            get
            {
                return this._PointsAdded;
            }
            set
            {
                this._PointsAdded = value;
            }
        }

        public string Description
        {
            get
            {
                return this._Description;
            }
            set
            {
                this._Description = value;
            }
        }

        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return this.CustomerId;
                case 1: return this.PointsAdded;
                case 2: return this.Description;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }

        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: this.CustomerId = (System.Int32)fieldValue; break;
                case 1: this.PointsAdded = (System.Int32)fieldValue; break;
                case 2: this.Description = (System.String)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }
}

Now that we have a generated class to work with, let’s see how serialization works.

Serializing and deserializing

The Confluent .NET Kafka library provides interfaces for both serialization and deserialization that make integration with the Azure Schema Registry extremely simple.

If you look at the implementation for the async serializer, you’ll notice that a token credential is used to authenticate against the registry. Notice also how the SerializeAsync method uses the schema to serialize the object and return a byte array:

public class KafkaAvroAsyncSerializer<T> : IAsyncSerializer<T>
{
    private readonly SchemaRegistryAvroObjectSerializer serializer;

    public KafkaAvroAsyncSerializer(string schemaRegistryUrl, TokenCredential credential, string schemaGroup, Boolean autoRegisterSchemas = false)
    {
        this.serializer = new SchemaRegistryAvroObjectSerializer(
            new SchemaRegistryClient(
                schemaRegistryUrl,
                credential),
            schemaGroup,
            new SchemaRegistryAvroObjectSerializerOptions()
            {
                AutoRegisterSchemas = autoRegisterSchemas
            });
    }

    public async Task<byte[]> SerializeAsync(T o, SerializationContext context)
    {
        if (o == null)
        {
            return null;
        }

        using (var stream = new MemoryStream())
        {
            await serializer.SerializeAsync(stream, o, typeof(T), CancellationToken.None);
            return stream.ToArray();
        }
    }
}

The original implementation of this library can be found here. Deserialization takes a similar approach for authentication (as expected) and converts a byte array into a strongly typed class in the Deserialize method:

public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
    if (data.IsEmpty)
    {
        return default(T);
    }

    return (T) this.serializer.Deserialize(new MemoryStream(data.ToArray()), typeof(T), CancellationToken.None);
}

Publishing events

Both the producer and consumer samples in the repository contain an app.config file that resembles the following:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="EH_FQDN" value="{eventhubs-namespace-name}.servicebus.windows.net:9093"/>
    <add key="EH_CONNECTION_STRING" value="Endpoint=sb://{eventhubs-namespace-name}.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxx"/>
    <add key="EH_NAME" value="{event-hub-name}"/>
    <!-- CONSUMER_GROUP is only needed by the consumer sample -->
    <add key="CONSUMER_GROUP" value="$Default"/>
    <add key="CA_CERT_LOCATION" value=".\cacert.pem"/>
    <add key="SCHEMA_GROUP" value="{schema-group-name}"/>
    <add key="SCHEMA_REGISTRY_URL" value="{eventhubs-namespace-name}.servicebus.windows.net"/>
    <add key="SCHEMA_REGISTRY_TENANT_ID" value="{tenant id}"/>
    <add key="SCHEMA_REGISTRY_CLIENT_ID" value="{client id}"/>
    <add key="SCHEMA_REGISTRY_CLIENT_SECRET" value="{client secret}"/>
  </appSettings>
</configuration>

If you are following along, you will need to update these values with your Event Hubs namespace, the connection string for the event hub (topic), and the other relevant settings. Plug in the tenant ID, client ID and client secret you created earlier for the producer and consumer applications accordingly. The code for the producer is here:

using Azure.Identity;
using Confluent.Kafka;
using Microsoft.Azure.Kafka.SchemaRegistry.Avro;
using System;
using System.Configuration;
using System.Threading.Tasks;
using zohan.schemaregistry.events;

namespace Zohan.SchemaRegistry.Producer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Press any key to begin sending events.");
            Console.ReadKey();

            await SendEvents();
        }

        public static async Task SendEvents()
        {
            // Initialize the producer configuration properties
            var config = InitializeProducerConfig();

            // Create an instance of the serializer that will
            // use the schema for the messages.
            var valueSerializer = InitializeValueSerializer();

            try
            {
                // Create an instance of the producer using the serializer
                // for the message value.
                using (var producer = new ProducerBuilder<Null, CustomerLoyalty>(config)
                    .SetValueSerializer(valueSerializer)
                    .Build())
                {
                    // Retrieve the topic name from the configuration settings
                    var topic = ConfigurationManager.AppSettings["EH_NAME"];

                    // Send some messages
                    for (int i = 0; i < 4; i++)
                    {
                        var loyaltyEvent = new CustomerLoyalty()
                        {
                            CustomerId = 1,
                            PointsAdded = i,
                            Description = $"Points added: {i}"
                        };

                        var message = new Message<Null, CustomerLoyalty> { Key = null, Value = loyaltyEvent };
                        await producer.ProduceAsync(topic, message);
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(string.Format("Exception Occurred - {0}", e.Message));
            }
        }

        private static KafkaAvroAsyncSerializer<CustomerLoyalty> InitializeValueSerializer()
        {
            // Retrieve the necessary settings for the schema url, group and
            // credentials needed to communicate with the registry in Azure.
            var schemaRegistryUrl = ConfigurationManager.AppSettings["SCHEMA_REGISTRY_URL"];
            var schemaGroup = ConfigurationManager.AppSettings["SCHEMA_GROUP"];
            ClientSecretCredential credential = new ClientSecretCredential(
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_TENANT_ID"],
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_ID"],
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_SECRET"]);

            // Set the autoRegisterSchemas flag to true so that the schema will be
            // registered if it does not already exist.
            return new KafkaAvroAsyncSerializer<CustomerLoyalty>(
                schemaRegistryUrl,
                credential,
                schemaGroup,
                autoRegisterSchemas: true);
        }

        private static ProducerConfig InitializeProducerConfig()
        {
            var brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
            var connectionString = ConfigurationManager.AppSettings["EH_CONNECTION_STRING"];
            var caCertLocation = ConfigurationManager.AppSettings["CA_CERT_LOCATION"];

            return new ProducerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connectionString,
                SslCaLocation = caCertLocation
            };
        }
    }
}

Take a moment and check out how the ProducerBuilder is initialized in the sample:

using (var producer = new ProducerBuilder<Null, CustomerLoyalty>(config)
    .SetValueSerializer(valueSerializer)
    .Build())

What is really interesting here is how the serializer is registered to use the schema registry – pretty slick!

Consuming events

The consumer application follows a similar pattern by configuring the connection to the broker, followed by the deserializer for the value. It then consumes events and outputs the results:

using Azure.Identity;
using Confluent.Kafka;
using Microsoft.Azure.Kafka.SchemaRegistry.Avro;
using System;
using System.Configuration;
using System.Threading;
using zohan.schemaregistry.events;

namespace Zohan.SchemaRegistry.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press any key to begin consuming new events");
            Console.ReadKey();

            ConsumeEvents();
        }

        static void ConsumeEvents()
        {
            // Initialize the consumer configuration properties
            var config = InitializeConsumerConfig();

            // Create an instance of the deserializer that will
            // use the schema for the messages.
            var valueDeserializer = InitializeValueDeserializer();

            using (var consumer = new ConsumerBuilder<Null, CustomerLoyalty>(config)
                .SetValueDeserializer(valueDeserializer)
                .Build())
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

                // Retrieve the topic name from the configuration settings
                // and set the subscription to the topic.
                var topic = ConfigurationManager.AppSettings["EH_NAME"];
                consumer.Subscribe(topic);

                Console.WriteLine($"Consuming messages from topic: {topic}");

                // Start consuming events
                try
                {
                    while (true)
                    {
                        try
                        {
                            var msg = consumer.Consume(cts.Token);
                            var loyalty = msg.Message.Value;
                            Console.WriteLine($"Customer {loyalty.CustomerId} received {loyalty.PointsAdded} points");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    consumer.Close();
                }
            }
        }

        private static KafkaAvroDeserializer<CustomerLoyalty> InitializeValueDeserializer()
        {
            // Retrieve the necessary settings for the schema url and
            // credentials needed to communicate with the registry in Azure.
            var schemaRegistryUrl = ConfigurationManager.AppSettings["SCHEMA_REGISTRY_URL"];
            ClientSecretCredential credential = new ClientSecretCredential(
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_TENANT_ID"],
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_ID"],
                ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_SECRET"]);

            return new KafkaAvroDeserializer<CustomerLoyalty>(schemaRegistryUrl, credential);
        }

        private static ConsumerConfig InitializeConsumerConfig()
        {
            var brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
            var connectionString = ConfigurationManager.AppSettings["EH_CONNECTION_STRING"];
            var caCertLocation = ConfigurationManager.AppSettings["CA_CERT_LOCATION"];
            var consumerGroup = ConfigurationManager.AppSettings["CONSUMER_GROUP"];

            return new ConsumerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SocketTimeoutMs = 60000,
                SessionTimeoutMs = 30000,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connectionString,
                SslCaLocation = caCertLocation,
                GroupId = consumerGroup,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BrokerVersionFallback = "1.0.0"
            };
        }
    }
}

Summary

The Azure Schema Registry looks promising. Being able to use existing libraries for both the consumer and producer applications to communicate with a Kafka endpoint and leverage the registry for serialization was perhaps the most important takeaway for me when learning how to use it. This feature has been in demand for quite some time and I think we’ll see more schema-first development when it comes to using some of the Azure messaging services.
