Publish and Consume events with CloudEvents and Azure Event Grid


The recent announcement of first-class support for CloudEvents in Azure Event Grid is very exciting. The release emphasizes Microsoft’s dedication to an open standard for describing event data consistently across clouds and platforms.

In this blog post we’ll take a look at how to publish events to Event Grid using the CloudEvents schema. We’ll also create an Azure Function (v2) that subscribes to those events and receives them using the same definition.


The key takeaway, and perhaps the most important element, is that the schema used in these examples comes from the open standard defined by the CNCF Serverless Working Group. The event format can be reviewed in the following repository: https://github.com/cloudevents/spec/blob/master/json-format.md

All the code from this post can be found at: https://github.com/dbarkol/cloud-events

Azure Setup

Let’s start by setting up a few things in Azure, the first of which is installing the Event Grid extension for the Azure CLI. From the Cloud Shell in the Azure Portal, run the following command:

az extension add --name eventgrid

The extension adds CloudEvents support to the commands we use to create custom topics and event subscriptions.

Now, let’s create a resource group to manage all the services and assets that we want to leverage:

az group create --name cloudeventsdemo-rg -l westcentralus

We then create a custom topic as usual, but with the addition of the --input-schema parameter:

az eventgrid topic create \
  --name <topic-name> \
  -l westcentralus \
  -g cloudeventsdemo-rg \
  --input-schema cloudeventv01schema

Publish events

Moving on to the publishing side, we can now create a strongly-typed class to represent a cloud event. For more information about the schema and how it maps to the Event Grid format, see: https://docs.microsoft.com/en-us/azure/event-grid/cloudevents-schema

using Newtonsoft.Json;

namespace CloudEventSample.Models
{
    public class CloudEvent<T> where T : class
    {
        [JsonProperty("eventID")]
        public string EventId { get; set; }

        [JsonProperty("cloudEventsVersion")]
        public string CloudEventVersion { get; set; }

        [JsonProperty("eventType")]
        public string EventType { get; set; }

        [JsonProperty("eventTypeVersion")]
        public string EventTypeVersion { get; set; }

        [JsonProperty("source")]
        public string Source { get; set; }

        [JsonProperty("eventTime")]
        public string EventTime { get; set; }

        [JsonProperty("data")]
        public T Data { get; set; }
    }
}


For the data, to keep things interesting, we’ll use information about bands and their albums to demonstrate the use of custom objects within the payload.

using System.Collections.Generic;

namespace CloudEventSample.Models
{
    public class Band
    {
        public string Name { get; set; }
        public List<Album> Albums { get; set; }
    }

    public class Album
    {
        public string AlbumName { get; set; }
        public int YearReleased { get; set; }
        public string RecordLabel { get; set; }
    }
}


The complete code for a .NET Core console application that publishes the event is listed below. We’ll talk about some of the key points right after the listing.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using CloudEventSample.Models;
using Newtonsoft.Json;

namespace CloudEventsSample.SendEvent
{
    internal class Program
    {
        // Replace the placeholders with the values for your custom topic.
        private const string TopicEndpoint = "<topic-endpoint-url>";
        private const string TopicKey = "<topic-key>";
        private const string Topic =
            "/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.EventGrid/topics/<topic-name>";

        private static void Main(string[] args)
        {
            Console.WriteLine("Press <enter> to send");
            Console.ReadLine();
            SendEvent().Wait();
        }

        private static async Task SendEvent()
        {
            // Authenticate against the topic with its access key.
            var client = new HttpClient { BaseAddress = new Uri(TopicEndpoint) };
            client.DefaultRequestHeaders.Accept.Clear();
            client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
            client.DefaultRequestHeaders.Add("aeg-sas-key", TopicKey);

            // Build a single cloud event; Source combines the topic and the subject.
            var cloudEvent = new CloudEvent<Band>
            {
                EventId = Guid.NewGuid().ToString(),
                EventType = "newBand",
                EventTypeVersion = "1.0",
                CloudEventVersion = "0.1",
                Data = GetBand(),
                Source = $"{Topic}#subjectband",
                EventTime = DateTime.UtcNow.ToString(CultureInfo.InvariantCulture),
            };

            // Post the event as one JSON object (not an array).
            var json = JsonConvert.SerializeObject(cloudEvent);
            var content = new StringContent(json, Encoding.UTF8, "application/json");
            var response = await client.PostAsync(string.Empty, content);
        }

        // Sample payload data for the event.
        private static Band GetBand()
        {
            var band = new Band
            {
                Name = "Spinal Tap",
                Albums = new List<Album>
                {
                    new Album { AlbumName = "Intravenus de Milo", YearReleased = 1974, RecordLabel = "Megaphone" },
                    new Album { AlbumName = "Shark Sandwich", YearReleased = 1980, RecordLabel = "Polymer" }
                }
            };
            return band;
        }
    }
}


The code here isn’t much different from how you would already send an event to Event Grid. However, there are a few differences worth noting. The first is the use of the CloudEvent schema:

var cloudEvent = new CloudEvent<Band>
{
    EventId = Guid.NewGuid().ToString(),
    EventType = "newBand",
    EventTypeVersion = "1.0",
    CloudEventVersion = "0.1",
    Data = GetBand(),
    Source = $"{Topic}#subjectband",
    EventTime = DateTime.UtcNow.ToString(CultureInfo.InvariantCulture),
};
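
A side note on EventTime in this initializer: the CloudEvents specification describes eventTime as an RFC 3339 timestamp, while DateTime.ToString(CultureInfo.InvariantCulture) produces a month/day/year string. The sketch below simply compares the two formats for a fixed, made-up date. Whether Event Grid accepts both forms is not something verified here, so treat the round-trip "o" format as an option to consider rather than a required change. The class and method names are hypothetical.

```csharp
using System;
using System.Globalization;

public static class EventTimeFormats
{
    // Culture-invariant general format, as used in the publisher listing.
    public static string Invariant(DateTime utc) =>
        utc.ToString(CultureInfo.InvariantCulture);

    // Round-trip ("o") format: an ISO 8601 string that also fits RFC 3339.
    public static string RoundTrip(DateTime utc) =>
        utc.ToString("o", CultureInfo.InvariantCulture);

    public static void Main()
    {
        // Hypothetical fixed timestamp so the output is reproducible.
        var sample = new DateTime(2018, 5, 1, 12, 30, 0, DateTimeKind.Utc);

        Console.WriteLine(Invariant(sample));  // 05/01/2018 12:30:00
        Console.WriteLine(RoundTrip(sample));  // 2018-05-01T12:30:00.0000000Z
    }
}
```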


The property that stands out is Source, which, according to the documentation, is mapped to Event Grid as topic#subject. As a result, the topic portion needs to be formatted as follows:

/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.EventGrid/topics/<topic-name>

We can then append anything we like as the subject, after the hash sign (#) in the field. Also, notice that we are sending only one event at a time, instead of an array, to comply with the CloudEvents standard.
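
To make the topic#subject convention concrete, here is a minimal sketch of building a Source value and reading it back. The helper class, its method names, and the subscription, resource group, and topic values are all hypothetical placeholders. The Split method only illustrates how the string reads and says nothing about how Event Grid parses it internally.

```csharp
using System;

public static class CloudEventSource
{
    // Joins the topic resource path and a subject with '#'.
    public static string Build(string topic, string subject) =>
        $"{topic}#{subject}";

    // Splits a source at the first '#'; the subject is empty when no '#' is present.
    public static (string Topic, string Subject) Split(string source)
    {
        var index = source.IndexOf('#');
        return index < 0
            ? (source, string.Empty)
            : (source.Substring(0, index), source.Substring(index + 1));
    }

    public static void Main()
    {
        // Placeholder values, not a real subscription or topic.
        const string topic =
            "/subscriptions/0000/resourceGroups/demo-rg/providers/Microsoft.EventGrid/topics/demo-topic";

        var source = Build(topic, "bands/spinal-tap");
        var (parsedTopic, subject) = Split(source);

        Console.WriteLine(subject);               // bands/spinal-tap
        Console.WriteLine(parsedTopic == topic);  // True
    }
}
```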

That about wraps it up for sending an event. Let’s start putting together an event subscription and handler.

Consume events

I chose to use an Azure Function for a consumer in this example to showcase the first-class support on Azure. However, keep in mind that Event Grid publishers and handlers are actually platform-agnostic. The source and handlers for these events can reside almost anywhere. I also wanted a little more hands-on experience with v2 Functions. 🙂

The complete code for an Azure Function with an HTTP trigger is as follows:

using System.IO;
using System.Linq;
using CloudEventSample.Models; // CloudEvent<T> and Band, as defined earlier
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

public static class NewBand
{
    [FunctionName("newband")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequest req,
        TraceWriter log)
    {
        log.Info("newband function triggered");
        var requestBody = new StreamReader(req.Body).ReadToEnd();

        // Check the header for the event type.
        if (!req.Headers.TryGetValue("Aeg-Event-Type", out var headerValues))
            return new BadRequestObjectResult("Not a valid request");

        var eventTypeHeaderValue = headerValues.FirstOrDefault();
        if (eventTypeHeaderValue == "SubscriptionValidation")
        {
            // Subscription Validation event
            // Echo back the validation code
            // to confirm the subscription.
            var events = JsonConvert.DeserializeObject<EventGridEvent[]>(requestBody);
            dynamic data = events[0].Data;
            var validationCode = data["validationCode"];
            return new JsonResult(new
            {
                validationResponse = validationCode
            });
        }
        else if (eventTypeHeaderValue == "Notification")
        {
            // Notification event
            // Deserialize the cloud event
            // and access the event data.
            log.Info(requestBody);
            var newBandEvent = JsonConvert.DeserializeObject<CloudEvent<Band>>(requestBody);
            log.Info($"Welcome, {newBandEvent.Data.Name}!");
            return new OkObjectResult("");
        }

        return new BadRequestObjectResult("Not a valid request");
    }
}

The code is almost identical to handling a regular Event Grid event, except in the notification branch, where the request body is deserialized into the CloudEvent<Band> model we defined earlier.
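
To illustrate that deserialization step in isolation, the sketch below parses a hand-written payload shaped like the publisher's output. The payload is not captured from Event Grid, the IDs and resource names are placeholders, and the delivered body may contain additional or different fields. The models are trimmed copies of the ones defined earlier, repeated only so the sketch compiles on its own, and NotificationSample is a hypothetical name. It assumes the Newtonsoft.Json package is referenced.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Trimmed copies of the earlier models; unmapped JSON fields are ignored by default.
public class CloudEvent<T> where T : class
{
    [JsonProperty("eventID")] public string EventId { get; set; }
    [JsonProperty("eventType")] public string EventType { get; set; }
    [JsonProperty("source")] public string Source { get; set; }
    [JsonProperty("data")] public T Data { get; set; }
}

public class Band
{
    public string Name { get; set; }
    public List<Album> Albums { get; set; }
}

public class Album
{
    public string AlbumName { get; set; }
    public int YearReleased { get; set; }
}

public static class NotificationSample
{
    // Hand-written sample body; every value here is a placeholder.
    public const string Body = @"{
      ""eventID"": ""11111111-2222-3333-4444-555555555555"",
      ""cloudEventsVersion"": ""0.1"",
      ""eventType"": ""newBand"",
      ""eventTypeVersion"": ""1.0"",
      ""source"": ""/subscriptions/0000/resourceGroups/demo-rg/providers/Microsoft.EventGrid/topics/demo-topic#subjectband"",
      ""eventTime"": ""2018-05-01T12:30:00Z"",
      ""data"": {
        ""Name"": ""Spinal Tap"",
        ""Albums"": [
          { ""AlbumName"": ""Shark Sandwich"", ""YearReleased"": 1980, ""RecordLabel"": ""Polymer"" }
        ]
      }
    }";

    // Same call the function uses in its notification branch.
    public static CloudEvent<Band> Parse(string json) =>
        JsonConvert.DeserializeObject<CloudEvent<Band>>(json);

    public static void Main()
    {
        var newBandEvent = Parse(Body);
        Console.WriteLine($"Welcome, {newBandEvent.Data.Name}!");     // Welcome, Spinal Tap!
        Console.WriteLine(newBandEvent.Data.Albums[0].YearReleased);  // 1980
    }
}
```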

Subscribe to events

It’s time to wrap this up by creating an event subscription that uses the CloudEvents schema. Assuming the function has been deployed, we can now execute the following from the CLI:

az eventgrid event-subscription create \
   --name <subscription-name> \
   --topic-name <topic-name> \
   -g <resource-group-name> \
   --endpoint <endpoint-url> \
   --event-delivery-schema cloudeventv01schema

Summary

This was a lot of fun to experiment with and we are only just starting our journey with Event Grid and CloudEvents. If you’d like to review the sample code, please see: https://github.com/dbarkol/cloud-events.

