In this article you will learn how to collect unroutable ("rejected") messages in RabbitMQ by using an alternate exchange. There are several reasons to do this, for example to track down spammers in an open CRM system. The example uses a Topic Exchange; if you are new to RabbitMQ, please read the previous article on Topic Exchange first.
What is an Alternate Exchange?
- There will be situations when a message sent to an Exchange cannot be forwarded to any of the bound Queues or bound Exchanges. By default, these messages are discarded and lost in RabbitMQ.
- An alternate exchange can be defined to collect these discarded messages before they are lost.
- Any unrouted message is finally sent to the defined alternate exchange.
- Any existing exchange can be set as an alternate exchange, but a Fanout Exchange is typically used for this job because it does not filter messages.
Alternate Exchange recommendations
- Any of the 4 types of Exchanges (Direct, Fanout, Topic, and Headers) can be assigned as an alternate exchange for another exchange of any type.
- It is recommended to use a Fanout Exchange as an alternate exchange as it forwards the message unconditionally.
- Let’s say you want to set an alternate exchange for the exchange ex-abc. You just need to set the argument alternate-exchange="alt-xyz" for ex-abc. alt-xyz then becomes the alternate exchange for ex-abc.
Message routing in an Alternate Exchange
- A producer sends a message with a routing key RK to a Topic Exchange (alt.topic.exchange). It can be any type of Exchange in a real project scenario.
- The Fanout Exchange (alt.fanout.exchange) is set as the alternate exchange for the Topic Exchange (alt.topic.exchange).
- If the routing key RK matches any of the routing patterns with which the Queues are bound to the Topic Exchange, the message is routed to one or more matching Queues.
- If the routing key does not match any pattern, the message is not routed to any Queue. Whenever a message cannot be routed to any bound Queue, it is treated as rejected (unroutable).
- By default, RabbitMQ deletes such messages. In our case, however, the alternate exchange receives the message and forwards it to altQueue.
- Finally, the subscribers linked to the respective Queues receive the message and process it.
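The routing flow above can be mimicked with a small in-memory model. This is only a sketch of the fallback rule, not how RabbitMQ is implemented: ToyExchange, bind, setAlternate and route are hypothetical names that do not exist in the RabbitMQ client, and the endsWith check is a crude stand-in for the real topic pattern #.education.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy in-memory model of an exchange (hypothetical, NOT the RabbitMQ client API).
final class ToyExchange {
    // queue name -> rule deciding whether a routing key is accepted
    private final Map<String, Predicate<String>> bindings = new LinkedHashMap<>();
    // null means: unroutable messages are simply dropped
    private ToyExchange alternate;

    void bind(String queue, Predicate<String> matcher) {
        bindings.put(queue, matcher);
    }

    void setAlternate(ToyExchange alternate) {
        this.alternate = alternate;
    }

    // Returns the names of the queues that would receive the message.
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> binding : bindings.entrySet()) {
            if (binding.getValue().test(routingKey)) {
                matched.add(binding.getKey());
            }
        }
        // No binding matched: hand the message to the alternate exchange, if any.
        if (matched.isEmpty() && alternate != null) {
            return alternate.route(routingKey);
        }
        return matched;
    }

    public static void main(String[] args) {
        ToyExchange fanout = new ToyExchange();
        fanout.bind("altQueue", key -> true); // a fanout accepts every message

        ToyExchange topic = new ToyExchange();
        topic.bind("EducationQ", key -> key.endsWith("education")); // stand-in for "#.education"
        topic.setAlternate(fanout);

        System.out.println(topic.route("education"));        // prints [EducationQ]
        System.out.println(topic.route("education.health")); // prints [altQueue]
    }
}
```

Without the setAlternate call, the second route call in this model would return an empty list, which corresponds to the message being dropped.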
Implementation of an alternate exchange
We will now implement an example to demonstrate how an exchange works as an alternate exchange. The code here is similar to the previous article, so let’s do it step by step.
Step-1
Create an instance of the Connection class as a singleton object. It is safe to share the same single instance of the Connection object, and that is the recommended approach.
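package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private static Connection connection = null;

    /**
     * Create the RabbitMQ Connection on first use and reuse it afterwards.
     *
     * @return Connection
     */
    public static synchronized Connection getConnection() {
        if (connection == null) {
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                // newConnection(String) treats its argument as a connection name,
                // not as a URI, so the broker address is set with setUri instead.
                connectionFactory.setUri("amqp://guest:guest@localhost:5672/");
                connection = connectionFactory.newConnection();
            } catch (IOException | TimeoutException | URISyntaxException | GeneralSecurityException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }
}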
Step-2
Declare the two exchanges: a Fanout Exchange named alt.fanout.exchange and a Topic Exchange named alt.topic.exchange. We will use the Channel.exchangeDeclare method as shown below. Create a class named AlternateExchange and add the code below to it as a method. As you can see, the argument "alternate-exchange" = "alt.fanout.exchange" sets the alternate exchange.
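package com.amqp.exchanges.all;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class AlternateExchange {
    public static void declareExchange() throws IOException, TimeoutException {
        Channel channel = ConnectionManager.getConnection().createChannel();
        // Declare the fanout exchange
        channel.exchangeDeclare("alt.fanout.exchange", BuiltinExchangeType.FANOUT, true);
        // Declare the topic exchange and set alternate-exchange = "alt.fanout.exchange"
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", "alt.fanout.exchange");
        channel.exchangeDeclare("alt.topic.exchange", BuiltinExchangeType.TOPIC, true, false, arguments);
        channel.close();
    }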
Step-3
Declare the Queues used in this example. We will create 3 Queues (HealthQ, SportsQ, EducationQ) for the Topic exchange and 1 Queue for the Fanout exchange. These Queues will be linked later to their respective Exchanges.
    public static void declareQueues() throws IOException, TimeoutException {
        // Create a channel - do not share the Channel instance
        Channel channel = ConnectionManager.getConnection().createChannel();
        // Create the Queues for "alt.topic.exchange"
        channel.queueDeclare("HealthQ", true, false, false, null);
        channel.queueDeclare("SportsQ", true, false, false, null);
        channel.queueDeclare("EducationQ", true, false, false, null);
        // Create the Queue for "alt.fanout.exchange"
        channel.queueDeclare("altQueue", true, false, false, null);
        // Close the channel
        channel.close();
    }
Step-4
Now bind the Queues to their respective Exchanges in the declareBindings method. Note the routing patterns used by the Queues of the Topic Exchange.
    public static void declareBindings() throws IOException, TimeoutException {
        Channel channel = ConnectionManager.getConnection().createChannel();
        // Create bindings for alt.topic.exchange - (queue, exchange, routingKey)
        channel.queueBind("HealthQ", "alt.topic.exchange", "health.*");
        channel.queueBind("SportsQ", "alt.topic.exchange", "#.sports.*");
        channel.queueBind("EducationQ", "alt.topic.exchange", "#.education");
        // Binding for the fanout exchange - alt.fanout.exchange and altQueue
        channel.queueBind("altQueue", "alt.fanout.exchange", "");
        channel.close();
    }
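To see which routing keys the three topic patterns above would accept, here is a rough sketch of topic matching. It assumes the usual topic exchange rules (* stands for exactly one word, # for zero or more words, words separated by dots). TopicPatternSketch and matches are hypothetical helpers written for this article; the broker's own matching is implemented differently.

```java
// Simplified topic pattern matching (hypothetical helper, not RabbitMQ code).
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern is used up: it matches only if the key is used up too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"health.*", "#.sports.*", "#.education"};
        String[] keys = {"education", "education.health"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " vs " + pattern + " -> " + matches(pattern, key));
            }
        }
    }
}
```

Under these assumptions, the key education matches only #.education, while education.health matches none of the three patterns, which is why a message with that key ends up in the alternate exchange.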
Step-5
Assign subscribers to these Queues to demonstrate how the alternate exchange works.
    public static void subscribeMessage() throws IOException, TimeoutException {
        // The channel is left open so the consumers keep receiving messages
        Channel channel = ConnectionManager.getConnection().createChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("\n" + consumerTag);
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag);
        };
        channel.basicConsume("HealthQ", true, deliverCallback, cancelCallback);
        channel.basicConsume("SportsQ", true, deliverCallback, cancelCallback);
        channel.basicConsume("EducationQ", true, deliverCallback, cancelCallback);
        // Consume the rejected (unroutable) messages
        channel.basicConsume("altQueue", true, (consumerTag, message) -> {
            System.out.println("\n" + consumerTag);
            System.out.println("rejected message: " + new String(message.getBody()));
        }, consumerTag -> {
        });
    }
Step-6
Now, let’s publish two messages to the topic exchange. The first message, with the routing key education, matches the pattern #.education and reaches EducationQ. The second message, with the routing key education.health, does not match any binding and reaches no Queue. Hence it is collected by the alternate exchange alt.fanout.exchange.
    public static void publishMessage() throws IOException, TimeoutException {
        Channel channel = ConnectionManager.getConnection().createChannel();
        String message = "Learn something new everyday";
        channel.basicPublish("alt.topic.exchange", "education", null, message.getBytes());
        // No binding matches this routing key, so the alternate exchange receives the message
        message = "Stay fit in Mind and Body";
        channel.basicPublish("alt.topic.exchange", "education.health", null, message.getBytes());
        channel.close();
    }
Step-7
It is time to put it all together inside a public static void main method and test it.
    public static void main(String[] args) throws IOException, TimeoutException {
        AlternateExchange.declareQueues();
        AlternateExchange.declareExchange();
        AlternateExchange.declareBindings();
        // Threads created to publish and subscribe asynchronously
        Thread subscribe = new Thread() {
            @Override
            public void run() {
                try {
                    AlternateExchange.subscribeMessage();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread publish = new Thread() {
            @Override
            public void run() {
                try {
                    AlternateExchange.publishMessage();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
        };
        subscribe.start();
        publish.start();
    }
}
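The two threads above start in no guaranteed order. That is harmless in this example, because the Queues hold the messages until a consumer arrives. If you prefer the subscriber to be ready before anything is published, one possible approach is a CountDownLatch. The sketch below only shows the ordering idea with placeholder actions; StartOrderSketch and run are hypothetical names, and the two events.add calls stand in for subscribeMessage() and publishMessage().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Ordering sketch with placeholder work (hypothetical, no RabbitMQ calls).
final class StartOrderSketch {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch subscriberReady = new CountDownLatch(1);

        Thread subscribe = new Thread(() -> {
            events.add("subscribed");     // stand-in for subscribeMessage()
            subscriberReady.countDown();  // signal that consumers are registered
        });

        Thread publish = new Thread(() -> {
            try {
                subscriberReady.await();  // wait until the subscriber is ready
                events.add("published");  // stand-in for publishMessage()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Start order no longer matters: the publisher waits on the latch.
        publish.start();
        subscribe.start();
        try {
            subscribe.join();
            publish.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [subscribed, published]
    }
}
```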
Output:
amq.ctag-Ksj0kubI-yc5F_de2_89cA
Learn something new everyday

amq.ctag-YvcEMaqEvoTpVEuFlTE1Xw
rejected message: Stay fit in Mind and Body
Conclusion:
You have seen the working of an Alternate exchange. Let me know if you have any feedback in the comments below.