What is MassTransit?

MassTransit is a mature and feature-rich messaging framework that facilitates communication between distributed components using messaging patterns such as publish/subscribe, request/response, and message routing. Built on top of .NET, MassTransit abstracts away the complexities of message queuing systems like RabbitMQ, Azure Service Bus, and Amazon SQS, enabling developers to focus on application logic rather than infrastructure concerns.

Code Example

For this section I'll be taking code snippets from Regis Pay, an example event-driven microservice project, using a fictional payment processor named Regis Pay.

For more background on "Regis Pay" you can read from following blog post:

Now let's dive into the example code to illustrate how MassTransit can be integrated into a .NET application. In Regis Pay we have two services, one responsible for publishing messages Regis.Pay.ChangeFeed and one for consuming those messages Regis.Pay.EventConsumer.

  1. Installing MassTransit via Nuget:

To just install MassTransit you can use the following command:

dotnet add package MassTransit

However in the example project I am going to use it with RabbitMQ, so you don't need to install the above package as a prerequisites as it has it's own standalone package which you can get via using the following command:

dotnet add package MassTransit.RabbitMQ
  1. Defining the message contract

I have taken one event, the PaymentInitiated as an example. The code has been refactored to abstract classes and a interface, but essentially the shape of the message would look like the following:

public class PaymentInitiated 
{
    public string AggregateId { get; set; } = default!;
}
  1. Implement the consumer in Regis.Pay.EventConsumer

PaymentInitiatedConsumer.cs is where you can see the original source code. For the example below I have striped out bits to emphasize how you can read properties from the message, in this case the AggregateId property.

public class PaymentInitiatedConsumer : IConsumer<PaymentInitiated>
{
    private readonly ILogger<PaymentInitiatedConsumer> _logger;

    public PaymentInitiatedConsumer(ILogger<PaymentInitiatedConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<PaymentInitiated> context)
    {
        _logger.LogInformation("Consuming {event} for paymentId: {paymentId}", nameof(PaymentInitiated), context.Message.AggregateId);
    }
}
  1. Configure MassTransit in the Regis.Pay.EventConsumer service.

Again I have simplified the example below, you can find the original source code in ServiceCollectionExtensions.cs where it's also re-used for the publisher service Regis.Pay.ChangeFeed.

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h => {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });

    var assemblies = AppDomain.CurrentDomain.GetAssemblies()
                            .SelectMany(s => s.GetTypes())
                            .Where(p => typeof(IConsumer).IsAssignableFrom(p) && p.IsClass &&
                            !p.IsAbstract && p.Namespace!.Contains("Regis.Pay.EventConsumer."))
                            .Select(x => x.Assembly)
                            .ToArray();

    x.AddConsumers(assemblies);
    
});

Here you can see you don't need to specify every consumer you want registered, but instead you can do it via scanning the assemblies for the exact types you want.

  1. Send message from publisher service Regis.Pay.ChangeFeed

ChangeEventHandler.cs is where you can see the original source code and again to keep this focused on MassTransit I have simplified the code snippet below highlight the use of the IBus and the usage of the Publish method.

public class ChangeEventHandler(IBus bus) : IChangeEventHandler
{
    private readonly IBus _bus = bus;

    public async Task HandleAsync(IReadOnlyCollection<EventWrapper> events, CancellationToken cancellationToken)
    {
        foreach (var @event in events)
        {
            var integrationEvent = IntegrationEventResolver.Resolve(@event);

            if (@event.EventType == nameof(PaymentInitiated))
            {
                await _bus.Publish<PaymentInitiated>(integrationEvent);
            }
        }
    }
}

You would also have to register MassTransit for the Regis.Pay.ChangeFeed service which would be exactly like point 4. I have included it below for clarity.

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h => {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

To help visualize the the consumer handling the message here's a GIF of a the Regis Pay example running in Docker and handling a payment go through.

As you can in the logs it consuming the message as-well as the other consumers and all of this done with little to no configuration as MassTransit handles that all for you. That said it abstracts it that you could even swap your messaging system as mentioned at the start.

Summary:

MassTransit empowers .NET developers to build resilient and scalable distributed systems by abstracting away the complexities of messaging infrastructure. With its intuitive API, extensive documentation, and vibrant community, MassTransit remains a top choice for implementing message-based communication patterns in .NET applications. Whether you're building microservices, event-driven architectures, or integration solutions, MassTransit provides the tools and abstractions necessary to unlock the full potential of distributed computing.