This article focuses on the Direct Exchange type (amq.direct) in RabbitMQ. You will learn to bind a Queue with an Exchange using a routing key. You have already learned about Exchanges and Queues in the Elements of RabbitMQ. The diagram here explains a producer sends a message to my-direct-exchange , there are 3 Queues bound to this exchange with 3 routing keys. The MobileQ linked with personalDevice routing key, ACQ with homeAppliance and also LightQ with homeAppliance to the exchange.

The flow of a message in Direct Exchange

  • A Producer sends a message to an Exchange.
  • A Queue binds to an Exchange using a routing key.
  • Usually, there is more than one Queue bind to an Exchange using the same/different routing keys.
  • The message sent to the Exchange contains a routing key. Based on the routing key the message is forwarded to one or more Queues.
  • Consumers subscribe to the Queue receives the message and processes it.

Implementation of Direct Exchange

Step-1
Create an instance of the Connection class as a singleton object. It is safe to share the same single instance of the Connection object and that is the recommended approach.

package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

  private static Connection connection = null;

  public static Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connection = connectionFactory.newConnection();
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}

Step-2
Create/declare a direct Exchange with the name my-direct-exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class as DirectExchange and add the below code as a method in it.

  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Declare my-direct-exchange DIRECT exchange
    channel.exchangeDeclare("my-direct-exchange", BuiltinExchangeType.DIRECT, true);
    channel.close();
  }

Step-3
Declare the Queues which will be linked to this exchange. We will create 3 queues as MobileQ, ACQ and LightQ.

  public static void declareQueues() throws IOException, TimeoutException {
    //Create a channel - do no't share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();

    //queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("MobileQ", true, false, false, null);
    channel.queueDeclare("ACQ", true, false, false, null);
    channel.queueDeclare("LightQ", true, false, false, null);

    channel.close();
  }

Step-4
Till now we have declared the Direct Exchange and Queues. It is time to set the bindings between them using respective routing keys. Routing keys are shown in the above diagram i.e. MobileQ with personalDevice, ACQ with homeAppliance and LightQ also with homeAppliance as the routing key.

  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Create bindings - (queue, exchange, routingKey)
    channel.queueBind("MobileQ", "my-direct-exchange", "personalDevice");
    channel.queueBind("ACQ", "my-direct-exchange", "homeAppliance");
    channel.queueBind("LightQ", "my-direct-exchange", "homeAppliance");
    channel.close();
  }

Step-5
Now you need the consumers listening to each of the Queues to see what messages they receive. As you can see there are 2 callbacks, the first one is DeliverCallback – with the parameters (consumerTag, message) and the second one is CancelCallback with the parameter – consumerTag.

public static void subscribeMessage() throws IOException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("LightQ:" + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("ACQ:" + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("MobileQ:" + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
  }

Step-6
All set, let us publish a message using the channel.basicPublish().

  //Publish the message
  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    String message = "Direct message - Turn on the Home Appliances ";
    channel.basicPublish("my-direct-exchange", "homeAppliance", null, message.getBytes());
    channel.close();
  }

Step-7
Let’s put all together and invoke each of these methods inside a public static void main(String[] args) of the DirectExchange class. I will use two separate threads to asynchronously run the publish and subscribe methods.

  public static void main(String[] args) throws IOException, TimeoutException {
    DirectExchange.declareQueues();
    DirectExchange.declareExchange();
    DirectExchange.declareBindings();

    Thread subscribe = new Thread(){
      @Override
      public void run() {
        try {
          DirectExchange.subscribeMessage();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };

    Thread publish = new Thread(){
      @Override
      public void run() {
        try {
          DirectExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    subscribe.start();
    publish.start();
  }

Output:

amq.ctag-pwi5fZTAawuHbx_2FDjj1Q
LightQ:Direct message - Turn on the Home Appliances 
amq.ctag-BdMKFsjxp5aioPU4CR5FIg
ACQ:Direct message - Turn on the Home Appliances 

As you can see in the output, only the LightQ and ACQ receive the message as they are linked to the Exchange with the routing key homeAppliance.

Conclusion:

I hope to have given you a practical guide on Direct Exchange and its implementation. You need to understand its working and its application in your projects, specifically if you need selective routing based on the routing key. I would love to hear your feedback in the comments below if you have any.