In this article, we’ll walk through creating a base RabbitMQ message handler class in .NET. This handler will be abstract and reusable across various message processing scenarios, utilizing dependency injection and hosted services for robust, asynchronous message handling.
Why Use a Base Class for Message Handling?
A base class offers a structured and reusable way to handle messages from RabbitMQ. By defining a common base, we can create multiple message handler classes that focus solely on processing their specific message type, without worrying about connection and channel management.
Setting Up the BaseMessageHandler Class
The BaseMessageHandler<T> class provides a blueprint for establishing a RabbitMQ connection, creating channels, registering subscribers, and processing messages. The generic parameter T represents the message type, allowing us to handle specific message types in derived classes.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitWithBaseListener.Controllers
{
    // Written against the RabbitMQ.Client 6.x API (IModel, EventingBasicConsumer).
    public abstract class BaseMessageHandler<T> : IHostedService
    {
        private readonly IServiceProvider _serviceProvider;
        protected IConnection _connection;
        protected IModel _channel;

        // Must be assigned by the derived class before InitializeRabbitMqConnection() runs.
        protected IConnectionFactory _connectionFactory;

        protected BaseMessageHandler(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            RegisterSubscribers();
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _connection?.Close();
            return Task.CompletedTask;
        }

        protected void InitializeRabbitMqConnection()
        {
            if (_connectionFactory == null)
                throw new InvalidOperationException("Set _connectionFactory before initializing the RabbitMQ connection.");

            _connection = _connectionFactory.CreateConnection();
            CreateChannel();
        }

        protected virtual void RegisterSubscribers()
        {
            throw new NotImplementedException();
        }

        protected virtual void RegisterMessageHandlers()
        {
            throw new NotImplementedException();
        }

        // The event signature forces this handler to return void. An exception that
        // escapes an async void method would crash the process, so it is caught here.
        protected virtual async void OnMessageReceived(object model, BasicDeliverEventArgs ea)
        {
            try
            {
                string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                using var scope = _serviceProvider.CreateScope();
                await ProcessMessage(scope, message);
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                // One possible policy: reject without requeueing so that a message
                // that always fails is not redelivered in an endless loop.
                Console.Error.WriteLine($"Message processing failed: {ex.Message}");
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        }

        // Placeholder: derived classes override this with their processing logic.
        protected virtual Task ProcessMessage(IServiceScope scope, string message)
        {
            return Task.FromException(new NotImplementedException());
        }

        private void CreateChannel()
        {
            if (_connection == null || !_connection.IsOpen)
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action.");

            _channel?.Dispose();
            _channel = _connection.CreateModel();
        }
    }
}
Explanation of Each Component
- Constructor: The BaseMessageHandler takes an IServiceProvider to create a scoped service instance for processing each message.
- StartAsync and StopAsync: Implemented as part of IHostedService, these methods start and stop the service. StartAsync calls RegisterSubscribers, allowing derived classes to specify message subscriptions. StopAsync gracefully closes the RabbitMQ connection.
- InitializeRabbitMqConnection: Establishes a RabbitMQ connection and sets up a channel.
- RegisterSubscribers and RegisterMessageHandlers: Virtual methods intended for derived classes to implement specific subscriber registrations and handlers.
- OnMessageReceived: Handles message receipt, processes the message within a service scope, and acknowledges it.
- ProcessMessage: Placeholder method for processing a message, to be implemented by derived classes.
- CreateChannel: Sets up the channel for communication with RabbitMQ, ensuring a connection is open.
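The reason for creating a scope per message is that hosted services are registered as singletons, so they cannot take scoped dependencies (a DbContext, for example) directly through their constructor. The sketch below illustrates the behaviour this relies on, using only Microsoft.Extensions.DependencyInjection. OrderRepository and ScopePerMessageDemo are hypothetical names introduced for illustration; they are not part of the article's code.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical scoped dependency; stands in for something like a DbContext.
public sealed class OrderRepository
{
    public Guid InstanceId { get; } = Guid.NewGuid();
}

public static class ScopePerMessageDemo
{
    // Builds a container with one scoped registration.
    public static ServiceProvider BuildProvider()
    {
        var services = new ServiceCollection();
        services.AddScoped<OrderRepository>();
        return services.BuildServiceProvider();
    }

    // Mimics what OnMessageReceived does for a single message:
    // open a scope, resolve services from it, dispose the scope at the end.
    public static Guid HandleOneMessage(IServiceProvider rootProvider)
    {
        using var scope = rootProvider.CreateScope();
        var first = scope.ServiceProvider.GetRequiredService<OrderRepository>();
        var second = scope.ServiceProvider.GetRequiredService<OrderRepository>();

        // Inside one scope, a scoped service resolves to the same instance.
        if (first.InstanceId != second.InstanceId)
            throw new InvalidOperationException("Expected one instance per scope.");

        return first.InstanceId;
    }
}
```

Calling HandleOneMessage twice on the same provider returns two different ids, because each simulated message gets its own scope and therefore its own OrderRepository instance. In a real handler, ProcessMessage would resolve its dependencies from the scope argument in the same way.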
Creating a Concrete Message Handler
To use BaseMessageHandler, create a concrete class that defines specific message subscriptions and processing logic. For example, let’s create a handler for processing OrderMessage types.
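The OrderMessage type itself is not shown in this article. A minimal sketch of what it might look like follows, assuming a single integer OrderId property (the only member the handler uses) and System.Text.Json for deserialization. The OrderMessageJson helper is a hypothetical name added for illustration.

```csharp
using System.Text.Json;

// Hypothetical message shape; the article only implies an OrderId property,
// and its type (int here) is an assumption.
public sealed class OrderMessage
{
    public int OrderId { get; set; }
}

public static class OrderMessageJson
{
    // System.Text.Json matches property names case-sensitively by default,
    // so camelCase payloads such as {"orderId": 42} need this option.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true
    };

    // Returns null when the payload is the JSON literal null.
    public static OrderMessage Parse(string json) =>
        JsonSerializer.Deserialize<OrderMessage>(json, Options);
}
```

Without the case-insensitive option, a payload that uses orderId instead of OrderId would deserialize without error but leave OrderId at its default value of 0, which is easy to miss. With a message type along these lines in place, the handler looks like this: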
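using System;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitWithBaseListener.Controllers;

// OrderMessage is assumed to be a plain class with an OrderId property.
public class OrderMessageHandler : BaseMessageHandler<OrderMessage>
{
    public OrderMessageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
    {
        // The factory must be set before connecting; "localhost" is a placeholder host.
        _connectionFactory = new ConnectionFactory { HostName = "localhost" };
        InitializeRabbitMqConnection();
    }

    protected override void RegisterSubscribers()
    {
        // Declare the queue (idempotent), then attach a consumer with manual acknowledgements.
        _channel.QueueDeclare("OrderQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += OnMessageReceived;
        _channel.BasicConsume("OrderQueue", autoAck: false, consumer: consumer);
    }

    protected override async Task ProcessMessage(IServiceScope scope, string message)
    {
        var orderMessage = JsonSerializer.Deserialize<OrderMessage>(message);
        if (orderMessage == null)
            throw new InvalidOperationException("Received an empty order message.");

        // Process the orderMessage here
        Console.WriteLine($"Processing Order: {orderMessage.OrderId}");
        await Task.CompletedTask;
    }
}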
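For the handler to run, it has to be registered with the host as a hosted service. The sketch below shows the registration pattern with the generic host. DemoHandler is a hypothetical stand-in for OrderMessageHandler so that the example runs without a RabbitMQ broker; HostingDemo is likewise an illustrative name.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical stand-in for OrderMessageHandler; it only records lifecycle calls.
public sealed class DemoHandler : IHostedService
{
    public static bool Started { get; private set; }
    public static bool Stopped { get; private set; }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        Started = true;
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        Stopped = true;
        return Task.CompletedTask;
    }
}

public static class HostingDemo
{
    public static async Task RunAsync()
    {
        using IHost host = Host.CreateDefaultBuilder()
            .ConfigureServices(services =>
            {
                // In the article's setup this line would instead read:
                // services.AddHostedService<OrderMessageHandler>();
                services.AddHostedService<DemoHandler>();
            })
            .Build();

        await host.StartAsync(); // invokes StartAsync on each registered hosted service
        await host.StopAsync();  // invokes StopAsync on each registered hosted service
    }
}
```

Because OrderMessageHandler only needs an IServiceProvider in its constructor, the container can construct it without further registrations. In a long-running application you would typically call host.RunAsync() rather than starting and stopping the host by hand as this sketch does.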