CQRS based on RabbitMQ

CQRS in NestJS Using RabbitMQ

CQRS (Command Query Responsibility Segregation) is a design pattern that decouples the read (query) and write (command) sides of your application. By combining this pattern with RabbitMQ and NestJS’s powerful modular system, we can build a robust and scalable architecture for asynchronous processing and eventual consistency.

This article explores how to implement a CQRS system in NestJS using the @nestjstools/messaging library and a RabbitMQ-backed channel.


🧱 Why CQRS with Messaging?

  • Separation of concerns: Commands (writes) and queries (reads) evolve independently.

  • Scalability: Commands can be processed asynchronously in separate services.

  • Event-driven architecture: Events can propagate across distributed systems.

  • Reliability: RabbitMQ supports durable queues, message acknowledgements, and dead-letter exchanges, which provide the building blocks for retries and fault tolerance.
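
To make the separation of commands and queries concrete, here is a minimal in-memory sketch. All names (PlaceOrder, OrderReadModel, PlaceOrderHandler) are hypothetical and not part of any library; the callback passed to the handler stands in for the event that a message broker would carry in a real setup.

```typescript
// Write side: a command expresses the intent to change state.
class PlaceOrder {
  readonly orderId: string;
  readonly title: string;
  readonly quantity: number;

  constructor(orderId: string, title: string, quantity: number) {
    this.orderId = orderId;
    this.title = title;
    this.quantity = quantity;
  }
}

// Read side: a denormalized view shaped for queries.
interface OrderSummary {
  orderId: string;
  label: string;
}

class OrderReadModel {
  private readonly summaries = new Map<string, OrderSummary>();

  // Updates the view when the write side reports a placed order.
  project(orderId: string, title: string, quantity: number): void {
    this.summaries.set(orderId, { orderId, label: `${quantity} x ${title}` });
  }

  // Queries only read; they never change state.
  findById(orderId: string): OrderSummary | undefined {
    return this.summaries.get(orderId);
  }
}

class PlaceOrderHandler {
  private readonly onOrderPlaced: (command: PlaceOrder) => void;

  constructor(onOrderPlaced: (command: PlaceOrder) => void) {
    this.onOrderPlaced = onOrderPlaced;
  }

  // Validates the command, then notifies the read side. It returns no data.
  handle(command: PlaceOrder): void {
    if (command.quantity <= 0) {
      throw new Error('quantity must be positive');
    }
    this.onOrderPlaced(command);
  }
}

const readModel = new OrderReadModel();
const handler = new PlaceOrderHandler((c) =>
  readModel.project(c.orderId, c.title, c.quantity),
);

handler.handle(new PlaceOrder('order-1', 'Dune', 2));
console.log(readModel.findById('order-1'));
```

In the messaging-based setup, the direct callback would be replaced by an event published on a bus, which is where eventual consistency between the two sides comes from.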


🛠 Messaging Setup with RabbitMQ

We define a MessagingWrapperModule that registers message buses and connects them to appropriate channels:

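import { DynamicModule, Module } from '@nestjs/common';
// Depending on the library version, AmqpChannelConfig and ExchangeType may be
// exported from the RabbitMQ extension package rather than from the core package.
import {
  AmqpChannelConfig,
  ExchangeType,
  IMessageBus,
  InMemoryChannelConfig,
  MessagingModule,
} from '@nestjstools/messaging';
import { MessagingRabbitmqExtensionModule } from '@nestjstools/messaging-rabbitmq-extension';
// InternalMessageBus and Service are the local wrapper class and token enum
// shown in the sections below.

@Module({})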
export class MessagingWrapperModule {
  static forRoot(enableConsumer: boolean): DynamicModule {
    return {
      imports: [
        MessagingRabbitmqExtensionModule,
        MessagingModule.forRoot({
          buses: [
            { name: 'sync-command.bus', channels: ['sync-channel'] },
            { name: 'command.bus', channels: ['amqp-command.channel'] },
            { name: 'event.bus', channels: ['amqp-event.channel'] },
          ],
          channels: [
            new InMemoryChannelConfig({ name: 'sync-channel' }),
            new AmqpChannelConfig({
              name: 'amqp-command.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              // Only routing keys under 'book_shop.command.' reach the command queue.
              bindingKeys: ['book_shop.command.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.command',
              autoCreate: true,
              enableConsumer,
            }),
            new AmqpChannelConfig({
              name: 'amqp-event.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
              // Assumes event routing keys start with 'book_shop.event.'.
              bindingKeys: ['book_shop.event.#'],
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.event',
              autoCreate: true,
              enableConsumer,
            }),
          ],
          debug: true,
        }),
      ],
      providers: [
        {
          provide: Service.CommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['command.bus'],
        },
        {
          provide: Service.EventBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['event.bus'],
        },
        {
          provide: Service.SyncCommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['sync-command.bus'],
        },
      ],
      exports: [
        Service.CommandBus,
        Service.EventBus,
        Service.SyncCommandBus,
      ],
      module: MessagingWrapperModule,
      global: true,
    };
  }
}
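
Both AMQP channels share one topic exchange, so the binding keys decide which queue receives which message. A key such as book_shop.# matches every routing key that starts with book_shop, which means a queue bound with it receives commands and events alike; a narrower key such as book_shop.command.# (an assumed naming scheme) limits a queue to one kind of message. The helper below is a hypothetical, simplified re-implementation of AMQP topic matching, written only to illustrate the rules; RabbitMQ performs this matching itself.

```typescript
// Returns true if an AMQP topic binding key matches a routing key.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p: number, w: number): boolean => {
    // Pattern exhausted: succeed only if all words were consumed too.
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === '#') {
      // '#' either consumes nothing, or consumes one word and stays active.
      return match(p + 1, w) || (w < words.length && match(p, w + 1));
    }
    if (w === words.length) return false;
    return (pattern[p] === '*' || pattern[p] === words[w]) && match(p + 1, w + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('book_shop.#', 'book_shop.command.complete_order')); // true
console.log(topicMatches('book_shop.command.#', 'book_shop.event.order_completed')); // false
console.log(topicMatches('book_shop.*', 'book_shop.command.complete_order')); // false
```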

📦 InternalMessageBus Wrapper

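import { IMessageBus, RoutingMessage } from '@nestjstools/messaging';

export class InternalMessageBus {
  constructor(private readonly messageBus: IMessageBus) {}

  // Wraps the payload in a RoutingMessage and waits for the bus to accept it,
  // so a failed dispatch rejects the returned promise instead of being lost.
  async dispatch(message: object, routingKey: string): Promise<void> {
    await this.messageBus.dispatch(new RoutingMessage(message, routingKey));
  }
}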

This class acts as a lightweight abstraction over the raw IMessageBus, giving you a consistent and simplified interface to dispatch messages.
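
One practical benefit of such a wrapper is that it can be exercised without a broker. The sketch below uses local stand-ins (RoutingMessageLike, MessageBusLike, InternalMessageBusSketch, RecordingBus) in place of the library types, so every name in it is hypothetical; it awaits the underlying dispatch so that publish failures reach the caller.

```typescript
// Local stand-in for the library's routing message type.
class RoutingMessageLike {
  readonly message: object;
  readonly routingKey: string;

  constructor(message: object, routingKey: string) {
    this.message = message;
    this.routingKey = routingKey;
  }
}

// Local stand-in for the library's message bus interface.
interface MessageBusLike {
  dispatch(message: RoutingMessageLike): Promise<unknown>;
}

// Same shape as the wrapper above, written against the stand-in types.
class InternalMessageBusSketch {
  private readonly messageBus: MessageBusLike;

  constructor(messageBus: MessageBusLike) {
    this.messageBus = messageBus;
  }

  async dispatch(message: object, routingKey: string): Promise<void> {
    await this.messageBus.dispatch(new RoutingMessageLike(message, routingKey));
  }
}

// Test double that records dispatched messages instead of publishing them.
class RecordingBus implements MessageBusLike {
  readonly sent: RoutingMessageLike[] = [];

  async dispatch(message: RoutingMessageLike): Promise<void> {
    this.sent.push(message);
  }
}

// Dispatches one message through the wrapper and returns what was recorded.
async function demoDispatch(): Promise<RoutingMessageLike[]> {
  const recordingBus = new RecordingBus();
  const wrapper = new InternalMessageBusSketch(recordingBus);
  await wrapper.dispatch({ orderId: 'order-1' }, 'book_shop.command.complete_order');
  return recordingBus.sent;
}

demoDispatch().then((sent) => console.log(sent.length, sent[0]?.routingKey));
```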


🧩 Dependency Injection

Use the following decorator helpers to inject the buses where needed:

import { Inject } from '@nestjs/common';
import { Service } from '@messaging-wrapper/messaging-wrapper/dependency-injection/service';

export const SyncCommandBus = Inject(Service.SyncCommandBus);
export const CommandBus = Inject(Service.CommandBus);
export const EventBus = Inject(Service.EventBus);

The service identifiers are managed through an enum:

// Each value is the injection token for the bus of the same name.
export enum Service {
  SyncCommandBus = 'MSyncCommandBus',
  CommandBus = 'MCommandBus',
  EventBus = 'MEventBus',
}

✅ Example Usage

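import { Controller, Get } from '@nestjs/common';

@Controller('/orders')
export class OrderController {
  constructor(@CommandBus private readonly commandBus: InternalMessageBus) {}

  // GET is used only to make the simulation easy to trigger from a browser.
  @Get('/simulate')
  async simulate(): Promise<string> {
    // The routing key must match a binding key of the command channel.
    // Awaiting the dispatch lets a publish failure surface as an error response.
    await this.commandBus.dispatch(
      new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2),
      'book_shop.command.complete_order',
    );
    return 'ok';
  }
}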
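
The controller covers only the producing side. On the consuming side, a handler receives the CompleteOrder message, and because a broker may deliver the same message more than once, it helps if that handler is idempotent. The following is a plain-TypeScript sketch under stated assumptions: the field names of CompleteOrder are guessed from the constructor call above, the in-memory list stands in for a real data store, and registering the handler with the messaging library by routing key is left out.

```typescript
// Assumed shape of the command; the field names are hypothetical.
class CompleteOrder {
  readonly orderId: string;
  readonly title: string;
  readonly quantity: number;

  constructor(orderId: string, title: string, quantity: number) {
    this.orderId = orderId;
    this.title = title;
    this.quantity = quantity;
  }
}

class CompleteOrderHandler {
  // Stand-in for a persistent store of completed orders.
  readonly completed: string[] = [];

  async handle(message: CompleteOrder): Promise<void> {
    // Skip orders that were already completed, so a redelivery is harmless.
    if (this.completed.includes(message.orderId)) {
      return;
    }
    this.completed.push(message.orderId);
  }
}

// Simulates the same message being delivered twice.
async function demoRedelivery(): Promise<string[]> {
  const orderHandler = new CompleteOrderHandler();
  const command = new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2);
  await orderHandler.handle(command);
  await orderHandler.handle(command);
  return orderHandler.completed;
}

demoRedelivery().then((completed) => console.log(completed));
```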

🧪 Final Thoughts

With RabbitMQ and CQRS in NestJS:

  • Commands and events are routed across channels and buses.

  • Consumers process messages reliably in background workers.

  • The architecture supports distributed, event-driven patterns with ease.

This setup can make a system more scalable, maintainable, and resilient, which suits microservices, modular monoliths, and DDD-style architectures.
