OpenSleigh – state persistence part 3: the outbox pattern

2021, Feb 14    

Hi All! Welcome back to the third part of this series on OpenSleigh. Today we’ll see how we can use the Outbox pattern to ensure that outbound messages are never lost, or sent, independently of the Saga State changes that produced them.

Last time we saw how we can leverage locking and transactions to wrap the execution of message handlers.

Now, one thing that we definitely want to avoid is outgoing messages being dispatched before the current Saga State gets persisted.

Why? Because this would put the whole system in an inconsistent state.

Let’s suppose our State holds a counter of some operations. We might rely on that counter for some computations, or even to decide if our Saga is completed.

In a message handler, we increase this counter and send a message, but for some reason, the State does not get persisted. The message goes out anyway, so its consumers now act on a counter value that was never saved. I think you get the idea of the ripple effect this would cause.

Luckily for us, OpenSleigh is capable of handling all of this by leveraging the Outbox pattern. We already talked about it, but let me refresh your memory real quick.

The gist of the pattern is that we don’t send messages immediately. Instead, we use a transaction to atomically store both the Saga State and the outgoing messages in our persistence storage.
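To make the "store both atomically" part a bit more concrete, here is a minimal in-memory sketch. Every type in it (InMemoryStore, Transaction, CounterState, OutboxMessage) is made up for this post and is not part of OpenSleigh. A real implementation would rely on the transaction support of the underlying database instead of staging changes by hand.

```csharp
using System;
using System.Collections.Generic;

// Demo: the handler bumps the counter and enqueues a message in one transaction.
var store = new InMemoryStore();

// Happy path: state and message become visible together.
var tx = store.BeginTransaction();
tx.SaveState(new CounterState { Counter = 1 });
tx.Enqueue(new OutboxMessage(Guid.NewGuid(), "counter is now 1"));
tx.Commit();
Console.WriteLine($"{store.PersistedCounter} / {store.Outbox.Count}");

// Failure path: the handler throws before Commit, so the staged work is discarded.
try
{
    var failing = store.BeginTransaction();
    failing.SaveState(new CounterState { Counter = 2 });
    failing.Enqueue(new OutboxMessage(Guid.NewGuid(), "counter is now 2"));
    throw new InvalidOperationException("simulated failure before commit");
}
catch (InvalidOperationException)
{
    Console.WriteLine($"{store.PersistedCounter} / {store.Outbox.Count}");
}

// Hypothetical types, for illustration only: none of them belong to OpenSleigh.
public record OutboxMessage(Guid Id, string Payload);

public class CounterState
{
    public int Counter { get; set; }
}

public class InMemoryStore
{
    private readonly object _sync = new();
    private readonly List<OutboxMessage> _outbox = new();

    public int PersistedCounter { get; private set; }
    public IReadOnlyList<OutboxMessage> Outbox => _outbox;

    public Transaction BeginTransaction() => new Transaction(this);

    // Applies the staged state and messages in a single step.
    internal void Apply(int? counter, IReadOnlyCollection<OutboxMessage> messages)
    {
        lock (_sync)
        {
            if (counter.HasValue) PersistedCounter = counter.Value;
            _outbox.AddRange(messages);
        }
    }
}

public class Transaction
{
    private readonly InMemoryStore _store;
    private readonly List<OutboxMessage> _staged = new();
    private int? _counter;

    public Transaction(InMemoryStore store) => _store = store;

    public void SaveState(CounterState state) => _counter = state.Counter;
    public void Enqueue(OutboxMessage message) => _staged.Add(message);

    // Nothing is visible in the store until Commit is called.
    public void Commit() => _store.Apply(_counter, _staged);
}
```

Both lines print "1 / 1": the failed handler leaves neither a new counter value nor a new message behind. Also note that nothing has been published yet. The message only sits in the store next to the state that produced it.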

Then, a background worker takes care of pulling unprocessed messages, locking and publishing them.

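public class OutboxBackgroundService : BackgroundService
{
    private readonly IOutboxProcessor _processor;
    // The options type name is an assumption; it only needs to expose the polling Interval.
    private readonly OutboxProcessorOptions _options;

    public OutboxBackgroundService(IOutboxProcessor processor, OutboxProcessorOptions options)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _options = options ?? throw new ArgumentNullException(nameof(options));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Keep polling until the host asks the service to stop.
        while (!stoppingToken.IsCancellationRequested)
        {
            await _processor.ProcessPendingMessagesAsync(stoppingToken);
            await Task.Delay(_options.Interval, stoppingToken);
        }
    }
}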

We need to lock each message to prevent concurrent instances from sending the same message:

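public class OutboxProcessor : IOutboxProcessor
{
    // Field types are assumptions; the constructor is omitted for brevity.
    private readonly IOutboxRepository _outboxRepository;
    private readonly IPublisher _publisher;

    public async Task ProcessPendingMessagesAsync(CancellationToken cancellationToken)
    {
        var messages = await _outboxRepository.ReadMessagesToProcess(cancellationToken);
        foreach (var message in messages)
        {
            // Lock first, so that no other instance can pick up the same message.
            var lockId = await _outboxRepository.LockAsync(message, cancellationToken);
            await _publisher.PublishAsync(message, cancellationToken);
            // Release the message once it has been published.
            await _outboxRepository.ReleaseAsync(message, lockId, cancellationToken);
        }
    }
}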
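If you are wondering what such a lock could look like, here is a toy in-memory version. It is only a sketch: ToyOutbox and its members are hypothetical names, and the lock expiry is an assumption of this example, not a description of how OpenSleigh does it.

```csharp
using System;
using System.Collections.Generic;

var outbox = new ToyOutbox(lockDuration: TimeSpan.FromSeconds(30));
var messageId = Guid.NewGuid();
outbox.Add(messageId);
var now = DateTimeOffset.UtcNow;

// Two workers race for the same message: only the first one gets the lock.
var firstWins = outbox.TryLock(messageId, now, out var lockId);
var secondWins = outbox.TryLock(messageId, now, out _);
Console.WriteLine($"first: {firstWins}, second: {secondWins}");

// After publishing, the winner releases the message using its lock id.
outbox.Release(messageId, lockId);
Console.WriteLine($"pending: {outbox.PendingCount}");

// Hypothetical in-memory outbox, for illustration only (not OpenSleigh code).
public class ToyOutbox
{
    private class Entry
    {
        public Guid? LockId;
        public DateTimeOffset LockedUntil;
    }

    private readonly object _sync = new();
    private readonly Dictionary<Guid, Entry> _pending = new();
    private readonly TimeSpan _lockDuration;

    public ToyOutbox(TimeSpan lockDuration) => _lockDuration = lockDuration;

    public int PendingCount { get { lock (_sync) return _pending.Count; } }

    public void Add(Guid messageId)
    {
        lock (_sync) _pending[messageId] = new Entry();
    }

    // Succeeds only if the message is pending and not held by a live lock.
    public bool TryLock(Guid messageId, DateTimeOffset now, out Guid lockId)
    {
        lock (_sync)
        {
            lockId = Guid.Empty;
            if (!_pending.TryGetValue(messageId, out var entry)) return false;
            if (entry.LockId.HasValue && entry.LockedUntil > now) return false;

            lockId = Guid.NewGuid();
            entry.LockId = lockId;
            entry.LockedUntil = now + _lockDuration;
            return true;
        }
    }

    // Removes the message, but only for the caller that owns the current lock.
    public void Release(Guid messageId, Guid lockId)
    {
        lock (_sync)
        {
            if (!_pending.TryGetValue(messageId, out var entry) || entry.LockId != lockId)
                throw new InvalidOperationException("message is not locked by this caller");
            _pending.Remove(messageId);
        }
    }
}
```

The second worker is turned away while the first one holds the lock. In this sketch a lock can be taken again once it has expired, so a worker that crashes halfway through does not block the message forever. The price is that the same message might be published twice, which is why consumers should be able to cope with duplicates.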

For those interested, you can find more details on the locking strategy used by OpenSleigh here.

That’s all for today! Don’t forget to check out the shiny new OpenSleigh website for more samples and documentation!
