Event Sourcing on Azure – part 2: events persistence
Event Sourcing on Azure – part 2: events persistence

Event Sourcing on Azure – part 2: events persistence

2020, Oct 05    

Hi All! Welcome back for the second part of the Event Sourcing on Azure series. Today we’re going to digress a bit about the implementation details and some of the choices and tradeoff I’ve made. We will focus on how I’ve managed the events persistence and which tool I’ve chosen for it.

Last time we saw how a generic Event Sourcing architecture might look like. As I wrote already, there’s no silver bullet. There might be times where you can’t simply apply a design pattern as it is but you’ll have to bend it to your needs. But it’s good to know the basics, understand the ground rules and diverge if needed.

Now, take some time and browse through the countless lines of code of SuperSafeBank, I’ll wait. I started this repository to demonstrate how it’s possible to leverage Eventstore and Kafka to build an Event Sourcing system with DDD. A nice project to be fair, I learned quite a lot.

But still, with everybody and their dog using The Cloud™, it felt natural to evolve the codebase and move to Azure. I still have to migrate the entire solution, but the bulk of it is complete.

Let’s pause for a second and review what are the requirements. Our system needs to be capable of

  1. create customers
  2. get customer details by id
  3. create accounts for a customer
  4. get customer account details by account id
  5. withdraw money from an account
  6. deposit money on an account

Each one of those points will correspond to a REST endpoint. The previous, on-premise implementation was using a .NET Web API to expose all the endpoints. A Background Worker was responsible for handling the actual execution of the command and creating the Materialized Views. This is done by listening to a few Kafka topics and reacting to the received events.

The new Azure version instead gets rid of Eventstore and Kafka in favour of CosmosDB and ServiceBus, respectively.

CosmosDB is responsible of storing the events, handling versioning and consistency. A very simple implementation is available here, but basically this is the bulk of it:

var partitionKey = new PartitionKey(aggregateRoot.Id.ToString());
var firstEvent = aggregateRoot.Events.First();
var expectedVersion = firstEvent.AggregateVersion;

var dbVersionResp = await _container.GetItemLinqQueryable<EventData<TKey>>(
		requestOptions: new QueryRequestOptions()
		{
			PartitionKey = partitionKey
		}).Select(e => e.AggregateVersion)
	.MaxAsync();
if (dbVersionResp.Resource != expectedVersion)
	throw new AggregateException($"aggregate version mismatch, expected {expectedVersion} , got {dbVersionResp.Resource}");

var transaction = _container.CreateTransactionalBatch(partitionKey);

foreach (var @event in aggregateRoot.Events)
{
	var data = _eventSerializer.Serialize(@event);
	var eventType = @event.GetType();
	var eventData = EventData<TKey>.Create(aggregateRoot.Id, aggregateRoot.Version,	eventType.AssemblyQualifiedName, data);
	transaction.CreateItem(eventData);
}

await transaction.ExecuteAsync();

It will first query the latest version for a given Aggregate. As you can see, the Aggregate id is used as Partition Key. If the expected version doesn’t match, then somebody has already updated the data so we can’t proceed.

If everything is fine, it will open a transaction and write all the events available on the Aggregate.

Re-hydrating an Aggregate is quite easy :

public async Task<TA> RehydrateAsync(TKey key)
{
	var partitionKey = new PartitionKey(key.ToString());

	var events = new List<IDomainEvent<TKey>>();

	using var setIterator = _container.GetItemQueryIterator<EventData<TKey>>(requestOptions: new QueryRequestOptions { MaxItemCount = 100, PartitionKey = partitionKey });
	while (setIterator.HasMoreResults)
	{
		foreach (var item in await setIterator.ReadNextAsync())
		{
			var @event = _eventSerializer.Deserialize<TKey>(item.Type, item.Data);
			events.Add(@event);
		}
	}

	if (!events.Any())
		return null;

	var result = BaseAggregateRoot<TA, TKey>.Create(events.OrderBy(e => e.AggregateVersion));
	return result;
}

We basically query all the events for a given Aggregate, sort them by Version and replay then one after another. Piece of cake.

The next time we’ll see how to validate a Command before executing it. Ciao!

Did you like this post? Then