Event Sourcing in .NET Core – part 2: storing events

2020, Apr 06    

And here we are for the second part of the Event Sourcing series. Last time we introduced the main idea and some of its benefits. This time we’ll see how we can start storing events in our system.

As usual, I have prepared a small demo, modeled around the banking example I depicted in part 1. Sources are available here.

Let’s do a quick recap: we’re trying to write a system that appends events to a log-like persistent storage using a CQRS approach. Query models are stored in a separate storage and built at regular intervals or every time an event occurs.

Events can be used for various purposes, like tracing the activity on the platform or rebuilding the state of the domain models at any specific point in time.

There are several options for storing events: we could use a single big table in a SQL database, a collection in a NoSQL store, or a specialized, ad-hoc system.

For this demo, I decided to go for the latter and give EventStore a chance. From its home page:

Event Store is an industrial-strength event sourcing database that stores your critical data in streams of immutable events. It was built from the ground up for event sourcing.

It has decent documentation, a good community, and it was created by the legendary Greg Young. For those who don’t know him: he coined the term “CQRS”. I guess that’s enough.

Now, in our example we had these requirements:

  1. create customers
  2. create accounts for the customers
  3. withdraw money from an account
  4. deposit money on an account

The first thing to do, as usual, is to start modeling our domain. For the first requirement, the Customer class encapsulates more or less all the responsibilities.
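
Just to give an idea, a Customer aggregate could look more or less like this (a simplified sketch: the CustomerCreated event and the AddEvent/Apply members are assumptions of mine, the actual code is in the repository):

// simplified sketch, not the actual demo code
public class Customer : BaseAggregateRoot<Customer, Guid>
{
    private Customer() { } // needed when rehydrating from events

    public Customer(Guid id, string firstName, string lastName) : base(id)
    {
        if (string.IsNullOrWhiteSpace(firstName))
            throw new ArgumentNullException(nameof(firstName));
        if (string.IsNullOrWhiteSpace(lastName))
            throw new ArgumentNullException(nameof(lastName));

        this.AddEvent(new CustomerCreated(this, firstName, lastName));
    }

    public string FirstName { get; private set; }
    public string LastName { get; private set; }

    // invoked for every event, both when it's first added and when replaying
    protected override void Apply(IDomainEvent<Guid> @event)
    {
        switch (@event)
        {
            case CustomerCreated c:
                this.FirstName = c.FirstName;
                this.LastName = c.LastName;
                break;
        }
    }
}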

As you can see, the class inherits from a BaseAggregateRoot class, which is implementing this interface:

public interface IAggregateRoot<out TKey> : IEntity<TKey>
{
    long Version { get; }
    IReadOnlyCollection<IDomainEvent<TKey>> Events { get; }
    void ClearEvents();
}

public interface IEntity<out TKey>
{
    TKey Id { get; }
}

We saw something similar in a previous post about the Outbox Pattern. The key difference here is that we’re storing a Version along with the events. It will be handy on several occasions, especially when resolving conflicts during writes or when building the query models.
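
To make that concrete, here’s a minimal sketch of what such a base class could look like, assuming the version is bumped every time a new event is added (member names like AddEvent and Apply are mine, the actual class in the repository may differ):

// illustrative only: tracks the uncommitted events and the aggregate version
public abstract class BaseAggregateRoot<TA, TKey> : IAggregateRoot<TKey>
    where TA : class, IAggregateRoot<TKey>
{
    private readonly Queue<IDomainEvent<TKey>> _events = new Queue<IDomainEvent<TKey>>();

    protected BaseAggregateRoot() { }
    protected BaseAggregateRoot(TKey id) => Id = id;

    public TKey Id { get; protected set; }
    public long Version { get; private set; }
    public IReadOnlyCollection<IDomainEvent<TKey>> Events => _events.ToArray();

    public void ClearEvents() => _events.Clear();

    protected void AddEvent(IDomainEvent<TKey> @event)
    {
        _events.Enqueue(@event);
        this.Apply(@event); // mutate the internal state
        this.Version++;     // every event bumps the aggregate version
    }

    protected abstract void Apply(IDomainEvent<TKey> @event);
}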

Creating a Customer is quite simple (code omitted for brevity):

public class CreateCustomerHandler : INotificationHandler<CreateCustomer>
{
    private readonly IEventsService<Customer, Guid> _eventsService;

    public CreateCustomerHandler(IEventsService<Customer, Guid> eventsService)
    {
        _eventsService = eventsService;
    }

    public async Task Handle(CreateCustomer command, CancellationToken cancellationToken)
    {
        var customer = new Customer(command.Id, command.FirstName, command.LastName);
        await _eventsService.PersistAsync(customer);
    }
}

As you can see, we’re directly creating the Customer model and persisting it. The command handler does not validate the command: that concern has been extracted and is executed by another class.
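
One possible shape for that extraction, assuming a simple decorator wrapping the handler (the ICommandValidator abstraction here is hypothetical; the demo might use a different mechanism, for example a MediatR pipeline behavior):

// hypothetical sketch of a validation decorator around the command handler
public class ValidationDecorator<TCommand> : INotificationHandler<TCommand>
    where TCommand : INotification
{
    private readonly INotificationHandler<TCommand> _inner;
    private readonly ICommandValidator<TCommand> _validator;

    public ValidationDecorator(INotificationHandler<TCommand> inner, ICommandValidator<TCommand> validator)
    {
        _inner = inner;
        _validator = validator;
    }

    public async Task Handle(TCommand command, CancellationToken cancellationToken)
    {
        await _validator.EnsureIsValidAsync(command, cancellationToken); // throws if the command is invalid
        await _inner.Handle(command, cancellationToken);
    }
}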

The next step is to create an Account for this Customer:

public class CreateAccountHandler : INotificationHandler<CreateAccount>
{
    private readonly IEventsService<Customer, Guid> _customerEventsService;
    private readonly IEventsService<Account, Guid> _accountEventsService;

    public CreateAccountHandler(IEventsService<Customer, Guid> customerEventsService,
                                IEventsService<Account, Guid> accountEventsService)
    {
        _customerEventsService = customerEventsService;
        _accountEventsService = accountEventsService;
    }

    public async Task Handle(CreateAccount command, CancellationToken cancellationToken)
    {
        var customer = await _customerEventsService.RehydrateAsync(command.CustomerId);
        if (null == customer)
            throw new ArgumentOutOfRangeException(nameof(CreateAccount.CustomerId), "invalid customer id");

        var account = new Account(command.AccountId, customer, command.Currency);
        await _accountEventsService.PersistAsync(account);
    }
}

Here we have to load (rehydrate) the Customer first. Of course we cannot (and should not) rely on the Queries persistence layer, as it might not be in sync.

The IEventsService implementation of PersistAsync() plays quite an important role: it asks our persistence layer (Event Store) to append the events for the aggregate and then publishes its integration events. We’ll talk more about this in the next article of the series.
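
In broad strokes, PersistAsync() could look like this (a sketch: the IEventsRepository and IEventProducer names are my assumptions, the real implementation lives in the repository):

// sketch of the persistence flow: append the pending events, then publish the integration events
public class EventsService<TA, TKey> : IEventsService<TA, TKey>
    where TA : class, IAggregateRoot<TKey>
{
    private readonly IEventsRepository<TA, TKey> _eventsRepository;
    private readonly IEventProducer<TA, TKey> _eventProducer;

    public EventsService(IEventsRepository<TA, TKey> eventsRepository, IEventProducer<TA, TKey> eventProducer)
    {
        _eventsRepository = eventsRepository;
        _eventProducer = eventProducer;
    }

    public async Task PersistAsync(TA aggregateRoot)
    {
        if (null == aggregateRoot)
            throw new ArgumentNullException(nameof(aggregateRoot));

        // 1) append the uncommitted domain events to the Event Store stream
        await _eventsRepository.AppendAsync(aggregateRoot);

        // 2) publish the integration events (more on this in the next article)
        await _eventProducer.DispatchAsync(aggregateRoot);
    }

    public Task<TA> RehydrateAsync(TKey key) => _eventsRepository.RehydrateAsync(key);
}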

The Events Repository, instead, is responsible for appending the events of an Aggregate root and for rehydrating it.

As you can see from the code, the append operation opens a transaction, loops over the domain events and persists them.
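
Roughly, with the EventStore.ClientAPI TCP client the append could look like this (a sketch: _connection is an IEventStoreConnection and the Map() helper handling serialization is mine):

// sketch of the append operation, using the EventStore.ClientAPI package
public async Task AppendAsync(TA aggregateRoot)
{
    if (!aggregateRoot.Events.Any())
        return;

    var streamName = GetStreamName(aggregateRoot.Id); // stream naming is shown just below

    // open a transaction on the stream, write one EventData per domain event, then commit.
    // ExpectedVersion.Any skips concurrency checks; the aggregate Version could be used
    // here instead for optimistic concurrency.
    using var transaction = await _connection.StartTransactionAsync(streamName, ExpectedVersion.Any);

    foreach (var @event in aggregateRoot.Events)
    {
        var eventData = Map(@event);
        await transaction.WriteAsync(eventData);
    }

    await transaction.CommitAsync();
    aggregateRoot.ClearEvents();
}

// serializes a domain event into an EventStore EventData
private EventData Map(IDomainEvent<TKey> @event)
{
    var json = System.Text.Json.JsonSerializer.Serialize(@event, @event.GetType());
    var data = System.Text.Encoding.UTF8.GetBytes(json);
    var meta = System.Text.Encoding.UTF8.GetBytes(@event.GetType().AssemblyQualifiedName);
    return new EventData(Guid.NewGuid(), @event.GetType().Name, true, data, meta);
}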

Event Store is built around the concept of “streams”. Every aggregate is represented by a single stream, identified by the aggregate type and key, for example “Customer_540d1d96-3655-43a4-9078-3da7e7c5a3d2”.
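
Building that name is basically a one-liner (the helper name is mine):

// e.g. "Customer_540d1d96-3655-43a4-9078-3da7e7c5a3d2"
private string GetStreamName(TKey aggregateKey)
    => $"{typeof(TA).Name}_{aggregateKey}";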

When rehydrating an entity, all we have to do is build the stream name from the key and the type, and then fetch batches of events starting from the very first one.
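
Again with EventStore.ClientAPI, the read loop could look more or less like this (a sketch: Map() and RebuildAggregate() are placeholders of mine for the deserialization and replay logic):

// sketch of rehydration: read the stream forward in batches and replay the events
public async Task<TA> RehydrateAsync(TKey key)
{
    var streamName = GetStreamName(key);
    var events = new List<IDomainEvent<TKey>>();

    StreamEventsSlice slice;
    long nextEventNumber = StreamPosition.Start;
    do
    {
        // read up to 200 events per round-trip, starting from the very first event
        slice = await _connection.ReadStreamEventsForwardAsync(streamName, nextEventNumber, 200, false);
        if (slice.Status != SliceReadStatus.Success)
            return null; // no stream found: no aggregate with this key

        nextEventNumber = slice.NextEventNumber;
        events.AddRange(slice.Events.Select(Map)); // deserialize each ResolvedEvent back into a domain event
    } while (!slice.IsEndOfStream);

    // rebuild the aggregate state by replaying the events in order
    return RebuildAggregate(key, events);
}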

Event Store also supports snapshots, basically “a projection of the current state of an aggregate at a given point”. They can be used to speed up rebuilding the current state, since they avoid loading every event from the very beginning. I haven’t implemented this technique in the demo yet; I’ll probably add it in the coming weeks.

Enough food for thought for today. Next time: Kafka!
