This article focuses on the Topic Exchange type (amq.topic) in RabbitMQ. You will learn to bind a Queue with a Topic Exchange using a routing key pattern. You have already learned about Exchanges and Queues in the Elements of RabbitMQ. The diagram here explains a producer sends a message to my-topic-exchange , there are 3 Queues bound to this exchange with 3 routing key patterns (health.*, #.sports.* and #.education). As you have noticed, in a topic exchange the Queues are linked using the routing key patterns instead of a simple routing key. Before understanding the message in Topic Exchange, let us understand the routing patterns.

Routing patterns in Topic Exchange

  • A routing key in Topic Exchange must consist of Zero or more words delimited by dots e.g. health.education.
  • A routing key in topic exchange is often called as a routing pattern.
  • A routing pattern is like a regular expression with only *, . and # allowed.
  • The symbol star (*) means exactly one word allowed.
  • Similarly, the symbol hash (#) means zero or more number of words allowed.
  • The symbol dot (.) means – word delimiter. Multiple key terms are separated by the dot delimiter.
  • If a routing pattern is health.*, it means any message sent with the routing key health as the first word will reach the queue. For example, health.education will reach this Queue, but sports.health will not work.

The flow of a message in Topic Exchange

  • A message Queue binds to an Exchange with a routing key pattern (P).
  • A publisher sends a message with a routing key (K) to the Topic Exchange.
  • The message is passed to the Queue if P matches with K. The routing key matching is decided as discussed below.
  • The consumer subscribing the Queue receives the message.

Routing pattern matching examples

  • Routing Key Pattern – It is the routing key pattern that binds a specific Queue with an Exchange.
  • Valid Routing Key – The message with this key reaches the linked Queue.
  • Invalid Routing Key – The message with this key does not reach the Queue.
Routing Key PatternValid Routing KeyInvalid Routing Key
health.*health as the first word followed by one word.health.education,
health.sports,
health.anything
health, health.education.anything, health.education.sports
#.sports.* – Zero or more words, then sports, after that exactly one word.sports.education,
sports.sports.sports,
sports.sports
sports,
education.sports,
anything.sports.anything.xyz
#.education – Zero or more words followed by the word education.health.education,
education.education,
education
education.health,
anything.education.anything

Implementation of Topic Exchange

Step-1
Create an instance of the Connection class as a singleton object. It is safe to share the same single instance of the Connection object and that is the recommended approach.

Java
package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {
  private static Connection connection = null;

  public static Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connection = connectionFactory.newConnection();
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}

Step-2
Create/declare a Topic Exchange with the name my-topic-exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class as TopicExchange and add the below code as a method in it.

Java
  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    //Create Topic Exchange
    channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true);
    channel.close();
  }

Step-3
We will now declare the three Queues (HealthQ, SportsQ, EducationQ) which will later be linked to our topic exchange.

Java
  public static void declareQueues() throws IOException, TimeoutException {
    //Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();

    //Create the Queues
    channel.queueDeclare("HealthQ", true, false, false, null);
    channel.queueDeclare("SportsQ", true, false, false, null);
    channel.queueDeclare("EducationQ", true, false, false, null);
    channel.close();
  }

Steps-4
Now, let us bind the Queues with routing key patterns. The HealthQ with health.*, SportsQ with #.sports.* and EducationQ with #.education.

Java
  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    //Create bindings - (queue, exchange, routingKey) - routingKey != null
    channel.queueBind("HealthQ", "my-topic-exchange", "health.*");
    channel.queueBind("SportsQ", "my-topic-exchange", "#.sports.*");
    channel.queueBind("EducationQ", "my-topic-exchange", "#.education");
    channel.close();
  }

Step-5
Next, we will assign the Consumers to listen to these 3 Queues. As you can see there are 2 callbacks, the first one is DeliverCallback – with the parameters (consumerTag, message) and the second one is CancelCallback with the parameter – consumerTag. I have also added message.getEnvelope() to give you an idea to obtain the exchange name and routing key etc from the received message.

Java
public static void subscribeMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n=========== Health Queue ==========");
      System.out.println(consumerTag);
      System.out.println("HealthQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Sports Queue ==========");
      System.out.println(consumerTag);
      System.out.println("SportsQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Education Queue ==========");
      System.out.println(consumerTag);
      System.out.println("EducationQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
  }

Step-6
Publish the messages with different routing keys. We will compare the routing keys with the output.

Java
  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    String message = "Drink a lot of Water and stay Healthy!";
    channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes());

    message = "Learn something new everyday";
    channel.basicPublish("my-topic-exchange", "education", null, message.getBytes());

    message = "Stay fit in Mind and Body";
    channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes());
    channel.close();
  }

Step-7
Let’s put all together and invoke each of these methods inside a public static void main(String[] args) of the TopicExchange class. I will use two separate threads to asynchronously run the publish and subscribe methods.

Java
  public static void main(String[] args) throws IOException, TimeoutException {
    TopicExchange.declareExchange();
    TopicExchange.declareQueues();
    TopicExchange.declareBindings();

    Thread subscribe = new Thread(() -> {
      try {
        TopicExchange.subscribeMessage();
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    });

    Thread publish = new Thread(() -> {
      try {
        TopicExchange.publishMessage();
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    });
    subscribe.start();
    publish.start();
  }

Output:

=========== Health Queue ==========
amq.ctag-rIH5mStgCYa38YXceb1NiQ
HealthQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=1, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)


 ============ Education Queue ==========
amq.ctag-D6Bu38bHVGbqpsfA16WM4Q
EducationQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=2, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)


 ============ Education Queue ==========
amq.ctag-D6Bu38bHVGbqpsfA16WM4Q
EducationQ: Learn something new everyday
Envelope(deliveryTag=3, redeliver=false, exchange=my-topic-exchange, routingKey=education)

As you can see in the output, the message with the routing key health.education reaches HealthQ and EducationQ. The message with the routing key education reaches EducationQ. But the message with the routing key education.health does not match with any routing patterns, hence it does not reach any queue.

Conclusion:

I hope to have given you a practical guide on the Topic Exchange and its implementation in RabbitMQ. You need to understand its working and its application in your projects, specifically the way routing key matching works. I would love to hear your feedback in the comments below if you have any.


By |Last Updated: May 22nd, 2024|Categories: RabbitMQ|

Table of Contents