This is the start of a multi-part blog post in which I go over an example event-driven microservice project, using a fictional payment processor named Regis Pay.
In this blog post I'll go over the transactional outbox pattern with Azure Cosmos DB. If you've done a bit of research on this topic before, you've probably seen the Microsoft documentation with the same title, which I'll link here: Transactional Outbox pattern with Azure Cosmos DB. It's worth reading, as it gives a good explanation of the pattern and why you would want to use it.
I'll try not to repeat the contents of the Microsoft documentation, which is written specifically with Azure services in mind. Here is a short explanation of the pattern without being technology specific:
Here's a very crude diagram I made up to illustrate the above points and how they fit together.
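To make the pattern a bit more concrete without tying it to a technology, here is a minimal in-memory sketch in C#. Every name in it (InMemoryStore, OutboxMessage, OutboxRelay) is made up for illustration and is not part of Regis Pay, and a simple lock stands in for a real database transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; not taken from Regis Pay.
public record OutboxMessage(Guid Id, string Type, string Payload)
{
    // Set to true once the relay has handed the message to the broker.
    public bool Published { get; set; }
}

public class InMemoryStore
{
    private readonly object _lock = new();

    public Dictionary<string, string> Payments { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // The state change and the outbox message are written together.
    // The lock is a stand-in for a single database transaction.
    public void SavePaymentWithEvent(string paymentId, string status, OutboxMessage message)
    {
        lock (_lock)
        {
            Payments[paymentId] = status;
            Outbox.Add(message);
        }
    }
}

public class OutboxRelay
{
    private readonly InMemoryStore _store;
    private readonly Action<OutboxMessage> _publish;

    public OutboxRelay(InMemoryStore store, Action<OutboxMessage> publish)
    {
        _store = store;
        _publish = publish;
    }

    // Publishes every message not yet marked as published and returns how many were sent.
    // A message is only marked after the publish call returns, so a crash in between
    // means it is published again on the next run (at-least-once delivery).
    public int PublishPending()
    {
        var pending = _store.Outbox.Where(m => !m.Published).ToList();
        foreach (var message in pending)
        {
            _publish(message);
            message.Published = true;
        }
        return pending.Count;
    }
}
```

The point of the sketch is that the service never talks to the message broker while saving its state. It only writes to its own store, and a separate relay is responsible for getting the message out.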
So bringing it back specifically to Azure Cosmos DB, why use this as the database? Here I'll go over the what and why.
Before I delve into the why, I'll go over the what. Here's a quote taken from this wiki page.
Azure Cosmos DB is a globally distributed, multi-model database service offered by Microsoft. It is designed to provide high availability, scalability, and low-latency access to data for modern applications.
There are many advantages to using this database, which you can explore on the Azure Cosmos DB product page, and you can find out more about the technical side in the Azure Cosmos DB documentation. Alternatives are available; Amazon DynamoDB comes to mind.
The TL;DR answer: the Change Feed feature, which you can read more about in Change feed in Azure Cosmos DB.
This makes it a perfect fit for the transactional outbox pattern. The change feed takes the place of the process that reads new entries from the outbox table, and the change feed processor records how far it has read in a separate container, which in Cosmos DB terminology is referred to as leases. Cosmos DB has built the syncing of change events for you, so you don't have to.
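To illustrate the idea of reading changes and tracking progress in a lease, here is a rough in-memory model in C#. It is not the Cosmos DB SDK: InMemoryChangeFeed, Lease and SimpleChangeFeedProcessor are hypothetical names, and the real service also uses the lease container to coordinate multiple worker instances, which this sketch ignores.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model for illustration; not the Cosmos DB SDK.
public class InMemoryChangeFeed
{
    // Ordered log of written documents.
    private readonly List<string> _changes = new();

    public void Append(string document) => _changes.Add(document);

    // Returns up to maxItems changes starting at the given position.
    public IReadOnlyList<string> ReadFrom(int position, int maxItems) =>
        _changes.Skip(position).Take(maxItems).ToList();
}

public class Lease
{
    // Position of the next change to read. In this sketch it lives in memory;
    // the real processor persists its progress in the leases container.
    public int Checkpoint { get; set; }
}

public class SimpleChangeFeedProcessor
{
    private readonly InMemoryChangeFeed _feed;
    private readonly Lease _lease;
    private readonly Action<IReadOnlyList<string>> _onChanges;

    public SimpleChangeFeedProcessor(
        InMemoryChangeFeed feed,
        Lease lease,
        Action<IReadOnlyList<string>> onChanges)
    {
        _feed = feed;
        _lease = lease;
        _onChanges = onChanges;
    }

    // Reads one batch from the checkpoint, hands it to the delegate and
    // only then advances the checkpoint. Returns the number of changes handled.
    public int Poll(int maxItems = 100)
    {
        var batch = _feed.ReadFrom(_lease.Checkpoint, maxItems);
        if (batch.Count == 0)
        {
            return 0;
        }

        _onChanges(batch); // if this throws, the checkpoint stays where it was
        _lease.Checkpoint += batch.Count;
        return batch.Count;
    }
}
```

Because the checkpoint only moves after the delegate has finished, a failed batch is delivered again on the next poll. That gives at-least-once delivery, so handlers further down the line should tolerate seeing the same event more than once.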
So what does it look like? In Regis Pay, here is the Worker.cs code for setting up the change feed processor:
using Microsoft.Azure.Cosmos;
using Regis.Pay.Common.Configuration;
using Regis.Pay.Common.EventStore;

namespace Regis.Pay.ChangeFeed
{
    public class Worker(
        ILogger<Worker> logger,
        CosmosClient cosmosClient,
        CosmosConfigOptions cosmosConfigOptions,
        IChangeEventHandler changeEventHandler) : BackgroundService
    {
        private readonly ILogger<Worker> _logger = logger;
        private readonly CosmosClient _cosmosClient = cosmosClient;
        private readonly CosmosConfigOptions _cosmosConfigOptions = cosmosConfigOptions;
        private readonly IChangeEventHandler _changeEventHandler = changeEventHandler;

        // Stays null until the processor has been started.
        private ChangeFeedProcessor? _changeFeedProcessor;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _changeFeedProcessor = await StartChangeFeedProcessorAsync();
        }

        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            // Guard against shutdown happening before the processor was started.
            if (_changeFeedProcessor is not null)
            {
                await _changeFeedProcessor.StopAsync();
            }

            await base.StopAsync(cancellationToken);
        }

        private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
        {
            // Container where the processor stores its progress.
            var leaseContainer = _cosmosClient.GetContainer(_cosmosConfigOptions.DatabaseName, _cosmosConfigOptions.LeasesContainerName);

            // Container whose changes are being monitored.
            ChangeFeedProcessor changeFeedProcessor = _cosmosClient.GetContainer(_cosmosConfigOptions.DatabaseName, _cosmosConfigOptions.ContainerName)
                .GetChangeFeedProcessorBuilder<EventWrapper>(processorName: "eventsChangeFeed", onChangesDelegate: HandleChangesAsync)
                .WithInstanceName("Regis.Pay.ChangeFeed")
                .WithLeaseContainer(leaseContainer)
                .Build();

            _logger.LogInformation("Starting Change Feed Processor...");
            await changeFeedProcessor.StartAsync();
            _logger.LogInformation("Change Feed Processor started.");

            return changeFeedProcessor;
        }

        // Called by the processor with each batch of changed documents.
        async Task HandleChangesAsync(
            ChangeFeedProcessorContext context,
            IReadOnlyCollection<EventWrapper> events,
            CancellationToken cancellationToken)
        {
            await _changeEventHandler.HandleAsync(events, cancellationToken);
        }
    }
}
The important part is in the StartChangeFeedProcessorAsync() method, specifically this code block:
var leaseContainer = _cosmosClient.GetContainer(_cosmosConfigOptions.DatabaseName, _cosmosConfigOptions.LeasesContainerName);

ChangeFeedProcessor changeFeedProcessor = _cosmosClient.GetContainer(_cosmosConfigOptions.DatabaseName, _cosmosConfigOptions.ContainerName)
    .GetChangeFeedProcessorBuilder<EventWrapper>(processorName: "eventsChangeFeed", onChangesDelegate: HandleChangesAsync)
    .WithInstanceName("Regis.Pay.ChangeFeed")
    .WithLeaseContainer(leaseContainer)
    .Build();
Breaking it down:
ChangeFeedProcessor: Built by getting the container whose change events you want to listen to and calling the GetChangeFeedProcessorBuilder<T> method.
onChangesDelegate: HandleChangesAsync: The method that processes the change events. It forwards them on to IChangeEventHandler, which I'll go over below.
The ChangeEventHandler.cs file contains the logic for handling the changes.
using MassTransit;
using Regis.Pay.Common.EventStore;
using Regis.Pay.Domain;

namespace Regis.Pay.ChangeFeed
{
    public interface IChangeEventHandler
    {
        Task HandleAsync(IReadOnlyCollection<EventWrapper> events, CancellationToken cancellationToken);
    }

    public class ChangeEventHandler(
        IBus bus,
        ILogger<ChangeEventHandler> logger) : IChangeEventHandler
    {
        private readonly IBus _bus = bus;
        private readonly ILogger<ChangeEventHandler> _logger = logger;

        public async Task HandleAsync(IReadOnlyCollection<EventWrapper> events, CancellationToken cancellationToken)
        {
            foreach (var @event in events)
            {
                _logger.LogInformation("Detected change feed {event} for {eventId}", @event.EventType, @event.Id);

                var integrationEvent = IntegrationEventResolver.Resolve(@event);

                if (integrationEvent is null)
                {
                    _logger.LogInformation("No integration event found for event with {eventId}", @event.Id);

                    // Skip this event but keep processing the rest of the batch.
                    // A break here would silently drop every later event in the batch.
                    continue;
                }

                await _bus.Publish(integrationEvent, cancellationToken);
            }

            _logger.LogInformation("Finished handling changes.");
        }
    }
}
Breaking it down:
The IChangeEventHandler interface defines a method for handling change feed events.
IntegrationEventResolver is a separate utility class for resolving integration events based on EventWrapper.
The IBus dependency represents the MassTransit bus for event publishing, which is currently set up to use RabbitMQ.
The change event handler will handle multiple changes and publish multiple events. Take one domain event as an example, say the PaymentInitiated.cs domain event. This would result in a PaymentInitiated.cs integration event being published.
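The IntegrationEventResolver itself is not shown in this post, so here is a minimal sketch of how such a resolver might map a stored event to an integration event. The types below are simplified stand-ins that I've assumed for illustration (the real EventWrapper and event classes in Regis Pay will have a different shape), and the lookup-table approach is one possible design rather than the actual implementation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Simplified, hypothetical stand-ins for the real Regis Pay types.
public record EventWrapper(string Id, string EventType, Guid AggregateId);

public interface IIntegrationEvent
{
    Guid AggregateId { get; }
}

public record PaymentInitiated(Guid AggregateId) : IIntegrationEvent;

public static class IntegrationEventResolver
{
    // Maps the stored event type name to a factory for the matching integration event.
    private static readonly Dictionary<string, Func<EventWrapper, IIntegrationEvent>> Factories = new()
    {
        ["PaymentInitiated"] = e => new PaymentInitiated(e.AggregateId),
    };

    // Returns null when the event type has no integration event,
    // which the caller can use to skip publishing.
    public static IIntegrationEvent? Resolve(EventWrapper @event) =>
        Factories.TryGetValue(@event.EventType, out var factory) ? factory(@event) : null;
}
```

Returning null for unknown event types keeps the handler simple: domain events that should stay internal to the service just never get an entry in the table.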
This should then be consumed by the PaymentInitiatedConsumer.cs, which contains the logic for handling/consuming the event.
using MassTransit;
using Regis.Pay.Domain;
using Regis.Pay.Domain.IntegrationEvents;

namespace Regis.Pay.EventConsumer.Consumers
{
    public class PaymentInitiatedConsumer : IConsumer<PaymentInitiated>
    {
        private readonly IPaymentRepository _paymentRepository;
        private readonly ILogger<PaymentInitiatedConsumer> _logger;

        public PaymentInitiatedConsumer(
            IPaymentRepository paymentRepository,
            ILogger<PaymentInitiatedConsumer> logger)
        {
            _paymentRepository = paymentRepository;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<PaymentInitiated> context)
        {
            _logger.LogInformation("Consuming {event} for paymentId: {paymentId}", nameof(PaymentInitiated), context.Message.AggregateId);

            var payment = await _paymentRepository.LoadAsync(context.Message.AggregateId);

            await Task.Delay(300); // Do some process here on payment initiated. eg. save to SQL database or third party system.

            payment.Created();

            await _paymentRepository.SaveAsync(payment);
        }
    }
}
In this example:
Task.Delay(300) represents some kind of process.
You can then repeat this process of change feed to consumer until you have the desired state.
There are a couple of things I wanted to mention with regard to the current setup:
The transactional outbox pattern is an important pattern to know in the microservice world. It's one I see heavily used by companies with a lot of microservices, and for good reason: it's a battle-tested approach that works well, keeping all the “-ilities” in mind.