Consuming message queues using .net core background workers – part 3: the code, finally
Consuming message queues using .net core background workers – part 3: the code, finally

Consuming message queues using .net core background workers – part 3: the code, finally

2019, Nov 05    

And here we go with the another part of this Series! This time we’re going to take a look at the code and see how a microservice can consume messages from a queue using a background worker.

Last time I introduced Background Workers and some possible use cases. The possibilities are limitless and today we’re going to see how we can integrate this idea in a microservice.

As usual, all the code is available on GitHub, so feel free to take a look.

We can find 3 projects in the solution:

  1. WebApiWithBackgroundWorker.Common contains the common code to open a persistent connection (more on this later) and the Message class. This one is just a simple DTO shared between the publisher and the subscriber and represents the “contract” of what will be sent over the network.
  2. WebApiWithBackgroundWorker.Publisher is the message producer. It opens a connection to RabbitMQ and sends messages.
  3. WebApiWithBackgroundWorker.Subscriber is our subscribing microservice. It hosts the background worker and exposes a single endpoint to display all the received messages.

Let’s get into the details now.

The RabbitPersistentConnection class is responsible of holding our TCP connection to RabbitMQ and generating channels. It is recommended to have a single connection per application and multiple channels, one per thread. Channels are not thread-safe, so sharing them is not a great idea.

Once a connection is acquired, we register to the ConnectionShutdown, CallbackException and ConnectionBlocked events and try to connect again if something bad occurs.

Beware that this is just example code: I have left out some security checks and exception handling so don’t use this on production.

The RabbitPublisher class is the core of the Publisher project. Nothing particular fancy here: it gets a channel from the connection and uses it to send messages to an exchange. We’re using a Fanout Exchange in this example so all the subscribers will receive the messages, discarding the routing key.

Let’s dig into the subscriber now.

The microservice exposes a single GET endpoint, /messages , that returns a list of all the received messages. The API Controller has a single dependency on the Message Repository. Easy-peasy.

For the sake of the example, the persistence is handled in-memory.

The same repository is also used by our BackgroundSubscriberWorker. This one is registered in the ConfigureServices() method of the Startup class with this call:

services.AddHostedService<BackgroundSubscriberWorker>();

That’s all we need to host a background worker. Well actually that’s not entirely true: as you may have noticed, the worker class inherits from BackgroundService which helps us encapsulating the logic and handling the lifetime. There’s a good article on the Microsoft website, make sure to check it out.

Our worker class is responsible of starting the RabbitSubscriber and listening to incoming messages. When one arrives, it calls the Add() method on the repository and that’s it.

As you may have noticed, the connection string is missing from the projects. I am using CloudAMQP to host RabbitMQ, they have a nice free tier, extremely useful for experiments like this one.

It took me a little bit more than expected to write this last article, I got distracted by other topics and also by everything else is happening to me these weeks. I’m about to move to Canada, packing an entire house is never easy.

In the next article we’ll see how it’s possible to process multiple messages concurrently using the System.Threading.Channels library.

Don’t miss Part 1 and Part 2!

Did you like this post? Then