CQRS based on RabbitMQ
CQRS in NestJS Using RabbitMQ
CQRS (Command Query Responsibility Segregation) is a design pattern that decouples the read (query) and write (command) sides of your application. By combining this pattern with RabbitMQ and NestJS’s powerful modular system, we can build a robust and scalable architecture for asynchronous processing and eventual consistency.
This article explores how to implement a CQRS system in NestJS using the @nestjstools/messaging library and a RabbitMQ-backed channel.
🧱 Why CQRS with Messaging?
Separation of concerns: Commands (writes) and queries (reads) evolve independently.
Scalability: Commands can be processed asynchronously in separate services.
Event-driven architecture: Events can propagate across distributed systems.
Reliability: RabbitMQ queues offer durability, acknowledgements, and dead-lettering, which are the building blocks for retries and fault tolerance.
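To make the command/query split concrete, here is a minimal sketch in plain TypeScript with no framework or broker involved. The names (`RestockBook`, `GetBookStock`, and the handler functions) are hypothetical and only illustrate the idea that commands change state while queries only read it.

```typescript
// All names here are illustrative; none come from @nestjstools/messaging.

// Write side: a command expresses an intent to change state.
class RestockBook {
  constructor(
    public readonly bookId: string,
    public readonly quantity: number,
  ) {}
}

// Read side: a query asks for data and never changes state.
class GetBookStock {
  constructor(public readonly bookId: string) {}
}

// One map stands in for both the write store and the read model.
// In a larger system these are often separate and eventually consistent.
const stock = new Map<string, number>();

// The command handler mutates state and returns nothing.
function handleRestockBook(command: RestockBook): void {
  if (command.quantity <= 0) {
    throw new Error('Quantity must be positive');
  }
  const current = stock.get(command.bookId) ?? 0;
  stock.set(command.bookId, current + command.quantity);
}

// The query handler reads state and has no side effects.
function handleGetBookStock(query: GetBookStock): number {
  return stock.get(query.bookId) ?? 0;
}

handleRestockBook(new RestockBook('book-1', 3));
handleRestockBook(new RestockBook('book-1', 2));
console.log(handleGetBookStock(new GetBookStock('book-1'))); // 5
```

Because the command handler returns nothing, it can later be moved behind a queue without changing its callers, which is the property the rest of this article relies on.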
🛠 Messaging Setup with RabbitMQ
We define a MessagingWrapperModule that registers message buses and connects them to appropriate channels:
import { DynamicModule, Module } from '@nestjs/common';
// Package entry points assumed from the library names used in this article.
import { IMessageBus, InMemoryChannelConfig, MessagingModule } from '@nestjstools/messaging';
import {
  AmqpChannelConfig,
  ExchangeType,
  MessagingRabbitmqExtensionModule,
} from '@nestjstools/messaging-rabbitmq-extension';
// Service and InternalMessageBus are the project-local helpers shown later in this article.

@Module({})
export class MessagingWrapperModule {
  static forRoot(enableConsumer: boolean): DynamicModule {
    return {
      imports: [
        MessagingRabbitmqExtensionModule,
        MessagingModule.forRoot({
          buses: [
            { name: 'sync-command.bus', channels: ['sync-channel'] },
            { name: 'command.bus', channels: ['amqp-command.channel'] },
            { name: 'event.bus', channels: ['amqp-event.channel'] },
          ],
          channels: [
            new InMemoryChannelConfig({ name: 'sync-channel' }),
            new AmqpChannelConfig({
              name: 'amqp-command.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              // Bind only command routing keys so events do not land in this queue.
              bindingKeys: ['book_shop.command.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.command',
              autoCreate: true,
              enableConsumer,
            }),
            new AmqpChannelConfig({
              name: 'amqp-event.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              // Bind only event routing keys so commands do not land in this queue.
              bindingKeys: ['book_shop.event.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.event',
              autoCreate: true,
              enableConsumer,
            }),
          ],
          debug: true,
        }),
      ],
      providers: [
        {
          provide: Service.CommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['command.bus'],
        },
        {
          provide: Service.EventBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['event.bus'],
        },
        {
          provide: Service.SyncCommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['sync-command.bus'],
        },
      ],
      exports: [
        Service.CommandBus,
        Service.EventBus,
        Service.SyncCommandBus,
      ],
      module: MessagingWrapperModule,
      global: true,
    };
  }
}
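The AMQP channels above bind their queues to a topic exchange using `#` wildcard binding keys. As a rough illustration of how a topic exchange decides which queues receive a message, the sketch below reimplements the matching rule in plain TypeScript: `*` matches exactly one dot-separated word and `#` matches zero or more. The function `matchesBindingKey` is a hypothetical helper written for this article, not part of RabbitMQ or @nestjstools/messaging.

```typescript
// Returns true when an AMQP topic binding key matches a routing key.
// '*' matches exactly one word, '#' matches zero or more words.
function matchesBindingKey(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p: number, w: number): boolean => {
    // Pattern exhausted: succeed only if every word was consumed too.
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === '#') {
      // '#' may consume zero words (advance the pattern)
      // or one more word (advance the routing key).
      return match(p + 1, w) || (w < words.length && match(p, w + 1));
    }
    if (w === words.length) return false;
    if (pattern[p] === '*' || pattern[p] === words[w]) {
      return match(p + 1, w + 1);
    }
    return false;
  };

  return match(0, 0);
}

const key = 'book_shop.command.complete_order';
console.log(matchesBindingKey('book_shop.#', key)); // true
console.log(matchesBindingKey('book_shop.command.#', key)); // true
console.log(matchesBindingKey('book_shop.event.#', key)); // false
console.log(matchesBindingKey('book_shop.*', key)); // false
```

Every queue whose binding key matches receives its own copy of the message, so two queues bound to the same exchange with overlapping keys will both see it. Keeping command and event keys disjoint avoids that.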
📦 InternalMessageBus Wrapper
import { IMessageBus, RoutingMessage } from '@nestjstools/messaging';

export class InternalMessageBus {
  constructor(private readonly messageBus: IMessageBus) {}

  async dispatch(message: object, routingKey: string): Promise<void> {
    // Await the underlying bus so dispatch failures reach the caller.
    await this.messageBus.dispatch(new RoutingMessage(message, routingKey));
  }
}
This class acts as a lightweight abstraction over the raw IMessageBus, giving you a consistent and simplified interface to dispatch messages.
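One practical benefit of a thin wrapper like this is that it is easy to test without a broker. The sketch below mirrors the wrapper's interface against locally declared stand-in types. `MessageBusLike`, `RoutingMessageLike`, `InternalMessageBusSketch`, and `RecordingBus` are all hypothetical names introduced here so the example runs with no dependencies; they are not the library's types.

```typescript
// Hypothetical stand-ins for the library's RoutingMessage and IMessageBus,
// declared locally so the sketch runs without any dependencies.
class RoutingMessageLike {
  constructor(
    public readonly message: object,
    public readonly routingKey: string,
  ) {}
}

interface MessageBusLike {
  dispatch(message: RoutingMessageLike): Promise<unknown>;
}

// Mirrors the wrapper's interface, written against the stand-in types.
class InternalMessageBusSketch {
  constructor(private readonly messageBus: MessageBusLike) {}

  async dispatch(message: object, routingKey: string): Promise<void> {
    // Awaiting lets a failed dispatch reject the caller's promise.
    await this.messageBus.dispatch(new RoutingMessageLike(message, routingKey));
  }
}

// Test double that records what was dispatched instead of publishing it.
class RecordingBus implements MessageBusLike {
  public readonly sent: RoutingMessageLike[] = [];

  async dispatch(message: RoutingMessageLike): Promise<void> {
    this.sent.push(message);
  }
}

async function demo(): Promise<RoutingMessageLike[]> {
  const recorder = new RecordingBus();
  const bus = new InternalMessageBusSketch(recorder);
  await bus.dispatch({ orderId: 'order-uuid-123' }, 'book_shop.command.complete_order');
  return recorder.sent;
}
```

Calling `demo()` resolves to the list of recorded messages, which a unit test can inspect for the expected payload and routing key.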
🧩 Dependency Injection
Use the following helpers to inject your buses where needed:
import { Inject } from '@nestjs/common';
import { Service } from '@messaging-wrapper/messaging-wrapper/dependency-injection/service';
export const SyncCommandBus = Inject(Service.SyncCommandBus);
export const CommandBus = Inject(Service.CommandBus);
export const EventBus = Inject(Service.EventBus);
The service identifiers are managed through an enum:
export enum Service {
  // Token values only need to be unique; they are named after their keys to avoid confusion.
  SyncCommandBus = 'MSyncCommandBus',
  CommandBus = 'MCommandBus',
  EventBus = 'MEventBus',
}
✅ Example Usage
@Controller('/orders')
export class OrderController {
  constructor(@CommandBus private readonly commandBus: InternalMessageBus) {}

  @Get('/simulate')
  async simulate(): Promise<string> {
    // Wait for the hand-off to the bus so a failed dispatch surfaces as an error response.
    await this.commandBus.dispatch(
      new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2),
      'book_shop.command.complete_order',
    );
    return 'ok';
  }
}
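The article does not show the `CompleteOrder` class itself. A plausible minimal shape, given the three constructor arguments used above, is sketched below. The field names (`orderId`, `bookTitle`, `quantity`) and the `serialize`/`deserialize` helpers are assumptions made for illustration, and the round trip assumes the message is encoded as JSON on its way through the broker.

```typescript
// Hypothetical shape for the CompleteOrder command; field names are assumptions.
class CompleteOrder {
  constructor(
    public readonly orderId: string,
    public readonly bookTitle: string,
    public readonly quantity: number,
  ) {}
}

// Messages cross a broker as bytes, so a command should survive a JSON round trip.
function serialize(command: CompleteOrder): string {
  return JSON.stringify(command);
}

// Validate the payload before rebuilding the command on the consumer side.
function deserialize(payload: string): CompleteOrder {
  const data = JSON.parse(payload) as Partial<CompleteOrder>;
  if (
    typeof data.orderId !== 'string' ||
    typeof data.bookTitle !== 'string' ||
    typeof data.quantity !== 'number'
  ) {
    throw new Error('Malformed CompleteOrder payload');
  }
  return new CompleteOrder(data.orderId, data.bookTitle, data.quantity);
}

const command = new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2);
const restored = deserialize(serialize(command));
console.log(restored.quantity); // 2
```

Keeping commands as plain data objects with primitive fields makes them safe to send across process boundaries, since methods and class identity do not survive serialization.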
🧪 Final Thoughts
With RabbitMQ and CQRS in NestJS:
Commands and events are routed across channels and buses.
Consumers process messages reliably in background workers.
The architecture supports distributed, event-driven patterns with ease.
This setup makes your system more scalable, maintainable, and resilient, which suits microservices, modular monoliths, and modern DDD architectures.