Azure Event Hubs, Kafka and Dependency Injection in Azure Functions

This post demonstrates a solution that combines Azure Event Hubs, its Apache Kafka endpoint, and dependency injection in Azure Functions, with the kafkacat utility used to consume the events.

GitHub repository: https://github.com/dbarkol/functions-eventhubs-kafka

Event Hubs and Kafka

Azure Event Hubs now supports Apache Kafka 1.0 and higher, which means Event Hubs provides wire protocol compatibility with Kafka. Other cloud services follow a similar pattern. For example, Azure Cosmos DB takes the same approach for the APIs that it supports (Cassandra, Gremlin, Table, MongoDB and SQL).

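Event Hubs doesn’t actually run Apache Kafka, just as Cosmos DB does not run a version of MongoDB underneath. Instead, compatibility is established at the wire protocol level, so existing Kafka clients can connect to Event Hubs by changing their configuration rather than their code.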

Compatibility at this level is exciting because, in addition to providing a managed distributed streaming platform, it opens Event Hubs up to the rich ecosystem of open source Kafka applications and tools.

For more information about Event Hubs and Kafka support, see Use Azure Event Hubs from Apache Kafka applications.

Enabling Kafka in Azure Event Hubs

From the Portal

This post provides a walkthrough of how to create an Apache Kafka-enabled Event Hubs namespace from the Azure portal: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create-kafka-enabled.

An Event Hubs namespace maps to a Kafka cluster (a collection of brokers), and each event hub within the namespace maps to a Kafka topic.
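Because the namespace plays the role of the cluster, the bootstrap server a Kafka client connects to is the namespace’s fully qualified domain name on port 9093 (the same value this post later stores in the EventHubFqdn setting). As a minimal sketch, assuming the standard Endpoint=sb://{namespace}.servicebus.windows.net/;... connection string format, the hypothetical helper below (not part of the repository) derives that value from a connection string:

```csharp
using System;
using System.Linq;

// Hypothetical helper: derives the Kafka bootstrap server from an Event Hubs
// namespace connection string. Assumes the usual
// "Endpoint=sb://{namespace}.servicebus.windows.net/;..." format.
public static class EventHubsKafkaEndpoint
{
    // Port of the Kafka endpoint exposed by an Event Hubs namespace.
    public const int KafkaPort = 9093;

    public static string FromConnectionString(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
            throw new ArgumentException("Connection string is empty.", nameof(connectionString));

        // Connection strings are ';'-separated "Key=Value" pairs. Split each pair
        // on the first '=' only, because SharedAccessKey values can end in '='.
        string endpoint = connectionString
            .Split(';', StringSplitOptions.RemoveEmptyEntries)
            .Select(part => part.Split('=', 2))
            .Where(kv => kv.Length == 2 &&
                         kv[0].Trim().Equals("Endpoint", StringComparison.OrdinalIgnoreCase))
            .Select(kv => kv[1].Trim())
            .FirstOrDefault()
            ?? throw new ArgumentException("Connection string has no Endpoint.", nameof(connectionString));

        // "sb://contoso.servicebus.windows.net/" -> "contoso.servicebus.windows.net:9093"
        return $"{new Uri(endpoint).Host}:{KafkaPort}";
    }
}
```

For example, a connection string whose endpoint is sb://contoso.servicebus.windows.net/ yields contoso.servicebus.windows.net:9093.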

From the CLI

I prefer to use the Azure CLI over the portal as much as possible. If you are interested in a step-by-step guide using the CLI, see the README in the GitHub repository.

Producing Events from an Azure Function

I’m going to dive right into the implementation of the Azure Function, starting with the required packages. Using Visual Studio Code, I executed the following commands to add them:

dotnet add package Confluent.Kafka
dotnet add package Microsoft.Azure.Functions.Extensions

CA Certificate file

Included in the project is a file called cacert.pem. The Confluent library uses this CA certificate file to verify that the broker’s TLS certificate comes from a trusted source. The messaging services on Azure only expose encrypted endpoints (no unencrypted endpoints, ever), and the Kafka endpoint on port 9093 is no exception. Supplying this certificate enables encrypted client-broker communication with Event Hubs.

When deploying the Function to Azure, we want to include this file in the output directory. The csproj file in the solution accomplishes this with this entry in the ItemGroup section:

<None Update="cacert.pem">
   <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
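If cacert.pem is missing from the output directory, the problem only shows up as an error from the Kafka client when it tries to load the file. A small guard at startup makes that failure explicit. This is a sketch; CaCertGuard is a hypothetical name, not part of the repository:

```csharp
using System;
using System.IO;

// Hypothetical startup guard: fail fast with a clear message if cacert.pem
// was not copied to the output directory.
public static class CaCertGuard
{
    public static string EnsureExists(string caCertPath)
    {
        if (string.IsNullOrWhiteSpace(caCertPath))
            throw new ArgumentException("CA certificate path is empty.", nameof(caCertPath));

        // Relative paths (such as the local "cacert.pem") resolve against the
        // current working directory.
        string fullPath = Path.GetFullPath(caCertPath);

        if (!File.Exists(fullPath))
            throw new FileNotFoundException(
                $"CA certificate not found at '{fullPath}'. " +
                "Is cacert.pem marked CopyToOutputDirectory in the .csproj?",
                fullPath);

        return fullPath;
    }
}
```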

Application Settings

For local development, the local.settings.json file should look similar to:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "IsLocal": "1",
        "EventHubConnectionString": "{your-event-hub-connection-string}",
        "EventHubFqdn": "{event-hub-namespace}.servicebus.windows.net:9093",
        "EventHubName": "{event-hub-name}"
    }
}

All of the Event Hubs settings (connection string, FQDN and name) will need to be added to the Application Settings of the Function App once it’s deployed to Azure. The IsLocal setting is used to determine the path of the certificate file in each environment (local vs. Azure).
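Since these values are read with Environment.GetEnvironmentVariable, a missing setting otherwise turns into a null handed to the producer. A minimal sketch of a fail-fast loader that reports every missing setting at once (EventHubSettings is a hypothetical type, not from the repository):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical settings object. Values from local.settings.json (locally) and
// Application Settings (in Azure) both surface as environment variables.
public sealed record EventHubSettings(string ConnectionString, string Fqdn, string Name)
{
    public static EventHubSettings Load(Func<string, string> getSetting)
    {
        var missing = new List<string>();

        // Reads one setting and remembers its name if it is absent or blank.
        string Require(string key)
        {
            string value = getSetting(key);
            if (string.IsNullOrWhiteSpace(value)) missing.Add(key);
            return value ?? string.Empty;
        }

        var settings = new EventHubSettings(
            Require("EventHubConnectionString"),
            Require("EventHubFqdn"),
            Require("EventHubName"));

        if (missing.Count > 0)
            throw new InvalidOperationException(
                "Missing application settings: " + string.Join(", ", missing));

        return settings;
    }

    // Convenience overload for the Functions host.
    public static EventHubSettings FromEnvironment() =>
        Load(Environment.GetEnvironmentVariable);
}
```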

Kafka Producer Interface

The solution includes an interface for a Kafka Producer with a single method for sending an event:

using System.Threading.Tasks;

namespace Zohan.KafkaDemo
{
    public interface IKafkaProducer
    {
        Task SendEvent(string topicName, string key, string value);
    }
}
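One benefit of hiding the Confluent producer behind this interface is that code depending on it can be tested without a broker. The sketch below is a hypothetical in-memory implementation (not part of the repository, and not thread-safe) that just records what it was asked to send:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Same shape as the IKafkaProducer interface above, repeated so this
// sketch compiles on its own.
public interface IKafkaProducer
{
    Task SendEvent(string topicName, string key, string value);
}

// Hypothetical test double: records events instead of sending them.
public sealed class InMemoryKafkaProducer : IKafkaProducer
{
    private readonly List<(string Topic, string Key, string Value)> _sent = new();

    // Everything passed to SendEvent, in call order.
    public IReadOnlyList<(string Topic, string Key, string Value)> Sent => _sent;

    public Task SendEvent(string topicName, string key, string value)
    {
        _sent.Add((topicName, key, value));
        return Task.CompletedTask;
    }
}
```

In a unit test, pass an InMemoryKafkaProducer wherever an IKafkaProducer is expected and assert on its Sent list.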

Kafka Producer Implementation

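The Kafka producer implementation builds a producer with the Confluent library’s ProducerBuilder. Apart from the SASL credentials it passes (the literal user name $ConnectionString and the connection string as the password), this code uses only standard Kafka client settings and has no other knowledge of Event Hubs, so the same approach works with a traditional Kafka cluster as well.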

using System.Threading.Tasks;
using Confluent.Kafka;

namespace Zohan.KafkaDemo
{
    public class KafkaProducer : IKafkaProducer
    {
        private readonly IProducer<string, string> _producer;

        public KafkaProducer(string brokerList,
                             string connectionString,
                             string caCertLocation)
        {
            // SASL/PLAIN over TLS. With Event Hubs, the user name is the literal
            // "$ConnectionString" and the password is the connection string.
            var config = new ProducerConfig
            {
                BootstrapServers = brokerList,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = connectionString,
                SslCaLocation = caCertLocation
            };

            _producer = new ProducerBuilder<string, string>(config).Build();
        }

        public Task SendEvent(string topicName, string key, string value)
        {
            // ProduceAsync completes when the delivery report for the message arrives.
            return _producer.ProduceAsync(topicName, new Message<string, string>
            {
                Key = key,
                Value = value
            });
        }
    }
}
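The strongly typed ProducerConfig properties correspond to librdkafka configuration properties. Writing them out as plain key/value pairs, as in this sketch (the EventHubsKafkaConfig helper is hypothetical, not part of the repository), makes them easy to compare with the -X flags passed to kafkacat at the end of this post. librdkafka also accepts sasl.mechanism as an alias for sasl.mechanisms.

```csharp
using System.Collections.Generic;

// Hypothetical helper: the producer settings above, written out by their
// librdkafka property names.
public static class EventHubsKafkaConfig
{
    public static Dictionary<string, string> Build(
        string bootstrapServers, string connectionString, string caCertLocation)
    {
        return new Dictionary<string, string>
        {
            ["bootstrap.servers"] = bootstrapServers,  // BootstrapServers ({namespace}.servicebus.windows.net:9093)
            ["security.protocol"] = "SASL_SSL",        // SecurityProtocol.SaslSsl
            ["sasl.mechanisms"] = "PLAIN",             // SaslMechanism.Plain
            ["sasl.username"] = "$ConnectionString",   // SaslUsername: a literal string, not a variable
            ["sasl.password"] = connectionString,      // SaslPassword: the namespace connection string
            ["ssl.ca.location"] = caCertLocation,      // SslCaLocation: path to cacert.pem
        };
    }
}
```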

Registering the Kafka Producer Service

To have an instance of this class injected into the function, we register it as a service in a FunctionsStartup class. The assembly attribute tells the Functions runtime which startup class to run, and the runtime then calls its Configure method.

using System;
using System.IO;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(Zohan.KafkaDemo.Startup))]

namespace Zohan.KafkaDemo
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            // Retrieve the Event Hubs connection string that will allow us
            // to send events.
            var eventHubConnectionString = Environment.GetEnvironmentVariable("EventHubConnectionString");

            // Retrieve the Event Hubs fully qualified domain name along
            // with the port number.
            // Example: {event-hub-namespace}.servicebus.windows.net:9093
            var eventHubFqdn = Environment.GetEnvironmentVariable("EventHubFqdn");

            // Retrieve the certificate file location. This is required
            // for client-broker encryption with Event Hubs.
            var caCertFileLocation = GetCaCertFileLocation();

            // Add a singleton instance of the Kafka producer class.
            builder.Services.AddSingleton<IKafkaProducer>(
                new KafkaProducer(eventHubFqdn, eventHubConnectionString, caCertFileLocation));
        }

        private static string GetCaCertFileLocation()
        {
            // For local testing
            if (Environment.GetEnvironmentVariable("IsLocal") == "1")
            {
                return "cacert.pem";
            }

            // When the certificate file is copied to the output directory
            // of the Azure Function, it will reside in the site/wwwroot
            // folder.
            string home = Environment.GetEnvironmentVariable("HOME");
            return Path.Combine(home, "site", "wwwroot", "cacert.pem");
        }
    }
}
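GetCaCertFileLocation reads environment variables directly, which makes both of its branches awkward to unit test. One option, sketched here as a hypothetical CaCertLocator class that is not part of the repository, is to pass the setting lookup in as a function:

```csharp
using System;
using System.IO;

// Hypothetical refactoring of GetCaCertFileLocation() into a pure function:
// the environment lookup is passed in, so both branches can be tested.
public static class CaCertLocator
{
    public const string FileName = "cacert.pem";

    public static string Resolve(Func<string, string> getSetting)
    {
        // Local development: a relative path, resolved against the current
        // working directory, as in the original Startup class.
        if (getSetting("IsLocal") == "1")
            return FileName;

        // In Azure: files copied to the output directory end up in HOME/site/wwwroot.
        string home = getSetting("HOME")
            ?? throw new InvalidOperationException("HOME is not set; cannot locate " + FileName);
        return Path.Combine(home, "site", "wwwroot", FileName);
    }
}
```

Startup would then call CaCertLocator.Resolve(Environment.GetEnvironmentVariable), while a unit test can pass a lambda that returns fixed values.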

Putting it all together

It all comes together in the implementation of the Azure Function, which is triggered by an HTTP request.

Notice that the IKafkaProducer instance registered in the Startup class is injected into the constructor. It is then used when the function is invoked to send an event to the Kafka endpoint:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace Zohan.KafkaDemo
{
    public class SendEvent
    {
        private readonly IKafkaProducer _producer;

        public SendEvent(IKafkaProducer producer)
        {
            // Save the injected instance of the Kafka producer
            _producer = producer;
        }

        [FunctionName("sendevent")]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("Send event function triggered");

            // Retrieve the topic name (the event hub name)
            var topicName = Environment.GetEnvironmentVariable("EventHubName");

            // Read the message body from the incoming request
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();

            // Send the event to the topic
            await _producer.SendEvent(topicName,     // topic name
                                      null,          // key
                                      requestBody);  // value

            return new OkObjectResult("Ok");
        }
    }
}
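Because the function receives its producer through the constructor, the logic inside Run does not depend on the Confluent library at all. To illustrate, here is a hypothetical EventForwarder (not part of the repository) that keeps the function’s behavior, reading the body and sending it with a null key, but takes a Stream instead of an HttpRequest so it can be exercised with any IKafkaProducer implementation:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Same shape as the IKafkaProducer interface above, repeated so this
// sketch compiles on its own.
public interface IKafkaProducer
{
    Task SendEvent(string topicName, string key, string value);
}

// Hypothetical: the body of the "sendevent" function with the HTTP types removed.
public sealed class EventForwarder
{
    private readonly IKafkaProducer _producer;
    private readonly string _topicName;

    public EventForwarder(IKafkaProducer producer, string topicName)
    {
        _producer = producer ?? throw new ArgumentNullException(nameof(producer));
        _topicName = topicName ?? throw new ArgumentNullException(nameof(topicName));
    }

    public async Task ForwardAsync(Stream body)
    {
        using var reader = new StreamReader(body);
        string value = await reader.ReadToEndAsync();

        // Null key, as in the original function: the client's partitioner
        // chooses the partition.
        await _producer.SendEvent(_topicName, null, value);
    }
}
```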

Consuming Events with the kafkacat Utility

kafkacat is a handy command-line utility that can produce events to and consume events from a Kafka topic. I wanted to continue the story of using the Kafka protocol by using this utility to read from Event Hubs.

If you are on a Windows machine, you can install kafkacat in the Windows Subsystem for Linux (WSL).

To consume events, fire up a terminal and initialize some variables:

connectionString="{event-hub-connection-string}"
kafkaEndpoint="{event-hub-namespace}.servicebus.windows.net:9093"
topicname="{topic-name}"

Start consuming events (the command is split across lines for clarity, with a trailing backslash on each continued line):

kafkacat -X security.protocol=SASL_SSL \
         -X sasl.mechanisms=PLAIN \
         -X sasl.username="\$ConnectionString" \
         -X sasl.password="$connectionString" \
         -b $kafkaEndpoint \
         -t $topicname \
         -o end

An example of a consumed test message can be seen in the screenshot below:

[Screenshot: kafkacat-sample]

Summary

There are a lot of moving parts here with Event Hubs, Apache Kafka and some shiny new dependency injection support within Azure Functions.

Hopefully, you will find some reusable components in this solution. Thank you for reading!
