RabbitMQ

RabbitMQ Channel Integration

The @nestjstools/messaging-rabbitmq-extension package integrates RabbitMQ with @nestjstools/messaging for asynchronous and synchronous message processing in NestJS applications.

📦 Installation

Install both the core messaging package and the RabbitMQ extension:

npm install @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension
# or
yarn add @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension

🧩 Basic Configuration Example

import { Module } from '@nestjs/common';
import { MessagingModule } from '@nestjstools/messaging';
import {
  AmqpChannelConfig,
  InMemoryChannelConfig,
  ExchangeType,
} from '@nestjstools/messaging/channels';
import { MessagingRabbitmqExtensionModule } from '@nestjstools/messaging-rabbitmq-extension';
import { SendMessageHandler } from './handlers/send-message.handler';

@Module({
  imports: [
    MessagingRabbitmqExtensionModule,
    MessagingModule.forRoot({
      messageHandlers: [SendMessageHandler],
      buses: [
        { name: 'message.bus', channels: ['my-channel'] },
        { name: 'command-bus', channels: ['amqp-command'] },
        { name: 'event-bus', channels: ['amqp-event'] },
      ],
      channels: [
        new InMemoryChannelConfig({ name: 'my-channel' }),
        new AmqpChannelConfig({
          name: 'amqp-command',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_command.exchange',
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.command',
          bindingKeys: ['my_app.command.#'],
          autoCreate: true,
        }),
        new AmqpChannelConfig({
          name: 'amqp-event',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_event.exchange',
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.event',
          bindingKeys: ['my_app_event.#'],
          autoCreate: true,
          avoidErrorsForNotExistedHandlers: true,
        }),
      ],
      debug: true,
    }),
  ],
})
export class AppModule {}

🛠 Exchange Types

| Exchange Type | Description |
|---|---|
| TOPIC | Routes messages by matching the routing key against wildcard binding keys (e.g., my_app.command.#). |
| DIRECT | Routes on an exact match between routing key and binding key. You must define matching bindingKeys. |
| FANOUT | Broadcasts messages to all queues bound to the exchange, regardless of routing key. |

πŸ” Cross-Language Messaging

Services written in other languages (non-NestJS apps) can publish messages that your handlers consume, provided they follow these rules:

  1. Publish the message to the appropriate queue.

  2. Set the messaging-routing-key header to the routing key of the target handler, for example:

@MessageHandler('my_app_command.create_user')
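
As a rough illustration of the two rules above, the sketch below builds the queue name, body, and headers that a producer would hand to its AMQP client. The helper name buildCrossLanguageMessage, the OutboundMessage shape, and the sample payload are hypothetical, and JSON serialization is an assumption; the body format must match whatever normalizer the channel is configured with.

```typescript
// What a producer in any language needs to hand to its AMQP client.
interface OutboundMessage {
  queue: string;
  body: string; // serialized payload
  headers: Record<string, string>;
}

// Header name taken from the rules above.
const ROUTING_HEADER = 'messaging-routing-key';

// Hypothetical helper: builds a message addressed to a single handler.
// JSON serialization is an assumption, not something the library mandates here.
function buildCrossLanguageMessage(
  queue: string,
  handlerRoutingKey: string,
  payload: unknown,
): OutboundMessage {
  return {
    queue,
    body: JSON.stringify(payload),
    headers: { [ROUTING_HEADER]: handlerRoutingKey },
  };
}

// Targets @MessageHandler('my_app_command.create_user') via the my_app.command queue.
const outbound = buildCrossLanguageMessage(
  'my_app.command',
  'my_app_command.create_user',
  { name: 'Jane' }, // sample payload, purely illustrative
);
```

The resulting object maps directly onto the "send to queue with headers" call that most AMQP client libraries expose.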

🪦 Dead Letter Queue (DLQ) – How It Works

When deadLetterQueueFeature: true is set on an AmqpChannelConfig, failed messages are routed to a dedicated dead letter queue instead of being discarded or crashing the application.

Behavior:

  1. Message handling fails: If a message handler throws an unhandled exception, the message is negatively acknowledged (nack) and redirected to the DLQ.

  2. DLQ naming convention: The DLQ is created automatically and is typically named by appending .dead_letter_queue to the original queue name. For example, if your queue is my_app.command, the dead letter queue will be my_app.command.dead_letter_queue.

  3. Message retention: Failed messages remain in the DLQ until they are manually processed, examined, or retried.

  4. Retry strategy: When you are ready to retry, re-publish messages from the DLQ to the original exchange with the same routing key, either manually or via tooling or scripts.
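
The naming convention and the manual retry step can be sketched as follows. This is a minimal model, not the library's implementation: BrokerClient and InMemoryBroker are hypothetical stand-ins for a real AMQP client, and the routing key in the sample data is invented for illustration.

```typescript
// Naming convention described above: "<queue>.dead_letter_queue".
function deadLetterQueueName(queue: string): string {
  return `${queue}.dead_letter_queue`;
}

interface StoredMessage {
  routingKey: string;
  body: string;
}

// Hypothetical minimal broker interface; a real script would wrap an AMQP client.
interface BrokerClient {
  pop(queue: string): StoredMessage | undefined;
  publish(exchange: string, routingKey: string, body: string): void;
}

// Drains the DLQ and re-publishes each message with its original routing key.
function retryDeadLetters(client: BrokerClient, exchange: string, queue: string): number {
  const dlq = deadLetterQueueName(queue);
  let retried = 0;
  for (let msg = client.pop(dlq); msg !== undefined; msg = client.pop(dlq)) {
    client.publish(exchange, msg.routingKey, msg.body);
    retried++;
  }
  return retried;
}

// In-memory stand-in used only to exercise the sketch.
class InMemoryBroker implements BrokerClient {
  readonly published: Array<{ exchange: string; routingKey: string; body: string }> = [];
  private readonly queues: Record<string, StoredMessage[]>;

  constructor(queues: Record<string, StoredMessage[]>) {
    this.queues = queues;
  }

  pop(queue: string): StoredMessage | undefined {
    const messages = this.queues[queue];
    return messages ? messages.shift() : undefined;
  }

  publish(exchange: string, routingKey: string, body: string): void {
    this.published.push({ exchange, routingKey, body });
  }
}

const dlqBroker = new InMemoryBroker({
  'my_app.command.dead_letter_queue': [
    { routingKey: 'my_app.command.user.create', body: '{}' },
  ],
});
const retriedCount = retryDeadLetters(dlqBroker, 'my_app_command.exchange', 'my_app.command');
```

Re-publishing to the exchange rather than directly to the queue keeps the retry path identical to the original delivery path, so the same bindings apply.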


πŸ” Example Use Case:

new AmqpChannelConfig({
  name: 'amqp-command',
  connectionUri: 'amqp://guest:guest@localhost:5672/',
  exchangeName: 'my_app_command.exchange',
  exchangeType: ExchangeType.TOPIC,
  queue: 'my_app.command',
  bindingKeys: ['my_app.command.#'],
  autoCreate: true,
  deadLetterQueueFeature: true, // ✅ Enable DLQ
});

🔧 Configuration Table: AmqpChannelConfig

| Property | Description | Default |
|---|---|---|
| name | Name of the channel (e.g., 'amqp-command'). | (required) |
| connectionUri | RabbitMQ connection URI (e.g., 'amqp://guest:guest@localhost:5672/'). | (required) |
| exchangeName | Exchange name in RabbitMQ. | (required) |
| bindingKeys | Routing keys for queue bindings (e.g., ['my_app.command.#']). | [] |
| exchangeType | Type of RabbitMQ exchange (TOPIC, FANOUT, DIRECT). | (required) |
| queue | Name of the queue to consume from. | (required) |
| autoCreate | Automatically create exchanges, queues, and bindings if missing. | true |
| enableConsumer | Enable message consumption from this channel. | true |
| enableWorker | Enables the internal worker that processes messages. If false, messages are ignored. | true |
| avoidErrorsForNotExistedHandlers | Skip errors when no handler exists for a routed message. Useful for optional event handlers. | false |
| middlewares | Middleware pipeline for pre-processing messages. | [] |
| normalizer | Attach a normalizer for custom serialization (e.g., Protobuf, Base64). | undefined |
| deadLetterQueueFeature | Enables capturing failed messages into a DLQ. | false |
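
To make the defaults concrete, here is a small sketch that mirrors a subset of the table in a plain TypeScript type and fills in the documented defaults. ChannelOptions, ExchangeKind, and withTableDefaults are hypothetical names for illustration only; they are not the library's own types, and AmqpChannelConfig applies its defaults internally.

```typescript
// Hypothetical mirror of part of the table above; not the library's own type.
type ExchangeKind = 'TOPIC' | 'FANOUT' | 'DIRECT';

interface ChannelOptions {
  name: string;
  connectionUri: string;
  exchangeName: string;
  exchangeType: ExchangeKind;
  queue: string;
  bindingKeys?: string[];
  autoCreate?: boolean;
  enableConsumer?: boolean;
  avoidErrorsForNotExistedHandlers?: boolean;
  deadLetterQueueFeature?: boolean;
}

// Fills every omitted optional property with the default listed in the table.
function withTableDefaults(options: ChannelOptions): Required<ChannelOptions> {
  return {
    ...options,
    bindingKeys: options.bindingKeys ?? [],
    autoCreate: options.autoCreate ?? true,
    enableConsumer: options.enableConsumer ?? true,
    avoidErrorsForNotExistedHandlers: options.avoidErrorsForNotExistedHandlers ?? false,
    deadLetterQueueFeature: options.deadLetterQueueFeature ?? false,
  };
}

// A publish-only channel: consumption is switched off, everything else is defaulted.
const producerOnly = withTableDefaults({
  name: 'amqp-command',
  connectionUri: 'amqp://guest:guest@localhost:5672/',
  exchangeName: 'my_app_command.exchange',
  exchangeType: 'TOPIC',
  queue: 'my_app.command',
  enableConsumer: false,
});
```

Only the five required properties have to be supplied; optional flags such as enableConsumer or deadLetterQueueFeature are set only when the default does not fit.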


βœ‰οΈ Custom Routing with AmqpMessageOptions

You can customize routing at dispatch time by passing an AmqpMessageOptions with the target exchange name and RabbitMQ routing key:

this.messageBus.dispatch(
  new RoutingMessage(
    new SendMessage('Hello Rabbit!'),
    'app.command.execute',
    new AmqpMessageOptions('exchange_name', 'rabbitmq_routing_key_to_queue')
  ),
);

Mapping Messages in RabbitMQ Channels

RabbitMQ uses different exchange types to route messages based on routing keys and bindings. Here's how message routing works for each exchange type in the context of messaging channels:

Topic Exchange

Topic exchanges route messages based on pattern-matching in the routing key.

  • Use the wildcards * (matches exactly one word) and # (matches zero or more words) in your binding keys for flexible routing.

  • Example: If you bind your queue with my_app.command.#, messages with routing keys such as my_app.command.user.create or my_app.command.system.shutdown will be routed to that queue.

  • ✅ This is ideal for structured, hierarchical routing across many message types.
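
To see why those routing keys match, the sketch below re-implements AMQP topic matching in a few lines. It is an illustrative model of the broker's behavior, not code from the library or from RabbitMQ; topicMatches is a hypothetical name. Keys are split into dot-separated words, * consumes exactly one word, and # consumes zero or more.

```typescript
// Illustrative model of AMQP topic matching between a binding key and a routing key.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p: number, w: number): boolean => {
    // Pattern exhausted: succeed only if every word was consumed too.
    if (p === pattern.length) return w === words.length;

    if (pattern[p] === '#') {
      // '#' may absorb zero or more words; try every possible split.
      for (let next = w; next <= words.length; next++) {
        if (match(p + 1, next)) return true;
      }
      return false;
    }

    // '*' and literal words both need exactly one word to consume.
    if (w === words.length) return false;
    if (pattern[p] === '*' || pattern[p] === words[w]) return match(p + 1, w + 1);
    return false;
  };

  return match(0, 0);
}

const matchesNested = topicMatches('my_app.command.#', 'my_app.command.user.create');
const matchesSingle = topicMatches('my_app.command.*', 'my_app.command.user.create');
const matchesOther = topicMatches('my_app.command.#', 'my_app.event.user.created');
```

Here matchesNested is true because # absorbs both trailing words, matchesSingle is false because * covers only one word, and matchesOther is false because the second word differs.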

Direct Exchange

Direct exchanges use exact matching between the routing key and the binding key.

  • Ensure that your queue has binding keys explicitly defined.

  • If no binding key is provided, the binding defaults to the routing key specified in the message handler.

  • Use this when you need precise, one-to-one message routing.

Fanout Exchange

Fanout exchanges broadcast messages to all queues bound to the exchange, ignoring routing keys entirely.

  • Every bound queue receives the message.

  • Best used for scenarios like logging, notifications, or pub-sub events where all consumers should receive the message.
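
The contrast between direct and fanout routing can be modeled in a few lines. This is a simplified in-memory sketch of broker behavior, with hypothetical names (Binding, routeDirect, routeFanout) and invented queue and key names; it does not use the library or a real broker.

```typescript
// One queue-to-exchange binding.
interface Binding {
  queue: string;
  bindingKey: string;
}

// Direct exchange: deliver only to queues whose binding key equals the routing key.
function routeDirect(bindings: Binding[], routingKey: string): string[] {
  return bindings
    .filter((binding) => binding.bindingKey === routingKey)
    .map((binding) => binding.queue);
}

// Fanout exchange: deliver to every bound queue once; the routing key is ignored.
function routeFanout(bindings: Binding[]): string[] {
  return bindings
    .map((binding) => binding.queue)
    .filter((queue, index, all) => all.indexOf(queue) === index);
}

// Invented bindings for illustration.
const exchangeBindings: Binding[] = [
  { queue: 'audit', bindingKey: 'user.created' },
  { queue: 'mailer', bindingKey: 'user.created' },
  { queue: 'billing', bindingKey: 'invoice.paid' },
];

const directTargets = routeDirect(exchangeBindings, 'user.created'); // audit, mailer
const fanoutTargets = routeFanout(exchangeBindings); // audit, mailer, billing
```

With the same bindings, the direct exchange skips billing because its binding key differs, while the fanout exchange delivers to all three queues.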


Last updated 3 days ago