This article focuses on the Topic Exchange type (amq.topic) in RabbitMQ. You will learn to bind a Queue to a Topic Exchange using a routing key pattern. You have already learned about Exchanges and Queues in the Elements of RabbitMQ. The diagram here shows a producer sending a message to my-topic-exchange. There are 3 Queues bound to this exchange with 3 routing key patterns (health.*, #.sports.* and #.education). As you may have noticed, in a topic exchange the Queues are bound using routing key patterns instead of a simple routing key. Before looking at the message flow in a Topic Exchange, let us understand the routing patterns.
Routing patterns in Topic Exchange
- A routing key in a Topic Exchange must consist of zero or more words delimited by dots, e.g. health.education.
- The routing key used to bind a Queue to a topic exchange is often called a routing pattern.
- A routing pattern is like a regular expression in which only *, . and # have a special meaning.
- The star symbol (*) matches exactly one word.
- Similarly, the hash symbol (#) matches zero or more words.
- The dot symbol (.) is the word delimiter. Multiple key terms are separated by the dot delimiter.
- If a routing pattern is health.*, any message whose routing key has health as the first word, followed by exactly one more word, will reach the queue. For example, health.education will reach this Queue, but sports.health will not.
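The wildcard rules above can be tried out locally by translating a routing pattern into a Java regular expression. This is only a minimal sketch: TopicPatternRegex and its methods are hypothetical names, not part of the RabbitMQ client, and the broker's own matching may treat edge cases (such as empty words) differently.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; it is NOT a RabbitMQ API.
final class TopicPatternRegex {

    // Translates a routing pattern into a regex in which every word ends with a dot.
    static Pattern toRegex(String routingPattern) {
        StringBuilder regex = new StringBuilder();
        for (String word : routingPattern.split("\\.")) {
            if (word.equals("*")) {
                regex.append("[^.]+\\.");                        // exactly one word
            } else if (word.equals("#")) {
                regex.append("(?:[^.]+\\.)*");                   // zero or more words
            } else {
                regex.append(Pattern.quote(word)).append("\\."); // literal word
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String routingPattern, String routingKey) {
        // Append a trailing dot so that every word of the key ends with the delimiter.
        String normalizedKey = routingKey.isEmpty() ? "" : routingKey + ".";
        return toRegex(routingPattern).matcher(normalizedKey).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("health.*", "health.education")); // true
        System.out.println(matches("health.*", "sports.health"));    // false
        System.out.println(matches("#.education", "education"));     // true
    }
}
```

Working on "word plus dot" units keeps the translation of # simple, because zero words then corresponds to an empty string and no special handling of the surrounding dots is needed.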
The flow of a message in Topic Exchange
- A message Queue binds to an Exchange with a routing key pattern (P).
- A publisher sends a message with a routing key (K) to the Topic Exchange.
- The message is passed to the Queue if P matches K. The routing key matching is decided as discussed below.
- The consumer subscribed to the Queue receives the message.
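The flow above can be imitated without a broker. The following is a minimal in-memory sketch, assuming the three bindings used in this article; TopicFlowSketch, bind and route are hypothetical names and the word-by-word matcher is a simplification, not RabbitMQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a topic exchange; hypothetical, for illustration only.
final class TopicFlowSketch {

    // Queue name -> routing key pattern (P). Simplification: one pattern per queue.
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String pattern) {
        bindings.put(queue, pattern);
    }

    // Returns the queues whose pattern P matches the routing key K.
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        String[] keyWords = routingKey.split("\\.");
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            String[] patternWords = binding.getValue().split("\\.");
            if (matches(patternWords, 0, keyWords, 0)) {
                matched.add(binding.getKey());
            }
        }
        return matched;
    }

    // Word-by-word match: '*' consumes exactly one word, '#' consumes zero or more.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... of the remaining key words.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        TopicFlowSketch exchange = new TopicFlowSketch();
        exchange.bind("HealthQ", "health.*");
        exchange.bind("SportsQ", "#.sports.*");
        exchange.bind("EducationQ", "#.education");

        System.out.println(exchange.route("health.education")); // [HealthQ, EducationQ]
        System.out.println(exchange.route("education"));        // [EducationQ]
        System.out.println(exchange.route("education.health")); // []
    }
}
```

A single message can be copied to several queues when more than one pattern matches, and it is routed to no queue at all when none does.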
Routing pattern matching examples
- Routing Key Pattern – It is the routing key pattern that binds a specific Queue with an Exchange.
- Valid Routing Key – The message with this key reaches the linked Queue.
- Invalid Routing Key – The message with this key does not reach the Queue.
Routing Key Pattern | Valid Routing Key | Invalid Routing Key |
---|---|---|
health.* – health as the first word followed by one word. | health.education, health.sports, health.anything | health, health.education.anything, health.education.sports |
#.sports.* – Zero or more words, then sports, after that exactly one word. | sports.education, sports.sports.sports, sports.sports | sports, education.sports, anything.sports.anything.xyz |
#.education – Zero or more words followed by the word education. | health.education, education.education, education | education.health, anything.education.anything |
Implementation of Topic Exchange
Step-1
Create an instance of the Connection class as a singleton object. It is safe to share a single instance of the Connection object, and that is the recommended approach.
package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private static Connection connection = null;

    // Lazily creates the shared Connection; synchronized so that
    // concurrent callers cannot create two connections.
    public static synchronized Connection getConnection() {
        if (connection == null) {
            try {
                // Without further configuration the factory uses the client defaults (a broker on localhost).
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connection = connectionFactory.newConnection();
            } catch (IOException | TimeoutException e) {
                // Fail fast instead of returning null to the caller.
                throw new IllegalStateException("Could not connect to RabbitMQ", e);
            }
        }
        return connection;
    }
}
Step-2
Create/declare a Topic Exchange with the name my-topic-exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class named TopicExchange and add the code below as a method in it.
// Requires the imports com.rabbitmq.client.Channel and com.rabbitmq.client.BuiltinExchangeType.
public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Create the Topic Exchange - (exchange, type, durable)
    channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true);
    channel.close();
}
Step-3
We will now declare the three Queues (HealthQ, SportsQ, EducationQ), which will later be bound to our topic exchange.
public static void declareQueues() throws IOException, TimeoutException {
    // Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Create the Queues - (queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("HealthQ", true, false, false, null);
    channel.queueDeclare("SportsQ", true, false, false, null);
    channel.queueDeclare("EducationQ", true, false, false, null);
    channel.close();
}
Step-4
Now, let us bind the Queues with routing key patterns: HealthQ with health.*, SportsQ with #.sports.* and EducationQ with #.education.
public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Create bindings - (queue, exchange, routingKey) - routingKey != null
    channel.queueBind("HealthQ", "my-topic-exchange", "health.*");
    channel.queueBind("SportsQ", "my-topic-exchange", "#.sports.*");
    channel.queueBind("EducationQ", "my-topic-exchange", "#.education");
    channel.close();
}
Step-5
Next, we will register the Consumers that listen to these 3 Queues. Each basicConsume call takes 2 callbacks: the first one is a DeliverCallback with the parameters (consumerTag, message), and the second one is a CancelCallback with the parameter consumerTag. I have also printed message.getEnvelope() to show how to obtain the exchange name, the routing key and other details from the received message.
public static void subscribeMessage() throws IOException, TimeoutException {
    // This channel is intentionally left open so that the consumers keep receiving messages.
    Channel channel = ConnectionManager.getConnection().createChannel();

    // basicConsume(queue, autoAck, deliverCallback, cancelCallback)
    channel.basicConsume("HealthQ", true, (consumerTag, message) -> {
        System.out.println("\n\n=========== Health Queue ==========");
        System.out.println(consumerTag);
        System.out.println("HealthQ: " + new String(message.getBody()));
        System.out.println(message.getEnvelope());
    }, consumerTag -> {
        System.out.println(consumerTag);
    });

    channel.basicConsume("SportsQ", true, (consumerTag, message) -> {
        System.out.println("\n\n ============ Sports Queue ==========");
        System.out.println(consumerTag);
        System.out.println("SportsQ: " + new String(message.getBody()));
        System.out.println(message.getEnvelope());
    }, consumerTag -> {
        System.out.println(consumerTag);
    });

    channel.basicConsume("EducationQ", true, (consumerTag, message) -> {
        System.out.println("\n\n ============ Education Queue ==========");
        System.out.println(consumerTag);
        System.out.println("EducationQ: " + new String(message.getBody()));
        System.out.println(message.getEnvelope());
    }, consumerTag -> {
        System.out.println(consumerTag);
    });
}
Step-6
Publish the messages with different routing keys. We will compare the routing keys with the output.
public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // basicPublish(exchange, routingKey, properties, body)
    String message = "Drink a lot of Water and stay Healthy!";
    channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes());
    message = "Learn something new everyday";
    channel.basicPublish("my-topic-exchange", "education", null, message.getBytes());
    message = "Stay fit in Mind and Body";
    channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes());
    channel.close();
}
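A side note on the message body: message.getBytes() and new String(message.getBody()) both rely on the JVM's default charset, which may differ between the publishing and the consuming machine. A possible refinement, shown here only as a sketch with the hypothetical helper class MessageEncodingSketch, is to name the charset explicitly on both sides.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the article's TopicExchange class.
final class MessageEncodingSketch {

    // Publisher side: turn the text into the byte[] body passed to basicPublish.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received body back into text with the same charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Drink a lot of Water and stay Healthy!");
        System.out.println(decode(body)); // Drink a lot of Water and stay Healthy!
    }
}
```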
Step-7
Let’s put it all together and invoke each of these methods inside the public static void main(String[] args) method of the TopicExchange class. I will use two separate threads to run the publish and subscribe methods asynchronously.
public static void main(String[] args) throws IOException, TimeoutException {
    // Set up the topology first: exchange, queues and bindings.
    TopicExchange.declareExchange();
    TopicExchange.declareQueues();
    TopicExchange.declareBindings();

    Thread subscribe = new Thread(() -> {
        try {
            TopicExchange.subscribeMessage();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    });

    Thread publish = new Thread(() -> {
        try {
            TopicExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    });

    subscribe.start();
    publish.start();
}
Output:
=========== Health Queue ==========
amq.ctag-rIH5mStgCYa38YXceb1NiQ
HealthQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=1, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)

============ Education Queue ==========
amq.ctag-D6Bu38bHVGbqpsfA16WM4Q
EducationQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=2, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)

============ Education Queue ==========
amq.ctag-D6Bu38bHVGbqpsfA16WM4Q
EducationQ: Learn something new everyday
Envelope(deliveryTag=3, redeliver=false, exchange=my-topic-exchange, routingKey=education)
As you can see in the output, the message with the routing key health.education reaches HealthQ and EducationQ. The message with the routing key education reaches only EducationQ. The message with the routing key education.health does not match any of the routing patterns, hence it does not reach any queue.
Conclusion:
I hope this has given you a practical guide to the Topic Exchange and its implementation in RabbitMQ. To apply it in your projects, you need to understand how it works, specifically the way routing key matching works. I would love to hear your feedback in the comments below if you have any.