Development

Building a Base RabbitMQ Message Handler in .NET

In this article, we’ll walk through creating a base RabbitMQ message handler class in .NET. This handler will be abstract and reusable across various message processing scenarios, utilizing dependency injection and hosted services for robust, asynchronous message handling.

Why Use a Base Class for Message Handling?

A base class offers a structured and reusable way to handle messages from RabbitMQ. By defining a common base, we can create multiple message handler classes that focus solely on processing their specific message type, without worrying about connection and channel management.

Setting Up the BaseMessageHandler Class

The BaseMessageHandler<T> class provides a blueprint for establishing a RabbitMQ connection, creating channels, registering subscribers, and processing messages. The generic parameter T represents the message type, allowing us to handle specific message types in derived classes.

C#
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitWithBaseListener.Controllers
{
    public abstract class BaseMessageHandler<T> : IHostedService
    {
        private readonly IServiceProvider _serviceProvider;

        protected IConnection _connection;
        protected IModel _channel;
        protected IConnectionFactory _connectionFactory;

        protected BaseMessageHandler(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            RegisterSubscribers();
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _connection?.Close();
            return Task.CompletedTask;
        }

        protected void InitializeRabbitMqConnection()
        {
            _connection = _connectionFactory.CreateConnection();
            CreateChannel();
        }

        protected virtual void RegisterSubscribers()
        {
            throw new NotImplementedException();
        }

        protected virtual void RegisterMessageHandlers()
        {
            throw new NotImplementedException();
        }

        protected virtual async void OnMessageReceived(object model, BasicDeliverEventArgs ea)
        {
            string message = Encoding.UTF8.GetString(ea.Body.ToArray());

            using var scope = _serviceProvider.CreateScope();
            await ProcessMessage(scope, message);

            _channel.BasicAck(ea.DeliveryTag, false);
        }

        protected virtual async Task ProcessMessage(IServiceScope scope, string message)
        {
            await Task.Run(() => throw new NotImplementedException());
        }

        private void CreateChannel()
        {
            if (_connection == null || !_connection.IsOpen)
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action.");

            _channel?.Dispose();
            _channel = _connection.CreateModel();
        }
    }
}

Explanation of Each Component

  1. Constructor: The BaseMessageHandler takes an IServiceProvider to create a scoped service instance for processing each message.
  2. StartAsync and StopAsync: Implemented as part of IHostedService, these methods start and stop the service.
    • StartAsync calls RegisterSubscribers, allowing derived classes to specify message subscriptions.
    • StopAsync gracefully closes the RabbitMQ connection.
  3. InitializeRabbitMqConnection: Establishes a RabbitMQ connection and sets up a channel.
  4. RegisterSubscribers and RegisterMessageHandlers: Virtual methods intended for derived classes to implement specific subscriber registrations and handlers.
  5. OnMessageReceived: Handles message receipt, processes the message within a service scope, and acknowledges it.
  6. ProcessMessage: Placeholder method for processing a message, to be implemented by derived classes.
  7. CreateChannel: Sets up the channel for communication with RabbitMQ, ensuring a connection is open.

Creating a Concrete Message Handler

To use BaseMessageHandler, create a concrete class that defines specific message subscriptions and processing logic. For example, let’s create a handler for processing OrderMessage types.

C#
public class OrderMessageHandler : BaseMessageHandler<OrderMessage>
{
    public OrderMessageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
    {
        InitializeRabbitMqConnection();
    }

    protected override void RegisterSubscribers()
    {
        _channel.QueueDeclare("OrderQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += OnMessageReceived;
        _channel.BasicConsume("OrderQueue", autoAck: false, consumer: consumer);
    }

    protected override async Task ProcessMessage(IServiceScope scope, string message)
    {
        var orderMessage = JsonSerializer.Deserialize<OrderMessage>(message);
        // Process the orderMessage here
        Console.WriteLine($"Processing Order: {orderMessage.OrderId}");
        await Task.CompletedTask;
    }
}
Shares: