Transactional Outbox Pattern with Azure Functions and Cosmos DB

This post will walk through an implementation of the transactional outbox pattern using Azure Functions and Cosmos DB.

The source code can be found at https://aka.ms/event-driven-architecture.

A little bit about microservices

In many distributed applications, it is typical for each service to maintain its own datastore. This approach is common in microservice architectures and supports scalability, independent deployment, and agility by giving each service some level of autonomy.

Another characteristic that is often seen in this architectural pattern is the use of messaging to enable asynchronous communication between services. Messaging solutions like Azure Service Bus, Kafka, Event Hubs, and ActiveMQ are just a few of the available options that support this type of data exchange.

When these two patterns are embraced and used effectively, many of the promises of microservices can often be achieved. However, very rarely do these types of decisions come without tradeoffs.

Let’s use the popular example of an ordering system to dissect a key challenge: distributed transactions. The diagram below presents a frequently seen implementation in microservices:

In this design, the API, or microservice, represents an order management system. The flow of operations is as follows:

  1. Incoming requests from a client application are made to the API.
  2. The API performs some validation and business logic, then saves the data to its dedicated database.
  3. After the data is persisted, the service then calls a message broker to publish a message or event for other services to consume (sketched in code below).
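
In code, steps 2 and 3 often look something like the sketch below. This is a minimal illustration rather than code from the sample project; Order, OrderCreatedEvent, IOrderRepository, and IMessageBus are hypothetical stand-ins for the service's own types, its database access, and its message broker client.

using System.Threading.Tasks;

// Hypothetical abstractions standing in for the service's database and message broker.
public record Order(string OrderId, string AccountNumber, int Quantity);
public record OrderCreatedEvent(string OrderId);

public interface IOrderRepository
{
    Task SaveAsync(Order order);
}

public interface IMessageBus
{
    Task PublishAsync(object message);
}

public class OrderService
{
    private readonly IOrderRepository _repository;
    private readonly IMessageBus _messageBus;

    public OrderService(IOrderRepository repository, IMessageBus messageBus)
    {
        _repository = repository;
        _messageBus = messageBus;
    }

    public async Task CreateOrderAsync(Order order)
    {
        // Step 2: persist the order to the service's own database.
        await _repository.SaveAsync(order);

        // Step 3: publish an event for other services to consume.
        // If this call fails (broker unavailable, timeout, process crash),
        // the order is saved but the event is never published: the dual-write problem.
        await _messageBus.PublishAsync(new OrderCreatedEvent(order.OrderId));
    }
}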

This seems really simple, so what’s the problem?

For starters, issues can arise here when the message broker is unavailable. And what happens if a runtime error or exception occurs during, or even before, the publishing step? How does that get resolved?

This leaves the ordering service, and quite possibly the data, in an inconsistent and brittle state. Even worse, it now puts the responsibility on the service to somehow rectify the situation. Ideally, this microservice should only be in charge of managing orders and should not be involved in how or if the other services receive those messages.

Distributed transactions in microservices

In monolithic and many older applications, we commonly used transactions that spanned multiple systems. Since everything was considered local and within our control, applying ACID principles was possible.

As we move towards microservices, this becomes increasingly challenging. Approaches such as implementing a multi-phase commit protocol are difficult and often require heavyweight infrastructure to pull off.

Assume you don’t have distributed transactions!

In most cases, the best thing to do is to move away from this approach and embrace patterns that are tailored towards distributed systems and applications. This means we should assume distributed transactions are no longer an option. We avoid two-phase commits, dual writes, and other related patterns that may have worked when the infrastructure of the solution was more centralized.

The transactional outbox pattern

In the order system example, the message broker and database are two different resources. Our goal is to ensure that messages are eventually published to the message broker, and ultimately available to the other services, while still persisting the order to the database.

To make this possible, we save the order details and the outbox message we intend to publish in the same database, within a single transaction.

This sequence diagram shows the first part of the outbox pattern by making use of a transaction to save both the order details and message in an atomic operation:

The second half of this pattern requires another process or worker that will be responsible for retrieving the pending messages from the database. Once successfully retrieved, it can then proceed to publish the corresponding messages and update the state of the entries accordingly:

The outbox processor will perform the following actions (a rough sketch in code follows the list):

  1. Get the outbox entries (the pending messages).
  2. Publish the messages to the message broker.
  3. Update the outbox entries to reflect their published state.
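
Conceptually, the processor is a small loop over the pending entries. The sketch below illustrates the idea only; IOutboxStore and IMessageBus are hypothetical abstractions, and the concrete Azure implementation appears later in this post.

using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstractions over the outbox table and the message broker.
public record OutboxEntry(string Id, string Payload);

public interface IOutboxStore
{
    Task<IReadOnlyList<OutboxEntry>> GetPendingAsync();
    Task MarkAsPublishedAsync(string id);
}

public interface IMessageBus
{
    Task PublishAsync(string payload);
}

public class OutboxProcessor
{
    private readonly IOutboxStore _outbox;
    private readonly IMessageBus _messageBus;

    public OutboxProcessor(IOutboxStore outbox, IMessageBus messageBus)
    {
        _outbox = outbox;
        _messageBus = messageBus;
    }

    public async Task ProcessPendingAsync()
    {
        // 1. Get the outbox entries that have not been published yet
        var pending = await _outbox.GetPendingAsync();

        foreach (var entry in pending)
        {
            // 2. Publish the message to the message broker
            await _messageBus.PublishAsync(entry.Payload);

            // 3. Mark the entry as published so it is not sent again
            await _outbox.MarkAsPublishedAsync(entry.Id);
        }
    }
}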

The complete sequence diagram for the transactional outbox pattern now looks like this:

The outbox pattern allows us to avoid the distributed transaction but still atomically save to the database and publish a message.

Implementing the outbox pattern with Azure Functions and Cosmos DB

The remainder of this post will focus on an implementation of the transactional outbox pattern in Azure. There are many ways to implement this pattern, and we’ll use Functions and Cosmos DB to demonstrate an intriguing, event-driven option.

The architecture for the solution is represented in the following diagram:

The overall flow can be broken down into these steps:

  1. Incoming orders are sent from a client application to an HTTP-triggered Azure Function.
  2. The Azure Function will save both the order details as well as any events that should be published to Cosmos DB using a single transaction.
  3. Another Azure Function, the outbox processor, is invoked from the Cosmos DB change feed when the transaction completes.
  4. Events waiting to be published are retrieved from Cosmos DB.
  5. If there are any pending events, the function will attempt to publish them to a message broker. Azure Service Bus is used in this example to showcase an important feature called duplicate detection; more on that later.
  6. Finally, once published, the function will update the corresponding entries in Cosmos to a “Processed” state.

Cosmos DB and choosing a partition key

Before diving into the code, we have to set up a bit of infrastructure. Cosmos DB's API for NoSQL will be used here to store the order details and messages as documents. After the database is created in Cosmos DB, we will need to create a container to store the data. Choosing the partition key for this container is critical.

Transactional batches in Cosmos DB are scoped to a single container and a single logical partition key.

For each incoming order request, we will insert two documents: one for the order details and another for the message that will be published. Each document will have a unique ID, but they will share a common property for the order ID. This property (orderId) is the natural choice for the partition key when creating the container.
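
For reference, a container partitioned on /orderId can also be created programmatically with the Cosmos DB .NET SDK. This is a sketch; the database name, container name, and throughput below are illustrative values rather than settings from the sample project.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class CosmosSetup
{
    public static async Task CreateOrdersContainerAsync()
    {
        // Sketch: names and throughput are illustrative, not taken from the sample.
        var client = new CosmosClient(Environment.GetEnvironmentVariable("CosmosDBConnectionString"));

        Database database = await client.CreateDatabaseIfNotExistsAsync("Orders");

        // The partition key path matches the shared orderId property on both documents.
        Container container = await database.CreateContainerIfNotExistsAsync(
            id: "OrderItems",
            partitionKeyPath: "/orderId",
            throughput: 400);
    }
}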


Azure Functions – Receiving orders and using transactions

The first Azure Function is responsible for accepting incoming orders and inserting two documents into a Cosmos DB container within a single transaction.

To support transactions, we will have to use the Cosmos DB SDK instead of the output binding. It all starts with dependency injection in the Startup.cs file to pass in an instance of the Cosmos client:

using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

[assembly: FunctionsStartup(typeof(OrderMaker.Startup))]

namespace OrderMaker
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            // Inject the CosmosDB client as a singleton
            var connectionString = Environment.GetEnvironmentVariable("CosmosDBConnectionString");

            var options = new CosmosClientOptions()
            {
                SerializerOptions = new CosmosSerializationOptions()
                {
                    PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
                }
            };

            builder.Services.AddSingleton<CosmosClient>(s => new CosmosClient(connectionString, options));
        }
    }
}

The HTTP-triggered function takes the incoming request and prepares two documents for insertion into the container. A transactional batch is used to group both insert operations, which means that all of the operations within the batch either succeed or fail together. The complete function code is shown here:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using OrderMaker.Models;
using Microsoft.Azure.Cosmos;

namespace OrderMaker
{
    public class CreateOrder
    {
        private readonly CosmosClient _client;
        private readonly Container _container;

        public CreateOrder(CosmosClient cosmosClient)
        {
            _client = cosmosClient;
            var databaseId = Environment.GetEnvironmentVariable("CosmosDBDatabaseId");
            var containerId = Environment.GetEnvironmentVariable("CosmosDBContainerId");
            _container = _client.GetContainer(databaseId, containerId);
        }

        [FunctionName("CreateOrder")]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // The function will insert two items into a CosmosDB container to support the
            // outbox pattern. A transactional batch is used to ensure that both items are
            // written successfully or not at all.
            log.LogInformation("Incoming order request invoked");

            // Read the request body and deserialize it into an
            // order object so that it can be saved to CosmosDB.
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var incomingOrder = JsonConvert.DeserializeObject<Order>(requestBody);

            // Set the ID of the document to the same value as the Order ID
            incomingOrder.Id = incomingOrder.OrderId;

            // Create an order created (outbox) object for the outbox pattern.
            // The ID of this document must be different from the one for
            // the order object. The OrderProcessed property is used to identify
            // orders that have not been published to the message bus.
            var orderCreated = new OrderOutbox
            {
                AccountNumber = incomingOrder.AccountNumber,
                OrderId = incomingOrder.OrderId,
                Quantity = incomingOrder.Quantity,
                Id = Guid.NewGuid().ToString(),
                OrderProcessed = false
            };

            // Transactions must share the same partition key and container in Cosmos.
            // Order ID ('/orderId') is used as the partition key.
            PartitionKey partitionKey = new PartitionKey(incomingOrder.OrderId);

            // Create and execute the batch with the two items
            TransactionalBatch batch = _container.CreateTransactionalBatch(partitionKey)
                .CreateItem<Order>(incomingOrder)
                .CreateItem<OrderOutbox>(orderCreated);
            using TransactionalBatchResponse batchResponse = await batch.ExecuteAsync();

            if (batchResponse.IsSuccessStatusCode)
            {
                Console.WriteLine("Transactional batch succeeded");
                for (var i = 0; i < batchResponse.Count; i++)
                {
                    var result = batchResponse.GetOperationResultAtIndex<dynamic>(i);
                    Console.WriteLine($"Document {i + 1}:");
                    Console.WriteLine(result.Resource);
                }

                return new OkResult();
            }

            Console.WriteLine("Transactional batch failed");
            for (var i = 0; i < batchResponse.Count; i++)
            {
                var result = batchResponse.GetOperationResultAtIndex<dynamic>(i);
                Console.WriteLine($"Document {i + 1}: {result.StatusCode}");
            }

            // Surface the failure to the caller instead of returning a 200
            return new StatusCodeResult(StatusCodes.Status500InternalServerError);
        }
    }
}
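
The Order and OrderOutbox model classes referenced above are not shown in this post. Based on how they are used in the function, they might look roughly like the sketch below; the property types are assumptions.

namespace OrderMaker.Models
{
    // Sketch of the models inferred from the function code above; types are assumptions.
    public class Order
    {
        public string Id { get; set; }            // document id, set equal to OrderId
        public string OrderId { get; set; }       // partition key value (/orderId)
        public string AccountNumber { get; set; }
        public int Quantity { get; set; }
    }

    public class OrderOutbox
    {
        public string Id { get; set; }            // unique id, different from the order document id
        public string OrderId { get; set; }       // same partition key value as the order
        public string AccountNumber { get; set; }
        public int Quantity { get; set; }
        public bool OrderProcessed { get; set; }  // false until the message has been published
    }
}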

Why not use the output binding for Cosmos in this function?

The output binding for Cosmos is great but it doesn’t satisfy our requirement to create a transaction for both of the insert operations.
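
For contrast, a version built on the Cosmos DB output binding might look roughly like the sketch below; the function name and binding settings are illustrative, and the method is shown as if it sat alongside the code above, reusing its usings and models. The two AddAsync calls are independent writes: if the second one fails, the first has already been persisted and there is no transaction to roll back.

// Sketch only: the binding-based approach this post decides against.
[FunctionName("CreateOrderWithBinding")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,
    [CosmosDB(
        databaseName: "%CosmosDBDatabaseId%",
        collectionName: "%CosmosDBContainerId%",
        ConnectionStringSetting = "CosmosDBConnectionString")] IAsyncCollector<object> documents,
    ILogger log)
{
    var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
    var incomingOrder = JsonConvert.DeserializeObject<Order>(requestBody);
    incomingOrder.Id = incomingOrder.OrderId;

    // Write #1: the order document
    await documents.AddAsync(incomingOrder);

    // Write #2: the outbox document (not atomic with write #1)
    await documents.AddAsync(new OrderOutbox
    {
        Id = Guid.NewGuid().ToString(),
        OrderId = incomingOrder.OrderId,
        AccountNumber = incomingOrder.AccountNumber,
        Quantity = incomingOrder.Quantity,
        OrderProcessed = false
    });

    return new OkResult();
}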


Azure Functions – Outbox processor

In the outbox pattern, there needs to be a mechanism that initiates the retrieval of messages waiting to be published. That mechanism can be a scheduled job or, even better, something event-based. The change feed in Cosmos DB outputs changes from a container in the order that they occur, and Azure Functions provides a trigger for the container's change feed.

The function for the outbox processor will use the Cosmos DB change feed trigger, along with a few other helpful bindings:

[FunctionName("OrderOutboxWorker")]
public static async Task Run(
    [CosmosDBTrigger(
        databaseName: "%CosmosDBDatabaseName%",
        collectionName: "%CosmosDBCollectionName%",
        ConnectionStringSetting = "CosmosDBConnectionString",
        CreateLeaseCollectionIfNotExists = true,
        LeaseCollectionName = "%CosmosDBLeaseCollectionName%")] IReadOnlyList<Document> input,
    [CosmosDB(
        databaseName: "%CosmosDBDatabaseName%",
        collectionName: "%CosmosDBCollectionName%",
        ConnectionStringSetting = "CosmosDBConnectionString",
        SqlQuery = "select * from Orders r where r.orderProcessed = false")] IEnumerable<Document> ordersCreated,
    [CosmosDB(
        databaseName: "%CosmosDBDatabaseName%",
        collectionName: "%CosmosDBCollectionName%",
        ConnectionStringSetting = "CosmosDBConnectionString")] DocumentClient client,
    [ServiceBus(
        "%TopicName%",
        Connection = "ServiceBusConnectionString",
        EntityType = ServiceBusEntityType.Topic)] IAsyncCollector<ServiceBusMessage> ordersToProcess,
    ILogger log)
{
}

Let’s talk briefly about the trigger and bindings used in the function:

  • CosmosDBTrigger – This trigger will be used to invoke the function. Rather than polling or scheduling a job to query the database for updates, the change feed from Cosmos DB will be the source of invocation.
  • CosmosDB input binding with SQL query – The SqlQuery property retrieves multiple documents that satisfy the query; in this case, the outbox messages that have not yet been published.
  • CosmosDB document client – This input binding provides a DocumentClient instance that will be used to update documents. After the messages are published, the document client updates the items to reflect their processed state.
  • ServiceBus output binding – Publishes multiple messages to a Service Bus topic with the IAsyncCollector.

The code for the outbox processor function, in its entirety:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using OutboxProcessor.Models;

namespace OutboxProcessor
{
    public static class OrderOutboxWorker
    {
        [FunctionName("OrderOutboxWorker")]
        public static async Task Run(
            [CosmosDBTrigger(
                databaseName: "%CosmosDBDatabaseName%",
                collectionName: "%CosmosDBCollectionName%",
                ConnectionStringSetting = "CosmosDBConnectionString",
                CreateLeaseCollectionIfNotExists = true,
                LeaseCollectionName = "%CosmosDBLeaseCollectionName%")] IReadOnlyList<Document> input,
            [CosmosDB(
                databaseName: "%CosmosDBDatabaseName%",
                collectionName: "%CosmosDBCollectionName%",
                ConnectionStringSetting = "CosmosDBConnectionString",
                SqlQuery = "select * from Orders r where r.orderProcessed = false")] IEnumerable<Document> ordersCreated,
            [CosmosDB(
                databaseName: "%CosmosDBDatabaseName%",
                collectionName: "%CosmosDBCollectionName%",
                ConnectionStringSetting = "CosmosDBConnectionString")] DocumentClient client,
            [ServiceBus(
                "%TopicName%",
                Connection = "ServiceBusConnectionString",
                EntityType = ServiceBusEntityType.Topic)] IAsyncCollector<ServiceBusMessage> ordersToProcess,
            ILogger log)
        {
            // This function is triggered off the change feed in CosmosDB. When
            // new items are added to the orders container, it will be invoked
            // so that the outbox transaction can be completed.
            if (input != null && input.Count > 0)
            {
                log.LogInformation("Documents modified " + input.Count);
                log.LogInformation("First document Id " + input[0].Id);

                // Iterate through the collection of orders that are ready to be processed
                foreach (var o in ordersCreated)
                {
                    // Deserialize the document into an order object so that the
                    // Order ID can be referenced when setting message properties
                    var order = JsonConvert.DeserializeObject<Order>(o.ToString());

                    // Create a service bus message with the order object
                    var jsonBody = JsonConvert.SerializeObject(order);
                    var byteArray = Encoding.UTF8.GetBytes(jsonBody);
                    var msg = new ServiceBusMessage(byteArray);

                    // Set the message ID to the order ID
                    msg.MessageId = order.OrderId.ToString();
                    msg.ContentType = "application/json";
                    msg.ApplicationProperties.Add("MessageType", "OrderCreatedEvent");

                    // Publish the message
                    await ordersToProcess.AddAsync(msg);

                    // Update the order processed flag in the container to complete
                    // the outbox transaction
                    o.SetPropertyValue("orderProcessed", true);
                    await client.ReplaceDocumentAsync(o.SelfLink, o);
                }
            }
        }
    }
}

With the outbox pattern implementation complete, it’s time to talk about one last piece: duplicate messages.

Duplicate detection with Azure Service Bus

It’s possible for the same message to be published more than once. We all know that consumers should be idempotent, so that processing a message multiple times will not change the state of the underlying system. In theory this sounds trivial, but in practice it rarely is when multiple dependencies and systems are involved.

Azure Service Bus has a great feature called duplicate detection that can help shoulder some of this responsibility. When the feature is enabled for a queue or topic, Service Bus uses the MessageId property to identify duplicate messages as they are published. If a duplicate is found within the configured time window, it is ignored and dropped, saving the downstream consumers some extra work.
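
Duplicate detection is configured on the queue or topic itself, typically when the entity is created. Below is a sketch of doing this with the ServiceBusAdministrationClient from the Azure.Messaging.ServiceBus package; the topic name and detection window are illustrative values, not settings from the sample project.

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class TopicSetup
{
    public static async Task CreateTopicWithDuplicateDetectionAsync(string connectionString)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        var options = new CreateTopicOptions("orders")
        {
            // Messages with the same MessageId published within this window
            // will be detected and dropped by Service Bus.
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
        };

        await adminClient.CreateTopicAsync(options);
    }
}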

In the outbox processor function, the MessageId property is set to the OrderId to help reduce the possibility of duplicate messages reaching the consumers.

Disclaimer: This doesn’t mean that the consumers are no longer responsible, or should not strive for idempotency. It is just another feature that can be used to help with the challenge of duplicate messages in an event-driven architecture.
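
To illustrate what striving for idempotency can look like on the consumer side, here is a rough sketch: before applying a message, the handler checks whether its MessageId has already been handled. IProcessedMessageStore is a hypothetical abstraction; any durable store the consumer controls could play this role.

using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical abstraction over wherever the consumer records handled message IDs.
public interface IProcessedMessageStore
{
    Task<bool> HasProcessedAsync(string messageId);
    Task MarkProcessedAsync(string messageId);
}

public class OrderCreatedHandler
{
    private readonly IProcessedMessageStore _store;

    public OrderCreatedHandler(IProcessedMessageStore store) => _store = store;

    public async Task HandleAsync(ProcessMessageEventArgs args)
    {
        // Skip messages that have already been applied, so a redelivered or
        // duplicated message does not change the system state a second time.
        if (await _store.HasProcessedAsync(args.Message.MessageId))
        {
            await args.CompleteMessageAsync(args.Message);
            return;
        }

        // ... apply the business logic for the order here ...

        await _store.MarkProcessedAsync(args.Message.MessageId);
        await args.CompleteMessageAsync(args.Message);
    }
}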

Summary

It’s always fun taking an old pattern and implementing it in a modern, event-driven way that takes advantage of services that integrate well together. I hope this implementation was useful and interesting. Please feel free to provide feedback if you see anything missing or incorrect; there is always room for improvement. 🙂

Comments

  1. Thanks for sharing, David. I have the following comments:
    1. I think the query “select * from Orders r where r.orderProcessed = false” is a cross-partition query that will consume more RUs, negatively impacting the order creation process.
    2. The consumers might be interested in knowing about order updates as well, in which case the processor should subscribe to all the changes and not just ‘Orders created’.
    3. Customer ID is another candidate to consider for the partition key; it also solves the problem of ordering for orders coming from the same customer, which cannot be done with Order ID as the partition key.

  2. Distributed Systems (October 31, 2022):

    What happens when the outbox processor fails to publish? Since the change feed is fetched on a change trigger, this will leave some unpublished outbox entries in Cosmos DB, potentially until the next change happens, which also depends on how the change fetch is designed. Is there some retry mechanism on failure to publish? How is the pathological failure (repeated failure to publish) case handled?

    1. You bring up a really interesting scenario. There is a bit of resiliency in the SDK for retries here. At the same time, the abstraction might be a bit much with the output binding since there aren’t any configuration details that we can adjust (retry count, backoff, etc.). As much as I like the output bindings and the convenience they offer, perhaps this might be a better case for using the SDK directly again, with a try/catch block and some logging.
