Pub-Sub with Dapr and Azure Event Hubs

This post will demonstrate how to configure and use Azure Event Hubs as a pub-sub component in Dapr.

It will also cover how to consume messages from a topic with Dapr using Node.js, Go and C#.

The source code can be found at: https://github.com/dbarkol/dapr-pubsub-eventhubs

Dapr and Pub-Sub

What makes Dapr so unique and innovative is the sidecar architecture it is built on. By using this simple yet powerful pattern, common building blocks that are used across many distributed systems today can easily be configured and leveraged without depending on their respective SDKs or libraries.

For example, if you wanted to leverage an S3 bucket or an Azure Storage account for an output binding, the code you write to work with those services remains the same, regardless of which option you choose. Since communication between your code and those services is facilitated through Dapr, the sidecar that runs alongside your application provides the abstraction on your behalf, shielding you from those implementation details and nuances.

Perhaps the image that best illustrates how these building blocks fit into the bigger picture is this one:

Reference: Dapr building blocks

Along with a component for publish and subscribe (pub-sub), there are several other key building blocks that address some of the most important considerations in a microservices and distributed systems architecture today.

The rest of this post will focus on the pub-sub component and the details of configuring it with Event Hubs. If you are looking for a more complete overview of Dapr, along with some tutorials on getting started, I encourage you to visit: https://github.com/dapr.

Let’s build something

Putting something into practice is usually the best way for me to learn something new. It also helps if there is some kind of story or use case involved, to make it more relatable.

Use case: song requests and radio station playlists

Let’s imagine that we want to employ a publish-subscribe pattern to support song requests from radio listeners across the country. Multiple radio stations will be able to subscribe to the incoming requests and add them to their playlists.

This diagram represents the logical architecture for the solution:

Each subscriber can be a unique application, written in a different language. This polyglot approach showcases the flexibility of Dapr and also addresses one of the challenges (or roadblocks) with microservices today: supporting multiple languages and technology stacks.

When we introduce Dapr into the solution, it provides us with the benefits of the sidecar pattern to support multiple languages, stacks and platforms. The physical architecture diagram, with Dapr, is depicted here:

Since all communication is passed through a Dapr sidecar, using HTTP or gRPC, we now have the ability to choose any language, SDK or platform.

Setup

I’m going to use the Azure Cloud Shell to provision the necessary services, which only require Event Hubs and a Storage Account (more on that later). I’ll start by initializing several variables to make the commands easier to read.

# resource group name and location
rgname=zohan-dapr-demo
location=westus2

# event hubs namespace and event hub (topic)
ehnamespace=<unique-event-hubs-namespace-name>
ehname=songs

# authorization rule for shared access policy
authorizationrule=authorizationpolicy

# consumer group for each subscriber
consumergroup1=csharp-subscriber-app
consumergroup2=node-subscriber-app
consumergroup3=go-subscriber-app

# storage account name
storageaccount=<unique-storage-account-name>

Why multiple consumer groups?

You may have noticed that we plan on creating a consumer group for each subscribing application (radio station). This is necessary to ensure that each radio station receives and manages its own subscription to the incoming requests. The same pattern and approach applies to Kafka. Had the applications shared the same consumer group, they would not each get their own copy of the incoming events (well, at least not easily).

I’ve always liked this diagram, which illustrates how multiple consumer groups read from Event Hubs (reference: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#event-consumers).

The main takeaway here is that each consumer group consists of a collection of receivers that collectively process the events from the partitions. This arrangement is great for parallelism, and the ideal ratio between event receivers and partitions is 1:1.

However, in this scenario we are using Event Hubs as the message broker for pub-sub, and the best way to ensure that each topic subscription gets its own copy of the events is to give it its own consumer group.
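The consumer-group behavior described above can be illustrated with a toy, in-memory model in Go. This is a hypothetical sketch, not the Event Hubs SDK or Dapr code: `hub`, `consumerGroup`, `poll`, and `assignPartitions` are made-up names, and offsets are modeled as plain slice indexes. It shows that every consumer group reads every partition with its own offsets, and that within one group the partitions are divided among receivers.

```go
package main

// hub is a toy stand-in for an event hub: an append-only log per partition.
type hub struct {
	partitions [][]string
}

// send appends an event to a partition chosen round-robin by sequence number.
func (h *hub) send(seq int, event string) {
	p := seq % len(h.partitions)
	h.partitions[p] = append(h.partitions[p], event)
}

// consumerGroup keeps its own offset per partition, so reading through one
// group never moves the position of another group.
type consumerGroup struct {
	name    string
	offsets []int
}

func newConsumerGroup(name string, partitions int) *consumerGroup {
	return &consumerGroup{name: name, offsets: make([]int, partitions)}
}

// poll returns every event this group has not seen yet, across all
// partitions, and advances only this group's offsets.
func (g *consumerGroup) poll(h *hub) []string {
	var out []string
	for p, events := range h.partitions {
		out = append(out, events[g.offsets[p]:]...)
		g.offsets[p] = len(events)
	}
	return out
}

// assignPartitions spreads partitions across the receivers of a single
// consumer group in round-robin order. With equal counts, each receiver
// owns exactly one partition (the 1:1 ideal).
func assignPartitions(partitions, receivers int) map[int][]int {
	owned := make(map[int][]int, receivers)
	for p := 0; p < partitions; p++ {
		owned[p%receivers] = append(owned[p%receivers], p)
	}
	return owned
}
```

For example, with a hub of two partitions, polling through the csharp-subscriber-app group and then the node-subscriber-app group returns every event to both, while a second poll on either group returns nothing new. In the real service, Event Hubs and the Dapr component track these positions for you.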

Okay, let’s get back to provisioning the services. We’ll begin by creating a resource group:

az group create -n $rgname -l $location

Create an Event Hubs namespace and an event hub (topic):

# create event hubs namespace
az eventhubs namespace create --name $ehnamespace -g $rgname -l $location --sku Standard

# create event hub (topic)
az eventhubs eventhub create -g $rgname --namespace-name $ehnamespace --name $ehname

Create the consumer groups, one for each subscription:

# csharp subscriber
az eventhubs eventhub consumer-group create --resource-group $rgname --namespace-name $ehnamespace --eventhub-name $ehname --name $consumergroup1

# node subscriber
az eventhubs eventhub consumer-group create --resource-group $rgname --namespace-name $ehnamespace --eventhub-name $ehname --name $consumergroup2

# go subscriber
az eventhubs eventhub consumer-group create --resource-group $rgname --namespace-name $ehnamespace --eventhub-name $ehname --name $consumergroup3

Create a shared access policy for the Event Hub (topic):

# create shared access policy with send and listen rights
az eventhubs eventhub authorization-rule create --eventhub-name $ehname --name $authorizationrule --namespace-name $ehnamespace -g $rgname --rights Send Listen

# query the primary connection string
az eventhubs eventhub authorization-rule keys list --resource-group $rgname --namespace-name $ehnamespace --eventhub-name $ehname --name $authorizationrule --query "primaryConnectionString"

About the shared access policy

There are too many demos that use the root manage key in Azure. Unless it’s absolutely necessary, we should create policies with only the privileges that are required. In this case, we’ll use Send and Listen rights.

Finally, let’s create a storage account:

# create storage account
az storage account create --name $storageaccount --location $location --resource-group $rgname --sku Standard_LRS --kind BlobStorage --access-tier Hot

Storage account and client-side cursors

A storage account is needed to keep track of the offset when reading from Event Hubs. Both Kafka and Event Hubs consumers work with the concept of a client-side cursor. This means that it is the consumer's responsibility to keep track of its position within the topic. In this case, the Event Processor Host used by the Dapr component saves the offset in a storage account.

Time for some Dapr, and a little code

With the basic infrastructure out of the way, we can get to the fun part: writing some code, using Dapr to communicate with a message broker, and seeing this in action!

Song request

The payload for a song request is fairly simple. It will only contain three properties: an identifier, the artist, and the name of the song. A sample request might resemble:

{
    "id": 1,
    "artist": "Led Zeppelin",
    "name": "Kashmir"
}

A closer look at consuming messages in Dapr

Consuming messages from a pub-sub component in Dapr can be broken up into two parts:

  1. Subscribing to topics. This consists of responding to a request from Dapr with the application's topic subscription details. The response includes a collection of topic names and their respective route paths.
  2. Consuming messages. This is the actual endpoint that will be invoked on the application for new messages. Dapr expects a 200 OK response as an acknowledgement that the message was processed successfully.

Another important note is that the messages Dapr delivers are wrapped in a CloudEvents envelope. This means that subscribers should expect the original contents of the message to be within the data property of the payload. For more about CloudEvents, see: https://cloudevents.io/.
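A subscriber can also unwrap the envelope generically. This Go sketch assumes the published data is a JSON object like the song request above. It decodes only a few CloudEvents attributes (`id`, `source`, `type`, `specversion`) and keeps `data` as a `json.RawMessage`, so the envelope type does not need to know the payload shape. `envelope` and `unwrapSong` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
)

// SongRequest mirrors the song request payload shown earlier.
type SongRequest struct {
	ID     int    `json:"id"`
	Artist string `json:"artist"`
	Name   string `json:"name"`
}

// envelope keeps a few CloudEvents attributes and leaves data undecoded,
// so the same type works for any payload.
type envelope struct {
	ID          string          `json:"id"`
	Source      string          `json:"source"`
	Type        string          `json:"type"`
	SpecVersion string          `json:"specversion"`
	Data        json.RawMessage `json:"data"`
}

// unwrapSong decodes a CloudEvents-wrapped message and returns the song
// request found in its data property.
func unwrapSong(body []byte) (SongRequest, error) {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return SongRequest{}, err
	}
	if len(env.Data) == 0 {
		return SongRequest{}, errors.New("cloud event has no data property")
	}
	var song SongRequest
	if err := json.Unmarshal(env.Data, &song); err != nil {
		return SongRequest{}, err
	}
	return song, nil
}
```

The Go subscriber later in this post takes the other approach and embeds `SongRequest` directly in its envelope struct; the raw-message version is handy when one service receives several payload types.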

Dapr Pub-Sub component

Each application that wants to subscribe to events using Dapr will need to be daprized. In other words, it needs a Dapr sidecar assigned to it so that it can communicate with the pub-sub component through a consistent set of APIs and settings.

The YAML file for each subscriber that uses the Event Hubs component should resemble the following example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.azure.eventhubs
  metadata:
    - name: connectionString
      value: Endpoint=sb://<namespace-name>.servicebus.windows.net/;SharedAccessKeyName=authorizationpolicy;SharedAccessKey=<key>;EntityPath=songs
    - name: storageAccountName
      value: <storage-account-name>
    - name: storageAccountKey
      value: <storage-account-key>
    - name: storageContainerName
      value: <unique-container-name-per-subscriber>

Some important details about the YAML file:

  • The connection string value contains the name of your Event Hubs namespace, along with the name of the shared access policy we created, its key, and the hub/topic that is used as the EntityPath (songs).
  • The storage account details should include the name of the account we provisioned earlier, along with an access key (not the entire connection string).
  • The storage container name must be unique for each subscribing application. You do not need to create the container; it will be created on demand when the application runs with Dapr.
  • The offset (client-side cursor) for each subscriber will be stored in a container of the storage account. Make sure that a different container is used for each subscription.

Node.js subscriber

The Node.js implementation for consuming messages from a topic is perhaps the easiest one to comprehend. The application, in its entirety, is presented here:

const express = require('express')
const bodyParser = require('body-parser')

const app = express()
// Dapr delivers messages as application/cloudevents+json
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

// Tell Dapr which topics this app subscribes to and where to route them
app.get('/dapr/subscribe', (req, res) => {
    res.json([
        {
            topic: "songs",
            route: "playlist"
        }
    ]);
})

// Handle incoming messages; the song is in the CloudEvent's data property
app.post('/playlist', (req, res) => {
    let song = req.body.data;
    console.log("New song request: " + song.artist + " - " + song.name);
    res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
gist: https://gist.github.com/dbarkol/4a629bb409cebfefcdeab6543c8bcfc8

This simple application supports two routes:

  1. /dapr/subscribe: This is the endpoint that Dapr calls to ask your application which topics it wishes to subscribe to. In this example, messages from a topic called songs will be routed to /playlist.
  2. /playlist: This route handles the incoming messages. Notice that it reads the song request from the data property of the request body: let song = req.body.data. It then sends back a 200 response to let Dapr know the message was processed.

To run the Node application, we’ll first need to apply the settings we just covered to the file called pubsub.yaml in the components folder (inside the node folder of the source code).

After updating the YAML file, change directories to the node sample:

cd node

Launch the application with the Dapr CLI:

dapr run --app-id node-subscriber-app --app-port 3000 --port 3500 --components-path components node app.js

For a complete reference of the dapr run command, see: https://github.com/dapr/cli/blob/master/docs/reference/dapr-run.md

Now, with the application and its Dapr sidecar running, we can use the CLI again to publish a song request to the pub-sub component:

dapr publish --topic songs --data '{"id": 2, "artist": "B.B. King", "name": "Why I Sing the Blues"}'

If everything is configured correctly, the output in the logs should look like this:

== APP == New song request: B.B. King - Why I Sing the Blues

Let’s move on to another language to see how consuming messages from a topic is achieved with Go.

Go

Disclaimer: I’m still picking up the Go programming language. If there is a cleaner or better approach for this solution, please let me know. 🙂

As a quick recap, regardless of the language, an application that intends to consume messages from a topic will need to implement a route for /dapr/subscribe along with another route to support the incoming messages.

The implementation in Go, in all its glory, is:

package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
)

const (
	port  = "8080"
	topic = "songs"
	route = "playlist"
)

// SongRequest is the domain payload carried in the CloudEvent's data property
type SongRequest struct {
	ID     int    `json:"id"`
	Artist string `json:"artist"`
	Name   string `json:"name"`
}

// CloudEvent is the envelope that wraps each message delivered by Dapr
type CloudEvent struct {
	ID              string      `json:"id"`
	Subject         string      `json:"subject"`
	Source          string      `json:"source"`
	Type            string      `json:"type"`
	SpecVersion     string      `json:"specversion"`
	DataContentType string      `json:"datacontenttype"`
	Data            SongRequest `json:"data"`
}

func main() {
	// Return a collection of topic subscriptions
	http.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) {
		subscriptions, err := json.Marshal([]struct {
			Topic string `json:"topic"`
			Route string `json:"route"`
		}{{topic, route}})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write(subscriptions)
	})

	// Handle incoming messages for the subscribed topic
	http.HandleFunc("/"+route, func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "could not read request body", http.StatusInternalServerError)
			return
		}
		var event CloudEvent
		if err := json.Unmarshal(body, &event); err != nil {
			// Acknowledge anyway (implicit 200) so a message that can never
			// be decoded is dropped instead of retried
			log.Printf("could not decode message: %v", err)
			return
		}
		log.Printf("New song request: %s - %s", event.Data.Artist, event.Data.Name)
		// A 200 OK tells Dapr the message was processed successfully
		w.WriteHeader(http.StatusOK)
	})

	log.Println("starting HTTP server...")
	log.Fatal(http.ListenAndServe(":"+port, nil))
}
gist: https://gist.github.com/dbarkol/3050fc4d320c917543933514681b7e51

Since Go is statically typed, several structs are declared to support the incoming requests. The first struct is for the topic subscriptions:

struct {
   Topic string `json:"topic"`
   Route string `json:"route"`
}

The next two structs are used to support the song request payload and the cloud event envelope:

type SongRequest struct {
   ID     int    `json:"id"`
   Artist string `json:"artist"`
   Name   string `json:"name"`
}

type CloudEvent struct {
   ID              string      `json:"id"`
   Subject         string      `json:"subject"`
   Source          string      `json:"source"`
   Type            string      `json:"type"`
   SpecVersion     string      `json:"specversion"`
   DataContentType string      `json:"datacontenttype"`
   Data            SongRequest `json:"data"`
}

Launch the application with the Dapr CLI. Notice that the app-id changes to align with the name of the consumer group:

cd go
dapr run --app-id go-subscriber-app --app-port 8080 --port 3500 --components-path components go run songrequests.go

C#

Finally, this brings us to the C# implementation of consuming messages from a topic. I’m going to use the .NET Core Web API template since it provides most of the scaffolding that we’ll need.

dotnet new webapi

Let’s add a few NuGet packages to help with the CloudEvents envelope and JSON serialization:

dotnet add package CloudNative.CloudEvents --version 1.3.80
dotnet add package CloudNative.CloudEvents.AspNetCore --version 1.3.80
dotnet add package Newtonsoft.Json --version 12.0.3

In the Startup class, to help with the model bindings, let’s add an input formatter for CloudEvents:

public void ConfigureServices(IServiceCollection services)
{
   services.AddControllers(opts =>
   {
      // Insert the CloudEvents formatter first so it is used when binding CloudEvent parameters
      opts.InputFormatters.Insert(0, new CloudNative.CloudEvents.CloudEventJsonInputFormatter());
   });
}

A class to represent the song request will also be added:

using Newtonsoft.Json;

namespace SongRequests.Models
{
    public class Song
    {
        [JsonProperty("id")]
        public int Id { get; set; }

        [JsonProperty("artist")]
        public string Artist { get; set; }

        [JsonProperty("name")]
        public string Name { get; set; }
    }
}

The last piece that brings this all together will be the implementation in the API controller:

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using CloudNative.CloudEvents;
using SongRequests.Models;
using Newtonsoft.Json;

namespace SongRequests.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class RequestController : ControllerBase
    {
        private readonly ILogger<RequestController> _logger;

        public RequestController(ILogger<RequestController> logger)
        {
            _logger = logger;
        }

        [HttpGet("/dapr/subscribe")]
        public IActionResult Get()
        {
            // Initialize an array of topic subscriptions. Each subscription
            // contains the name of the topic and the route.
            var topics = new[] { new { topic = "songs", route = "playlist" } };
            return Ok(topics);
        }

        [HttpPost("/playlist")]
        public IActionResult NewSong(CloudEvent cloudEvent)
        {
            // The message is wrapped in a cloud event envelope, which means that
            // the domain-specific information (the song) is in the Data object.
            var song = JsonConvert.DeserializeObject<Song>(cloudEvent.Data.ToString());
            _logger.LogInformation($"New song request: {song.Artist} - {song.Name}");

            // A 200 OK acknowledges the message to Dapr
            return Ok();
        }
    }
}
gist: https://gist.github.com/dbarkol/a8882394a03446b780f6bc80bfcfad4f

Launch the application with the Dapr CLI (make note of the app-id flag):

cd csharp
dapr run --app-id csharp-subscriber-app --port 3500 --app-port 5000 --components-path components dotnet run

Summary

That’s a wrap! This post ended up being about 10x longer than I had anticipated, but hopefully someone finds it useful.

Perhaps the coolest part here, and possibly one of the most important takeaways about Dapr, is that the code does not change if a different message broker is used; only the component configuration does.
