Nestjstools Messaging Docs

CQRS based on RabbitMQ

CQRS in NestJS Using RabbitMQ

CQRS (Command Query Responsibility Segregation) is a design pattern that decouples the read (query) and write (command) sides of your application. By combining this pattern with RabbitMQ and NestJS’s powerful modular system, we can build a robust and scalable architecture for asynchronous processing and eventual consistency.

This article explores how to implement a CQRS system in NestJS using the @nestjstools/messaging library and a RabbitMQ-backed channel.


🧱 Why CQRS with Messaging?

  • Separation of concerns: Commands (writes) and queries (reads) evolve independently.

  • Scalability: Commands can be processed asynchronously in separate services.

  • Event-driven architecture: Events can propagate across distributed systems.

  • Reliability: RabbitMQ provides durable queues, persistent messages, and acknowledgement-based redelivery, so messages can survive consumer crashes and broker restarts.


🛠 Messaging Setup with RabbitMQ

We define a MessagingWrapperModule that registers message buses and connects them to appropriate channels:

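import { DynamicModule, Module } from '@nestjs/common';
import { IMessageBus, InMemoryChannelConfig, MessagingModule } from '@nestjstools/messaging';
// Assumed import path for the RabbitMQ extension; adjust names and paths to the package version you have installed.
import { AmqpChannelConfig, ExchangeType, MessagingRabbitmqExtensionModule } from '@nestjstools/messaging-rabbitmq-extension';
// Service and InternalMessageBus are the application-level definitions shown later on this page.

@Module({})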
export class MessagingWrapperModule {
  static forRoot(enableConsumer: boolean): DynamicModule {
    return {
      imports: [
        MessagingRabbitmqExtensionModule,
        MessagingModule.forRoot({
          buses: [
            { name: 'sync-command.bus', channels: ['sync-channel'] },
            { name: 'command.bus', channels: ['amqp-command.channel'] },
            { name: 'event.bus', channels: ['amqp-event.channel'] },
          ],
          channels: [
            new InMemoryChannelConfig({ name: 'sync-channel' }),
            new AmqpChannelConfig({
              name: 'amqp-command.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
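              // Bind only command routing keys, so events published to the shared exchange do not land in this queue.
              bindingKeys: ['book_shop.command.#'],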
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.command',
              autoCreate: true,
              enableConsumer,
            }),
            new AmqpChannelConfig({
              name: 'amqp-event.channel',
              connectionUri: 'amqp://guest:guest@localhost:5672/',
              exchangeName: 'book_shop.exchange',
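              // Bind only event routing keys (assumes events use the book_shop.event.* prefix), so commands do not land in this queue.
              bindingKeys: ['book_shop.event.#'],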
              exchangeType: ExchangeType.TOPIC,
              queue: 'book_shop.event',
              autoCreate: true,
              enableConsumer,
            }),
          ],
          debug: true,
        }),
      ],
      providers: [
        {
          provide: Service.CommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['command.bus'],
        },
        {
          provide: Service.EventBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['event.bus'],
        },
        {
          provide: Service.SyncCommandBus,
          useFactory: (bus: IMessageBus) => new InternalMessageBus(bus),
          inject: ['sync-command.bus'],
        },
      ],
      exports: [
        Service.CommandBus,
        Service.EventBus,
        Service.SyncCommandBus,
      ],
      module: MessagingWrapperModule,
      global: true,
    };
  }
}
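
Both AMQP channels publish to the same topic exchange, so the binding keys decide which queue receives a given routing key. The sketch below is a standalone approximation of topic matching (`*` matches exactly one word, `#` matches zero or more words). The `topicMatches` helper is hypothetical and is not part of @nestjstools/messaging or RabbitMQ; it only illustrates how a binding key relates to a routing key such as `book_shop.command.complete_order`.

```typescript
// Hypothetical helper: approximates AMQP topic-exchange matching for illustration only.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey === '' ? [] : routingKey.split('.');

  // p indexes the binding-key pattern, w indexes the routing-key words.
  const match = (p: number, w: number): boolean => {
    if (p === pattern.length) {
      return w === words.length;
    }
    if (pattern[p] === '#') {
      // '#' may absorb zero or more words: try every possible split point.
      for (let k = w; k <= words.length; k++) {
        if (match(p + 1, k)) {
          return true;
        }
      }
      return false;
    }
    if (w === words.length) {
      return false;
    }
    // '*' matches exactly one word; anything else must match literally.
    const wordMatches = pattern[p] === '*' || pattern[p] === words[w];
    return wordMatches && match(p + 1, w + 1);
  };

  return match(0, 0);
}

const routingKey = 'book_shop.command.complete_order';
console.log(topicMatches('book_shop.#', routingKey));
console.log(topicMatches('book_shop.command.#', routingKey));
console.log(topicMatches('book_shop.event.#', routingKey));
```

A broad key like `book_shop.#` matches every routing key under that prefix, while a narrower key such as `book_shop.command.#` restricts a queue to one kind of message.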

📦 InternalMessageBus Wrapper

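import { IMessageBus, RoutingMessage } from '@nestjstools/messaging';

export class InternalMessageBus {
  constructor(private readonly messageBus: IMessageBus) {}

  // Wraps the payload in a RoutingMessage and waits for the underlying bus,
  // so dispatch failures reject the returned promise instead of being lost.
  async dispatch(message: object, routingKey: string): Promise<void> {
    await this.messageBus.dispatch(new RoutingMessage(message, routingKey));
  }
}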

This class acts as a lightweight abstraction over the raw IMessageBus, giving you a consistent and simplified interface to dispatch messages.
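
One benefit of such a wrapper is that it can be exercised without a broker. The following is a minimal sketch, assuming simplified local stand-ins for `IMessageBus` and `RoutingMessage` (their real definitions live in the library and may differ); `RecordingBus` is a hypothetical test double that only records what was dispatched.

```typescript
// Local stand-ins for the library types so this sketch runs without NestJS or RabbitMQ.
// The real RoutingMessage and IMessageBus may have different members.
class RoutingMessage {
  constructor(
    public readonly message: object,
    public readonly routingKey: string,
  ) {}
}

interface IMessageBus {
  dispatch(message: RoutingMessage): Promise<object | void>;
}

// Same shape as the wrapper described above; it awaits the underlying bus.
class InternalMessageBus {
  constructor(private readonly messageBus: IMessageBus) {}

  async dispatch(message: object, routingKey: string): Promise<void> {
    await this.messageBus.dispatch(new RoutingMessage(message, routingKey));
  }
}

// Hypothetical test double: keeps every dispatched message in memory.
class RecordingBus implements IMessageBus {
  readonly dispatched: RoutingMessage[] = [];

  async dispatch(message: RoutingMessage): Promise<void> {
    this.dispatched.push(message);
  }
}

const recordingBus = new RecordingBus();
const commandBus = new InternalMessageBus(recordingBus);
const pending = commandBus.dispatch(
  { orderId: 'order-uuid-123' },
  'book_shop.command.complete_order',
);
```

Code that depends on `InternalMessageBus` can receive an instance built on a double like this in unit tests, then assert on the recorded routing keys and payloads.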


🧩 Dependency Injection

Use the following helpers to inject your buses where needed:

import { Inject } from '@nestjs/common';
import { Service } from '@messaging-wrapper/messaging-wrapper/dependency-injection/service';

export const SyncCommandBus = Inject(Service.SyncCommandBus);
export const CommandBus = Inject(Service.CommandBus);
export const EventBus = Inject(Service.EventBus);

The service identifiers are managed through an enum:

// Each value is a DI token, so it must be unique; the names now match the buses they identify.
export enum Service {
  SyncCommandBus = 'MSyncCommandBus',
  CommandBus = 'MCommandBus',
  EventBus = 'MEventBus',
}

✅ Example Usage

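import { Controller, Get } from '@nestjs/common';
// CommandBus, InternalMessageBus, and CompleteOrder come from your own application modules.

@Controller('/orders')
export class OrderController {
  constructor(@CommandBus private readonly commandBus: InternalMessageBus) {}

  @Get('/simulate')
  async simulate(): Promise<string> {
    // Await the dispatch so a failed publish becomes an error response
    // rather than an unhandled promise rejection.
    await this.commandBus.dispatch(
      new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2),
      'book_shop.command.complete_order',
    );
    return 'ok';
  }
}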
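
The `CompleteOrder` class itself is not shown on this page. Below is a rough sketch of what such a command might look like, assuming the three constructor arguments are an order ID, a book title, and a quantity, and assuming the payload travels through the broker as JSON. The field names and the `toWire` / `fromWire` helpers are hypothetical and not part of the library.

```typescript
// Hypothetical command shape; field names are assumptions inferred from the call above.
class CompleteOrder {
  constructor(
    public readonly orderId: string,
    public readonly bookTitle: string,
    public readonly quantity: number,
  ) {
    // Reject obviously invalid commands before they reach the bus.
    if (quantity <= 0 || Math.floor(quantity) !== quantity) {
      throw new Error('quantity must be a positive integer');
    }
  }
}

// Hypothetical helpers: a command that crosses a broker should survive a JSON round trip.
function toWire(command: CompleteOrder): string {
  return JSON.stringify(command);
}

function fromWire(json: string): CompleteOrder {
  const raw = JSON.parse(json);
  return new CompleteOrder(
    String(raw.orderId),
    String(raw.bookTitle),
    Number(raw.quantity),
  );
}

const original = new CompleteOrder('order-uuid-123', 'Star Wars: The New Galactic', 2);
const restored = fromWire(toWire(original));
console.log(restored.orderId, restored.quantity);
```

Keeping commands as plain data with public readonly fields means the consumer side can rebuild them from the serialized payload without extra mapping code.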

🧪 Final Thoughts

With RabbitMQ and CQRS in NestJS:

  • Commands and events are routed across channels and buses.

  • Consumers process messages reliably in background workers.

  • The architecture supports distributed, event-driven patterns with ease.
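
Because `forRoot` takes an `enableConsumer` flag, the same module can be loaded by an HTTP process that only publishes and by a worker that consumes. One possible way to choose the mode is an environment variable. This is a sketch only: the `ENABLE_CONSUMER` variable name and the `parseEnableConsumer` helper are assumptions, not something the library defines.

```typescript
// Hypothetical helper: turns an environment-variable string into the enableConsumer flag.
// Unset or unrecognised values default to false, so a process only consumes when asked to.
function parseEnableConsumer(raw: string | undefined): boolean {
  if (raw === undefined) {
    return false;
  }
  switch (raw.trim().toLowerCase()) {
    case '1':
    case 'true':
    case 'yes':
    case 'on':
      return true;
    default:
      return false;
  }
}

console.log(parseEnableConsumer('true'));
console.log(parseEnableConsumer(undefined));
```

The result could then be passed along as `MessagingWrapperModule.forRoot(parseEnableConsumer(process.env.ENABLE_CONSUMER))` in the root module of each process.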

This setup lets command and event processing scale independently of the HTTP layer and keeps producers decoupled from consumers, which suits microservices, modular monoliths, and DDD-style architectures.


Last updated 9 days ago