This post will demonstrate a solution that leverages the following technologies and Azure features:
- Dependency injection in .NET Azure Functions
- Sending messages from Azure Functions to Azure Event Hubs using the Kafka protocol.
- Consuming messages from a Kafka topic with kafkacat.
GitHub repository: https://github.com/dbarkol/functions-eventhubs-kafka
Event Hubs and Kafka
Azure Event Hubs now supports Apache Kafka 1.0 and higher. What this means is that Event Hubs provides wire protocol compatibility for Kafka. This is a similar pattern used by other cloud services. For example, Azure Cosmos DB takes the same approach for the APIs that it supports (Cassandra, Gremlin, Table, MongoDB and SQL).
Events Hubs doesn’t actually run Apache Kafka. Similarly, Cosmos does not run a version of MongoDB underneath. Instead, compatibility is established at the wire level.
Compatibility at this level is exciting because, in addition to providing a managed solution to a distributed streaming platform, it also introduces Event Hubs to the rich ecosystem of Kafka applications and tools in the open source community.
For more information about Event Hubs and Kafka support, see Use Azure Event Hubs from Apache applications.
Enabling Kafka in Azure Event Hubs
From the Portal
This post provides a walkthrough of how to create an Apache Kafka-enabled Event Hubs namespace from the Azure portal: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create-kafka-enabled.
An Event Hubs namespace maps to a Kafka cluster, which is a collection of Brokers.
From the CLI
I prefer to use the Azure CLI over the portal as much as possible. If you are interested in a step-by-step guide using the CLI, see the README in the GitHub repository.
Producing Events from an Azure Function
I’m going to dive right into the implementation of the Azure Function, starting with some of the required packages:
- Confluent.Kafka is Confluent’s .NET client for Kafka.
- Microsoft.Azure.Functions.Extensions will provide support for dependency injection along with other features.
Using Code, I just executed the following commands to add the packages:
dotnet add package Confluent.Kafka dotnet add package Microsoft.Azure.Functions.Extensions
CA Certificate file
Included in the project is a file called cacert.pem. This is required by the Confluent library to verify that the endpoint comes from a verified source. The messaging services on Azure only expose endpoints over HTTPS (no HTTP endpoints, ever). Leveraging this certificate will ensure client-broker encryption with Event Hubs.
When deploying the Function to Azure, we want to include this file in the output directory. The csproj file in the solution accomplishes this with this entry in the ItemGroup section:
<None Update="cacert.pem"> <CopyToOutputDirectory>Always</CopyToOutputDirectory> </None>
Application Settings
For local development, the local.settings.json file should look similar to:
{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "IsLocal": "1", "EventHubConnectionString": "{your-event-hub-connection-string}", "EventHubFqdn": "{event-hub-namespace}.servicebus.windows.net:9093", "EventHubName": "{event-hub-name}" } }
All the Event Hub settings (connection string, FQDN and name) will need to be added to the Application Settings in the Function App once it’s deployed in Azure. The IsLocal property is used to help determine the path of the certificate between environments (local vs Azure).
Kafka Producer Interface
The solution includes an interface for a Kafka Producer with a single method for sending an event:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
namespace Zohan.KafkaDemo | |
{ | |
public interface IKafkaProducer | |
{ | |
Task SendEvent(string topicName, string key, string value); | |
} | |
} |
Kafka Producer Implementation
The implementation for the Kafka Producer is responsible for instantiating an instance of the Producer class from the Confluent library. This code has no knowledge of Event Hubs and will work with a traditional Kafka cluster as well.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Extensions.Logging; | |
using Confluent.Kafka; | |
namespace Zohan.KafkaDemo | |
{ | |
public class KafkaProducer : IKafkaProducer | |
{ | |
private IProducer<string, string> _producer = null; | |
public KafkaProducer(string brokerList, | |
string connectionString, | |
string caCertLocation) | |
{ | |
var config = new ProducerConfig | |
{ | |
BootstrapServers = brokerList, | |
SecurityProtocol = SecurityProtocol.SaslSsl, | |
SaslMechanism = SaslMechanism.Plain, | |
SaslUsername = "$ConnectionString", | |
SaslPassword = connectionString, | |
SslCaLocation = caCertLocation | |
}; | |
_producer = new ProducerBuilder<string, string>(config).Build(); | |
} | |
public Task SendEvent(string topicName, string key, string value) | |
{ | |
return _producer.ProduceAsync(topicName, new Message<string, string>{ | |
Key = key, | |
Value = value | |
}); | |
} | |
} | |
} |
Registering the Kafka Producer Service
In order to provide an instance of this class within the implementation of the function, we will have to register the service. The assembly attribute registers the configuration method.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Linq; | |
using Microsoft.Azure.Functions.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Configuration; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Logging; | |
[assembly: FunctionsStartup(typeof(Zohan.KafkaDemo.Startup))] | |
namespace Zohan.KafkaDemo | |
{ | |
public class Startup : FunctionsStartup | |
{ | |
public override void Configure(IFunctionsHostBuilder builder) | |
{ | |
// Retrieve the Event Hubs connection string that will allow us | |
// to send events. | |
var eventHubConnectionString = Environment.GetEnvironmentVariable("EventHubConnectionString"); | |
// Retrieve the Event Hubs fully qualified domain name along | |
// with the port number. | |
// Example: {event-hub-namespace}.servicebus.windows.net:9093 | |
var eventHubFqdn = Environment.GetEnvironmentVariable("EventHubFqdn"); | |
// Retrieve the certificate file location. This is required | |
// for client-broker encryption with Event Hubs. | |
var caCertFileLocation = GetCaCertFileLocation(); | |
// Add a singleton instance of the kafka producer class. | |
builder.Services.AddSingleton<IKafkaProducer>(new KafkaProducer(eventHubFqdn, eventHubConnectionString, caCertFileLocation)); | |
} | |
private string GetCaCertFileLocation() | |
{ | |
// For local testing | |
if (Environment.GetEnvironmentVariable("IsLocal") == "1") | |
{ | |
return "cacert.pem"; | |
} | |
else | |
{ | |
// When the certificate file is copied to the output directory | |
// of the Azure Function, it will reside in the site/wwwroot | |
// folder. | |
string home = Environment.GetEnvironmentVariable("HOME"); | |
string path = Path.Combine(home, "site", "wwwroot"); | |
return Path.Combine(path, "cacert.pem"); | |
} | |
} | |
} | |
} |
Putting it all together
It all comes together in the implementation of the Azure Function, which is triggered by an HTTP request.
Notice how an instance of the Kafka Producer class is passed into the constructor. It is then used within the invocation of the function to send an event to a Kafka endpoint:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Threading.Tasks; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Azure.WebJobs.Extensions.Http; | |
using Microsoft.AspNetCore.Http; | |
using Microsoft.Extensions.Logging; | |
using Newtonsoft.Json; | |
using Confluent.Kafka; | |
namespace Zohan.KafkaDemo | |
{ | |
public class SendEvent | |
{ | |
private readonly IKafkaProducer _producer; | |
public SendEvent(IKafkaProducer producer) | |
{ | |
// Save an instance of the Kafka producer | |
_producer = producer; | |
} | |
[FunctionName("sendevent")] | |
public async Task<IActionResult> Run( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req, | |
ILogger log) | |
{ | |
log.LogInformation("Send event function triggered"); | |
// Retrieve the topic name | |
var topicName = Environment.GetEnvironmentVariable("EventHubName"); | |
// Read the message body from the incoming request | |
var requestBody = await new StreamReader(req.Body).ReadToEndAsync(); | |
// Send the event to the topic | |
await _producer.SendEvent(topicName, // topic name | |
null, // key | |
requestBody); // value | |
return (ActionResult)new OkObjectResult($"Ok"); | |
} | |
} | |
} |
Consuming Events with the kafkacat Utility
kafkacat is a really cool utility that can produce and consume events with a Kafka topic. I wanted to continue the story of using the Kafka protocol by using this utility to read from Event Hubs.
If you are on a Windows machine, you can install kafkacat with the Windows Subsystem for Linux.
To consume events, fire up a terminal and initialize some variables:
connectionString="{event-hub-connection-string}" kafkaEndpoint="{event-hub-namespace}.servicebus.windows.net:9093" topicname="{topic-name}"
Start consuming events (line-breaks added for clarity):
kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN -X sasl.username="\$ConnectionString" -X sasl.password=$connectionString -b $kafkaEndpoint -t $topicname -o end
An example of a consumed test message can be seen in the screenshot below:
Summary
There are a lot of moving parts here with Event Hubs, Apache Kafka and some shiny, new dependency injection support within Azure Functions.
Hopefully, there are some helpful, reusable components in this solution that you find useful. Thank you for reading!
Resources
- Apache Kafka
- Azure Event Hubs
- Azure Functions
- Windows Subsystem for Linux
- kafkacat
- GitHub repository