This is part of a multi part blog post where I go over an example event-driven microservice project, using a fictional payment processor named Regis Pay as an example. To see the previous blog post click this link on the transactional outbox pattern.
This blog post will be exploring event sourcing.
Event sourcing is a software design pattern that involves modeling the state of an application as a sequence of events. Instead of storing the current state of an entity, you store a log of events that describe actions that have occurred. These events are immutable and represent changes in the system.
Here's how it typically works:
Event sourcing offers several benefits, including:
However, event sourcing also introduces complexity, especially around managing event consistency, versioning, and handling distributed systems. It's not a one-size-fits-all solution, but it can be powerful in the right context, such as domains with complex business logic or regulatory requirements.
Before diving straight into the code example it's worth mentioning a technique that is useful when exploring the usage of event sourcing and that is Event Storming. This technique can particularly useful when it comes to defining the domain events that you store against your domain aggregate.
Event Storming is a collaborative modeling technique used primarily in Domain-Driven Design (DDD) to explore complex business domains and design software systems. It's a workshop format where stakeholders from various backgrounds, including domain experts, developers, and business analysts, come together to visualize and understand the domain's behavior through events.
Here's how Event Storming typically works:
Event Storming encourages a collaborative, visual approach to domain exploration, fostering shared understanding among stakeholders and helping teams uncover complex domain logic and requirements. It's particularly useful in the early stages of a project when teams are trying to grasp the intricacies of a new domain or when redesigning existing systems.
The code in this section mainly focuses on persisting the data to the event store, with the implementation aware of the technology used, in this case Cosmos DB.
IEventStore.cs - This interface defines the contract for an event store, which includes methods for loading event streams and appending events to a stream.
namespace Regis.Pay.Common.EventStore
{
public interface IEventStore
{
Task<EventStream> LoadStreamAsync(string streamId);
Task<bool> AppendToStreamAsync(
string streamId,
int expectedVersion,
IEnumerable<IDomainEvent> events);
}
}
CosmosEventStore.cs - This class CosmosEventStore
implements the IEventStore
interface. It provides functionality to load event streams and append events to a Cosmos DB container.
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using Regis.Pay.Common.Configuration;
namespace Regis.Pay.Common.EventStore
{
public class CosmosEventStore : IEventStore
{
private readonly IEventTypeResolver _eventTypeResolver;
private readonly Container _container;
private readonly CosmosConfigOptions _cosmosConfigOptions;
public CosmosEventStore(
IEventTypeResolver eventTypeResolver,
Container container,
CosmosConfigOptions cosmosConfigOptions)
{
_eventTypeResolver = eventTypeResolver;
_container = container;
_cosmosConfigOptions = cosmosConfigOptions;
}
public async Task<EventStream> LoadStreamAsync(string streamId)
{
var sqlQueryText = "SELECT * FROM events e"
+ " WHERE e.stream.id = @streamId"
+ " ORDER BY e.stream.version";
QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText)
.WithParameter("@streamId", streamId);
int version = 0;
var events = new List<IDomainEvent>();
FeedIterator<EventWrapper> feedIterator = _container.GetItemQueryIterator<EventWrapper>(queryDefinition);
while (feedIterator.HasMoreResults)
{
FeedResponse<EventWrapper> response = await feedIterator.ReadNextAsync();
foreach (var eventWrapper in response)
{
version = eventWrapper.StreamInfo.Version;
events.Add(eventWrapper.GetEvent(_eventTypeResolver));
}
}
return new EventStream(streamId, version, events);
}
public async Task<bool> AppendToStreamAsync(string streamId, int expectedVersion, IEnumerable<IDomainEvent> events)
{
var partitionKey = new PartitionKey(streamId);
dynamic[] parameters =
[
streamId,
expectedVersion,
SerializeEvents(streamId, expectedVersion, events)
];
return await _container.Scripts.ExecuteStoredProcedureAsync<bool>("spAppendToStream", partitionKey, parameters);
}
private static string SerializeEvents(string streamId, int expectedVersion, IEnumerable<IDomainEvent> events)
{
var items = events.Select(e => new EventWrapper
{
Id = $"{streamId}:{++expectedVersion}",
StreamInfo = new StreamInfo
{
Id = streamId,
Version = expectedVersion
},
EventType = e.GetType().Name,
EventData = JObject.FromObject(e)
});
return JsonConvert.SerializeObject(items);
}
}
}
This section focuses on code related to the domain as the title implies. Pay attention (no pun intended) to how the payment is loaded. In the previous section events are retrieved from the event store in order of version
, this list of events is then passed into the Mutate
method on the AggregateBase.cs
class. This will replay the events in order, invoking the associated When
method for the domain event.
PaymentRepository.cs - This class PaymentRepository
is responsible for loading and saving payments using an event store.
using Regis.Pay.Common.EventStore;
namespace Regis.Pay.Domain
{
public class PaymentRepository : IPaymentRepository
{
private const string StreamIdPrefix = "pay";
private readonly IEventStore _eventStore;
public PaymentRepository(IEventStore eventStore)
{
_eventStore = eventStore;
}
public async Task<Payment> LoadAsync(string streamId)
{
var stream = await _eventStore.LoadStreamAsync(streamId);
if (stream is null)
{
throw new Exception($"Unable to find payment for streamId: {streamId}"); //Add custom exception
}
return new Payment(stream!.Events);
}
public async Task<bool> SaveAsync(Payment payment)
{
if (payment.Changes.Any())
{
var streamId = $"{StreamIdPrefix}:{payment.PaymentId}";
return await _eventStore.AppendToStreamAsync(
streamId,
payment.Version,
payment.Changes);
}
return true;
}
}
}
Payment.cs - This class Payment
represents a payment entity in the domain model. It inherits from AggregateBase
and defines properties and methods related to payment events.
In the example I have defined properties that could/would be useful for usage down stream such as ThridPartyReference
(which is a typo I'll need to update 😄) and PaymentCompletedTimestamp
.
using Regis.Pay.Common.EventStore;
using Regis.Pay.Domain.Events;
namespace Regis.Pay.Domain
{
public class Payment : AggregateBase
{
public Guid PaymentId { get; private set; }
public decimal Amount { get; private set; }
public string Currency { get; private set; }
public DateTime? PaymentCreatedTimestamp { get; private set; }
public Guid? ThridPartyReference { get; private set; }
public DateTime? PaymentCompletedTimestamp { get; private set; }
public Payment(IEnumerable<IDomainEvent> events) : base(events)
{
}
public Payment(Guid paymentId, decimal amount, string currency) : base()
{
Apply(new PaymentInitiated()
{
PaymentId = paymentId,
Amount = amount,
Currency = currency
});
}
public void Created()
{
Apply(new PaymentCreated());
}
public void Settled(Guid paymentReference)
{
Apply(new PaymentSettled() { PaymentReference = paymentReference });
}
public void Complete()
{
Apply(new PaymentCompleted());
}
public void When(PaymentInitiated @event)
{
PaymentId = @event.PaymentId;
Amount = @event.Amount;
Currency = @event.Currency;
}
public void When(PaymentCreated @event)
{
PaymentCreatedTimestamp = @event.Timestamp;
}
public void When(PaymentSettled @event)
{
ThridPartyReference = @event.PaymentReference;
}
public void When(PaymentCompleted @event)
{
PaymentCompletedTimestamp = @event.Timestamp;
}
}
}
AggregateBase.cs - This abstract class AggregateBase
serves as a base class for aggregate roots in the domain model. It provides functionality for managing domain events and versioning.
using Regis.Pay.Common.EventStore;
namespace Regis.Pay.Domain
{
public abstract class AggregateBase
{
public int Version { get; private set; }
public List<IDomainEvent> Changes { get; }
protected AggregateBase()
{
Changes = new List<IDomainEvent>();
}
protected AggregateBase(IEnumerable<IDomainEvent> events)
{
Changes = new List<IDomainEvent>();
foreach (var @event in events)
{
Mutate(@event);
Version += 1;
}
}
protected void Apply(IDomainEvent @event)
{
Changes.Add(@event);
Mutate(@event);
}
private void Mutate(IDomainEvent @event)
{
((dynamic)this).When((dynamic)@event);
}
}
}
Here I'll go over what the above code produces when data is persisted to the event store in Cosmos DB. There are four domain events defined in the Regis Pay example which is represented by this diagram:
When each domain event is persisted to the event store they would look like the following:
PaymentInitiated
{
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f:1",
"stream": {
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f",
"version": 1
},
"eventType": "PaymentInitiated",
"eventData": {
"PaymentId": "7551548a-b366-4081-9330-ae19bfc5557f",
"Amount": 86.1,
"Currency": "GBP",
"Timestamp": "2024-04-06T00:02:14.9984673Z"
},
"_rid": "J+MUALnYydYmAAAAAAAAAA==",
"_self": "dbs/J+MUAA==/colls/J+MUALnYydY=/docs/J+MUALnYydYmAAAAAAAAAA==/",
"_etag": "\"00000000-0000-0000-87b5-aedb6add01da\"",
"_attachments": "attachments/",
"_ts": 1712361735
}
PaymentCreated
{
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f:2",
"stream": {
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f",
"version": 2
},
"eventType": "PaymentCreated",
"eventData": {
"Timestamp": "2024-04-06T00:02:20.2212849Z"
},
"_rid": "J+MUALnYydYnAAAAAAAAAA==",
"_self": "dbs/J+MUAA==/colls/J+MUALnYydY=/docs/J+MUALnYydYnAAAAAAAAAA==/",
"_etag": "\"00000000-0000-0000-87b5-b1ca6dc601da\"",
"_attachments": "attachments/",
"_ts": 1712361740
}
PaymentSettled
{
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f:3",
"stream": {
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f",
"version": 3
},
"eventType": "PaymentSettled",
"eventData": {
"PaymentReference": "ff7fc400-783e-4070-8a91-bfb3b906729f",
"Timestamp": "2024-04-06T00:02:24.9964504Z"
},
"_rid": "J+MUALnYydYoAAAAAAAAAA==",
"_self": "dbs/J+MUAA==/colls/J+MUALnYydY=/docs/J+MUALnYydYoAAAAAAAAAA==/",
"_etag": "\"00000000-0000-0000-87b5-b49f61f401da\"",
"_attachments": "attachments/",
"_ts": 1712361745
}
PaymentCompleted
{
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f:4",
"stream": {
"id": "pay:7551548a-b366-4081-9330-ae19bfc5557f",
"version": 4
},
"eventType": "PaymentCompleted",
"eventData": {
"Timestamp": "2024-04-06T00:02:30.0929079Z"
},
"_rid": "J+MUALnYydYpAAAAAAAAAA==",
"_self": "dbs/J+MUAA==/colls/J+MUALnYydY=/docs/J+MUALnYydYpAAAAAAAAAA==/",
"_etag": "\"00000000-0000-0000-87b5-b7a8c20f01da\"",
"_attachments": "attachments/",
"_ts": 1712361750
}
When querying data from Cosmos DB, you could query using the stream id. In the example above it would be something like:
SELECT * FROM c
WHERE c.stream.id = "pay:7551548a-b366-4081-9330-ae19bfc5557f"
I have included a screenshot of what it looks like when you query Cosmos DB. This is how I retrieved the persisted domain events in the examples above.
In this demo I put a demo payment through by sending a request to the API, notice in the logs for the change feed when it logs change feed changes with the domain event. This is the change feed working after each domain event is persisted to the event store.
This demo was all ran locally using Docker Desktop. If you want to test for yourself feel free to check out this getting started guide.
The Event Souring pattern is a great pattern to use IMO, but I caveat that with the it depends on your particular use case where context is king 👑. In my experience though, where I have used this pattern in the context of payments and orders in a event driven microservice environment it made perfect sense and solved a lot of business needs. That doesn't necessarily mean I would only recommend it in a event driven microservice environment, it's just an area I have seen this successfully adopted, having said that I can see this pattern also being useful in many contexts.
If you want to read more about this pattern I would recommend the Microsoft documentation and the Event Sourcing blog post by Martin Fowler and all the code walked through in this blog post can be found here.