You will learn to collect rejected messages in RabbitMQ. There can be several reasons to do that, for example, to track down the spammers in an Open CRM system. We will learn this with an example of a Topic Exchange, please check the previous article on Topic Exchange if you are new to RabbitMQ.

What is Alternate Exchange?

  • There will be situations when a message sent to an Exchange can not be forwarded to any of the bound Queues or bound-exchanges. These messages are discarded and get lost in rabbitMQ.
  • An alternate exchange can be defined to collect these discarded messages before they are lost.
  • Any unrouted message will be finally sent to the defined alternative-exchange.
  • Any existing exchange can be set as an alternative exchange, but a Fanout exchange is typically used to do this job as they don’t perform any filtration.

Alternate Exchange recommendations

  • Any of the 4 types of Exchanges (Direct, Fanout, Topic, and Headers) can be assigned as an alternate exchange for another exchange of any type.
  • It is recommended to use a Fanout Exchange as an alternate exchange as it forwards the message unconditionally.
  • Let’s say you want to set an alternate exchange for the exchange ex-abc. You just need to set the arguments alternate-exchange = "alt-xyz" for ex-abc. So the alt-xyz becomes an alternate queue for ex-abc.

Message routing in an Alternate Exchange

  • A producer sends a message with a routing key RK to a Topic Exchange (alt.topic.exchange). It can be any type of Exchange in a real project scenario.
  • The Fanout Exchange (alt.fanout.exchange) is set as the alternative exchange for the topic exchange (alt.topic.exchange).
  • If the message routing key RK matches with any of the routing patterns using which the Queues are bound to the Topic Exchange, then the message is routed to one or more matching Queues.
  • If the message routing key does not match with any pattern, it will not be routed to any Queues. So, whenever a message can not be routed to any of its bound Queues, it is rejected.
  • The rejected messages are by default deleted by RabbitMQ. But in our case, we have the alternate exchange which will receive this rejected message and forward it to the altQueue.
  • Finally, the subscribers linked the respective queues receive the message and process it.

Implementation of an alternate exchange

We will now implement an example to demonstrate the working of the exchange as an alternate exchange. The code here is similar to the previous article, so let’s do it step by step.

step-1
Create an instance of the Connection class as a singleton object. It is safe to share the same single instance of the Connection object and that is the recommended approach.

package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

  private static Connection connection = null;

  /**
   * Create RabbitMQ Connection
   *
   * @return Connection
   */
  public static Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}

Step-2
Declare the two exchange, Fanout Exchange with the name alt.fanout.exchange and  a topic exchange with the name alt.topic.exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class as AlternateExchange and add the below code as a method in it. As you can see, the property alternate-exchange" = "alt.fanout.exchange sets the alternate exchange.

  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Declare the fanout exchange
    channel.exchangeDeclare("alt.fanout.exchange", BuiltinExchangeType.FANOUT, true);

    //Declare the topic exchange and set the alternate-exchange = "alt.fanout.exchange"
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("alternate-exchange", "alt.fanout.exchange");
    channel.exchangeDeclare("alt.topic.exchange", BuiltinExchangeType.TOPIC, true, false, arguments);
    channel.close();
  }

Step-3
Declare the Queues used in this example. We will create 3 Queues (HealthQ, SportsQ, EducationQ) for the Topic exchange and 1 Queue for the Fanout exchange. These Queues will be linked later to their respective Exchanges.

  public static void declareQueues() throws IOException, TimeoutException {
    //Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();

    //Create the Queues for "alt.topic.exchange".
    channel.queueDeclare("HealthQ", true, false, false, null);
    channel.queueDeclare("SportsQ", true, false, false, null);
    channel.queueDeclare("EducationQ", true, false, false, null);

    //Create the Queues for "alt.fanout.exchange"
    channel.queueDeclare("altQueue", true, false, false, null);

    //Close the channel
    channel.close();
  }

Step-4
Now link the Queues to respective Exchanges. We will use the declareBindings method. Look at the routing patterns Topic-Queues use.

  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Create bindings for alt.topic.exchange - (queue, exchange, routingKey)
    channel.queueBind("HealthQ", "alt.topic.exchange", "health.*");
    channel.queueBind("SportsQ", "alt.topic.exchange", "#.sports.*");
    channel.queueBind("EducationQ", "alt.topic.exchange", "#.education");

    //Binding for fanoutExchange - alt.fanout.exchange and altQueue
    channel.queueBind("altQueue", "alt.fanout.exchange", "");

    channel.close();
  }

Step-5
Assign subscribers to these Queues to demo the working of alternate exchange.

  public static void subscribeMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    DeliverCallback deliverCallback = (consumerTag, message) -> {
      System.out.println("\n" + consumerTag);
      System.out.println(new String(message.getBody()));
    };

    CancelCallback cancelCallback = consumerTag -> {
      System.out.println(consumerTag);
    };

    channel.basicConsume("HealthQ", true, deliverCallback, cancelCallback);
    channel.basicConsume("SportsQ", true, deliverCallback, cancelCallback);
    channel.basicConsume("EducationQ", true, deliverCallback, cancelCallback);

    //Consume the rejected message
    channel.basicConsume("altQueue", true, (consumerTag, message) -> {
      System.out.println("\n" + consumerTag);
      System.out.println("rejected message: " + new String(message.getBody()));
    }, consumerTag -> {
    });
  }

Step-6
Now, let’s publish two messages to the topic exchange. As you can guess, the message with the routing key education will reach EducationQ and the second message does not reach any Queue. Hence it will be collected by the alternate-exchange alt.fanout.exchange.

  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    String message = "Learn something new everyday";
    channel.basicPublish("alt.topic.exchange", "education", null, message.getBytes());

    //No Queue for this routingKey.
    message = "Stay fit in Mind and Body";
    channel.basicPublish("alt.topic.exchange", "education.health", null, message.getBytes());

    channel.close();
  }

Step-7
It is time to put all together inside a public static void main to test its working.

  public static void main(String[] args) throws IOException, TimeoutException {
    AlternateExchange.declareQueues();
    AlternateExchange.declareExchange();
    AlternateExchange.declareBindings();

    //Threads created to publish-subscribe asynchronously
    Thread subscribe = new Thread() {
      @Override
      public void run() {
        try {
          AlternateExchange.subscribeMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    Thread publish = new Thread() {
      @Override
      public void run() {
        try {
          AlternateExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    subscribe.start();
    publish.start();
  }
}

Output:

amq.ctag-Ksj0kubI-yc5F_de2_89cA
Learn something new everyday

amq.ctag-YvcEMaqEvoTpVEuFlTE1Xw
rejected message: Stay fit in Mind and Body

Conclusion:

You have seen the working of an Alternate exchange. Let me know if you have any feedback in the comments below.