CQRS in NestJS Using RabbitMQ
CQRS (Command Query Responsibility Segregation) is a design pattern that decouples the read (query) and write (command) sides of your application. By combining this pattern with RabbitMQ and NestJS’s powerful modular system, we can build a robust and scalable architecture for asynchronous processing and eventual consistency.
This article explores how to implement a CQRS system in NestJS using the @nestjstools/messaging library and a RabbitMQ-backed channel.
🧱 Why CQRS with Messaging?
Separation of concerns: Commands (writes) and queries (reads) evolve independently.
Scalability: Commands can be processed asynchronously in separate services.
Event-driven architecture: Events can propagate across distributed systems.
Reliability: RabbitMQ offers durable queues, message acknowledgements with redelivery, and dead-letter exchanges, which together make retries and fault tolerance possible.
🛠 Messaging Setup with RabbitMQ
We define a MessagingWrapperModule that registers message buses and connects them to appropriate channels:
@Module({})
export class MessagingWrapperModule {
  // enableConsumer decides whether this process also consumes from the AMQP queues.
  static forRoot(enableConsumer: boolean): DynamicModule {
    return {
      imports: [
        MessagingRabbitmqExtensionModule,
        MessagingModule.forRoot({
          // Each bus dispatches to the channels listed for it.
          buses: [
            { name: 'sync-command.bus', channels: ['sync-channel'] },
            { name: 'command.bus', channels: ['amqp-command.channel'] },
            { name: 'event.bus', channels: ['amqp-event.channel'] },
          ],
          channels: [
            // In-memory channel: messages are handled inside the current process.
            new InMemoryChannelConfig({ name: 'sync-channel' }),
            // Commands go through RabbitMQ to the book_shop.command queue.
            new AmqpChannelConfig({
              name: 'amqp-command.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              bindingKeys: ['book_shop.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.command',
              autoCreate: true,
              enableConsumer,
            }),
            // Events go through RabbitMQ to the book_shop.event queue.
            new AmqpChannelConfig({
              name: 'amqp-event.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              bindingKeys: ['book_shop.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.event',
              autoCreate: true,
              enableConsumer,
            }),
          ],
          debug: true,
        }),
      ],
      // Wrap each raw bus and expose it under an application-level token.
      providers: [
        {
          provide: Service.CommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['command.bus'],
        },
        {
          provide: Service.EventBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['event.bus'],
        },
        {
          provide: Service.SyncCommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['sync-command.bus'],
        },
      ],
      exports: [
        Service.CommandBus,
        Service.EventBus,
        Service.SyncCommandBus,
      ],
      module: MessagingWrapperModule,
      global: true,
    };
  }
}
📦 InternalMessageBus Wrapper
This class acts as a lightweight abstraction over the raw IMessageBus, giving you a consistent and simplified interface to dispatch messages.
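The listing for this wrapper is not reproduced in this copy of the article, so what follows is only a minimal sketch of what such a class might look like. `MessageBusLike` and `RoutingMessageLike` are hypothetical stand-ins for the library's `IMessageBus` and its routing message type, and the `dispatch(message, routingKey)` signature is an assumption, not the article's confirmed API.

```typescript
// Hypothetical stand-in for the library's routing message: a payload plus a routing key.
interface RoutingMessageLike {
  message: object;
  messageRoutingKey: string;
}

// Hypothetical stand-in for IMessageBus; only a dispatch method is assumed here.
interface MessageBusLike {
  dispatch(message: RoutingMessageLike): Promise<unknown>;
}

// Sketch of the wrapper: callers pass a plain message and a routing key,
// and the wrapper builds the envelope that the underlying bus expects.
class InternalMessageBus {
  private readonly bus: MessageBusLike;

  constructor(bus: MessageBusLike) {
    this.bus = bus;
  }

  dispatch(message: object, routingKey: string): Promise<unknown> {
    return this.bus.dispatch({ message, messageRoutingKey: routingKey });
  }
}
```

Keeping the envelope construction in one place means application code never depends on the library's message types directly, which is presumably the "consistent and simplified interface" the wrapper is meant to provide.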
🧩 Dependency Injection
Use the following helpers to inject your buses where needed:
The service identifiers are managed through an enum:
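The enum and helper listings are missing from this copy of the article. A plausible shape is sketched below: the member names are taken from the providers in the module (`Service.CommandBus`, `Service.EventBus`, `Service.SyncCommandBus`), while the string values and the helper shown in the trailing comment are assumptions for illustration.

```typescript
// Member names come from the module's providers; the string values are assumed.
enum Service {
  CommandBus = 'CommandBus',
  EventBus = 'EventBus',
  SyncCommandBus = 'SyncCommandBus',
}

// Each value is used as an injection token, so the values must be unique.
const serviceTokens: string[] = [
  Service.CommandBus,
  Service.EventBus,
  Service.SyncCommandBus,
];

// In a Nest application, an injection helper could wrap the framework decorator.
// Illustrative only; it requires `import { Inject } from '@nestjs/common'`:
//   const CommandBus = () => Inject(Service.CommandBus);
//   constructor(@CommandBus() private readonly commandBus: InternalMessageBus) {}
```

Routing every token through one enum keeps the strings passed to `provide` and to the injection helpers from drifting apart.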
✅ Example Usage
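The usage listing is also absent from this copy, so here is a hedged sketch of how a write-side service might dispatch a command. `CreateBook`, `BookService`, and the routing key `book_shop.command.create_book` are hypothetical names chosen to match the `book_shop.#` binding, and `CommandDispatcher` stands in for the `InternalMessageBus` instance that Nest would inject under `Service.CommandBus`.

```typescript
// Hypothetical command: a plain object describing the intent to create a book.
class CreateBook {
  readonly title: string;
  readonly author: string;

  constructor(title: string, author: string) {
    this.title = title;
    this.author = author;
  }
}

// Stand-in for the injected InternalMessageBus (assumed dispatch signature).
interface CommandDispatcher {
  dispatch(message: object, routingKey: string): Promise<unknown>;
}

// Write-side service: it validates input, then hands the command to the bus.
// In a Nest app the dispatcher would arrive through constructor injection.
class BookService {
  private readonly commandBus: CommandDispatcher;

  constructor(commandBus: CommandDispatcher) {
    this.commandBus = commandBus;
  }

  async createBook(title: string, author: string): Promise<void> {
    if (title.trim() === '') {
      throw new Error('title must not be empty');
    }
    // The key matches the 'book_shop.#' binding, so the message reaches the bound queue.
    await this.commandBus.dispatch(
      new CreateBook(title, author),
      'book_shop.command.create_book',
    );
  }
}
```

When the bus is backed by an AMQP channel, the caller only learns that the command was handed to the bus, not that a consumer has processed it. That is the eventual-consistency trade-off mentioned at the start of the article.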
🧪 Final Thoughts
With RabbitMQ and CQRS in NestJS:
Commands and events are routed across channels and buses.
Consumers process messages reliably in background workers.
The architecture supports distributed, event-driven patterns with ease.
This setup makes your system more scalable, maintainable, and resilient—ideal for microservices, modular monoliths, and modern DDD architectures.