The Headers Exchange is arguably the most flexible exchange type in AMQP (RabbitMQ). A Headers Exchange routes a message by matching the message headers against the headers declared on each binding, and it ignores the routing key.

The diagram here shows 3 Queues (HealthQ, SportsQ, and EducationQ) bound to the Headers Exchange (my-headers-exchange) with JSON-like headers. We will discuss each of these headers in detail below.

The flow of a message in Headers Exchange

  • One or more Queues are bound (linked) to a Headers Exchange using header properties (H).
  • A Producer sends a message to this Exchange with header properties (MH).
  • If MH matches H, the message is forwarded to the Queue. The Headers matching algorithm is discussed next.
  • The consumers listening to the Queue receive the message and process it.

The Headers matching algorithm

  • Two types of headers matching are allowed: any (similar to logical OR) and all (similar to logical AND).
  • They are represented in the bindings as { "x-match", "any" ..} or { "x-match", "all" ..}.
  • With x-match = any, a message is routed to the Queue if it contains at least one of the headers the Queue is bound with, with a matching value.
  • With x-match = all, a message is routed to the Queue only if it contains all of the headers the Queue is bound with, each with a matching value.
  • In the diagram above, a message sent to my-headers-exchange with the header {"h1": "Header1"} is routed to both HealthQ and EducationQ.
  • Only when the message has both the h1 and h2 headers with the correct values is it also forwarded to SportsQ, in addition to HealthQ and EducationQ.
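
To make the two modes concrete, here is a minimal sketch of the matching rule in plain Java. It is a simplified model for illustration only, not RabbitMQ's actual implementation: the class name HeadersMatchSketch and the matches method are hypothetical, values are compared with equals, binding keys starting with "x-" (such as x-match itself) are skipped, and a binding without x-match is assumed to behave like all.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of the RabbitMQ client library.
final class HeadersMatchSketch {

  /** Returns true if the message headers satisfy the binding arguments. */
  static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
    // Assumption: a binding without x-match behaves like "all".
    boolean matchAny = "any".equals(bindingArgs.getOrDefault("x-match", "all"));
    for (Map.Entry<String, Object> entry : bindingArgs.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith("x-")) {
        continue; // x-match and other x- keys are not compared
      }
      boolean same = messageHeaders.containsKey(key)
          && Objects.equals(entry.getValue(), messageHeaders.get(key));
      if (matchAny && same) {
        return true;  // any: one matching header is enough
      }
      if (!matchAny && !same) {
        return false; // all: one missing or different header is enough to reject
      }
    }
    // all: every header matched; any: no header matched
    return !matchAny;
  }

  public static void main(String[] args) {
    Map<String, Object> anyBinding = Map.of("x-match", "any", "h1", "Header1", "h2", "Header2");
    Map<String, Object> allBinding = Map.of("x-match", "all", "h1", "Header1", "h2", "Header2");
    Map<String, Object> message = Map.of("h1", "Header1");

    System.out.println(matches(anyBinding, message)); // true
    System.out.println(matches(allBinding, message)); // false
  }
}
```

Headers on the message that are not listed in the binding play no part in the comparison in this sketch.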

Implementation of Headers Exchange

We will follow the same steps we used to implement the other exchange types.

Step-1
Create an instance of the Connection class as a singleton object if you have not done it yet. It is safe to share the same single instance of the Connection object and that is the recommended approach.

package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

  private static Connection connection = null;

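  //synchronized so that concurrent callers cannot create two connections
  public static synchronized Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //Same settings as amqp://guest:guest@localhost:5672 on the default virtual host.
        //Note: newConnection(String) treats its argument as a connection name, not a URI.
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connection = connectionFactory.newConnection();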
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}

Step-2
Create/declare a Headers Exchange with the name my-headers-exchange, using the Channel.exchangeDeclare method as shown below. Create a class named HeadersExchange and add the code below as a method in it.

  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Declare my-headers-exchange as a durable exchange
    channel.exchangeDeclare("my-headers-exchange", BuiltinExchangeType.HEADERS, true);
    channel.close();
  }

Step-3
Declare the Queues that will be linked to this exchange. We will create 3 queues: HealthQ, SportsQ and EducationQ. We will set durable=true, exclusive=false, autoDelete=false and arguments=null as shown below.

  public static void declareQueues() throws IOException, TimeoutException {
    //Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();

    //Create the Queues
    channel.queueDeclare("HealthQ", true, false, false, null);
    channel.queueDeclare("SportsQ", true, false, false, null);
    channel.queueDeclare("EducationQ", true, false, false, null);

    channel.close();
  }

Step-4
So far we have declared the Headers Exchange and the Queues. It is time to set the bindings between them using the respective routing headers. HealthQ is bound with any of h1 = Header1 or h2 = Header2, SportsQ with both h1 = Header1 and h2 = Header2, and EducationQ with any of h1 = Header1 or h2 = Header2. Whether any one of the headers or all of them must match is defined by the x-match property of the binding.

  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    //Create bindings - (queue, exchange, routingKey, headers) - routingKey != null
    Map<String, Object> healthArgs = new HashMap<>();
    healthArgs.put("x-match", "any"); //Match any of the headers
    healthArgs.put("h1", "Header1");
    healthArgs.put("h2", "Header2");
    channel.queueBind("HealthQ", "my-headers-exchange", "", healthArgs);

    Map<String, Object> sportsArgs = new HashMap<>();
    sportsArgs.put("x-match", "all"); //Match all of the headers
    sportsArgs.put("h1", "Header1");
    sportsArgs.put("h2", "Header2");
    channel.queueBind("SportsQ", "my-headers-exchange", "", sportsArgs);

    Map<String, Object> educationArgs = new HashMap<>();
    educationArgs.put("x-match", "any"); //Match any of the headers
    educationArgs.put("h1", "Header1");
    educationArgs.put("h2", "Header2");
    channel.queueBind("EducationQ", "my-headers-exchange", "", educationArgs);

    channel.close();
  }
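
The three bindings above differ only in the queue name and the x-match value, so the argument maps could also be built by a small helper. The sketch below is one possible way to do that in plain Java; BindingArgsSketch and bindingArgs are hypothetical names, not part of the RabbitMQ client, and the helper deliberately accepts only the two x-match values used in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for building binding arguments; not a RabbitMQ API.
final class BindingArgsSketch {

  /** Copies the routing headers and adds the x-match entry next to them. */
  static Map<String, Object> bindingArgs(String xMatch, Map<String, Object> headers) {
    // Assumption: only the two modes discussed in this article are accepted.
    if (!"any".equals(xMatch) && !"all".equals(xMatch)) {
      throw new IllegalArgumentException("x-match must be \"any\" or \"all\": " + xMatch);
    }
    Map<String, Object> args = new HashMap<>(headers);
    args.put("x-match", xMatch);
    return args;
  }

  public static void main(String[] args) {
    Map<String, Object> headers = Map.of("h1", "Header1", "h2", "Header2");
    Map<String, Object> sportsArgs = bindingArgs("all", headers);
    System.out.println(sportsArgs.get("x-match")); // all
    System.out.println(sportsArgs.size());         // 3
  }
}
```

The resulting map would be passed as the last argument of channel.queueBind, exactly like healthArgs, sportsArgs and educationArgs above.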

Step-5
Now you need consumers listening to each of the Queues to see which messages they receive. Each call takes 2 callbacks: the first is a DeliverCallback with the parameters (consumerTag, message), and the second is a CancelCallback with the parameter consumerTag. I have also printed message.getEnvelope() to show how to obtain the exchange name and other properties of the received message.

  public static void subscribeMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n=========== Health Queue ==========");
      System.out.println(consumerTag);
      System.out.println("HealthQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Sports Queue ==========");
      System.out.println(consumerTag);
      System.out.println("SportsQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Education Queue ==========");
      System.out.println(consumerTag);
      System.out.println("EducationQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
  }

Step-6
Publish messages with different header properties. Observe the headers and compare the output. The first message is sent with h1 and h3 and the second message is sent with h1, h2 and h3.

  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    //First message: headers h1 and h3
    String message = "Header Exchange example 1";
    Map<String, Object> headerMap = new HashMap<>();
    headerMap.put("h1", "Header1");
    headerMap.put("h3", "Header3");
    BasicProperties properties = new BasicProperties.Builder()
        .headers(headerMap).build();
    //The exchange name must be the one declared in Step-2
    channel.basicPublish("my-headers-exchange", "", properties, message.getBytes());

    //Second message: headers h1, h2 and h3
    message = "Header Exchange example 2";
    headerMap.put("h2", "Header2");
    properties = new BasicProperties.Builder()
        .headers(headerMap).build();
    channel.basicPublish("my-headers-exchange", "", properties, message.getBytes());
    channel.close();
  }

Step-7
Let's put it all together and invoke each of these methods from the public static void main(String[] args) method of the HeadersExchange class. I will use two separate threads to run the publish and subscribe methods asynchronously.

  public static void main(String[] args) throws IOException, TimeoutException {
    HeadersExchange.declareQueues();
    HeadersExchange.declareExchange();
    HeadersExchange.declareBindings();

    //Threads created to publish-subscribe asynchronously
    Thread subscribe = new Thread() {
      @Override
      public void run() {
        try {
          HeadersExchange.subscribeMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    Thread publish = new Thread() {
      @Override
      public void run() {
        try {
          HeadersExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    subscribe.start();
    publish.start();
  }
}

Output:

=========== Health Queue ==========
amq.ctag-jQKFauizf0BhnEHToOBlBQ
HealthQ: Header Exchange example 1
Envelope(deliveryTag=1, redeliver=false, exchange=my-headers-exchange, routingKey=)


=========== Health Queue ==========
amq.ctag-jQKFauizf0BhnEHToOBlBQ
HealthQ: Header Exchange example 2
Envelope(deliveryTag=2, redeliver=false, exchange=my-headers-exchange, routingKey=)


 ============ Sports Queue ==========
amq.ctag-QK-momZTsJNXrLsZ4EtPBw
SportsQ: Header Exchange example 2
Envelope(deliveryTag=3, redeliver=false, exchange=my-headers-exchange, routingKey=)


 ============ Education Queue ==========
amq.ctag-S9ti35xtL_2-azL6kfgNFg
EducationQ: Header Exchange example 1
Envelope(deliveryTag=4, redeliver=false, exchange=my-headers-exchange, routingKey=)


 ============ Education Queue ==========
amq.ctag-S9ti35xtL_2-azL6kfgNFg
EducationQ: Header Exchange example 2
Envelope(deliveryTag=5, redeliver=false, exchange=my-headers-exchange, routingKey=)

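As you can see, the first message reaches HealthQ and EducationQ because, of the bound headers, it carries only h1, which satisfies x-match = any but not x-match = all. The second message reaches all the Queues because it carries both h1 and h2, so it meets x-match = any as well as x-match = all. The extra h3 header is not part of any binding and does not affect the routing.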
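
If you want to check this routing result without a running broker, the bindings can be modelled in memory. The sketch below is a simplified stand-in under stated assumptions, not how RabbitMQ is implemented: InMemoryHeadersExchange, bind, route and tutorialBindings are hypothetical names, header values are compared with equals, and binding keys starting with "x-" are left out of the comparison.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a headers exchange; no broker involved.
final class InMemoryHeadersExchange {

  // queue name -> binding arguments, kept in binding order
  private final Map<String, Map<String, Object>> bindings = new LinkedHashMap<>();

  void bind(String queue, Map<String, Object> args) {
    bindings.put(queue, args);
  }

  /** Returns the names of the queues that would receive a message with these headers. */
  List<String> route(Map<String, Object> headers) {
    List<String> targets = new ArrayList<>();
    for (Map.Entry<String, Map<String, Object>> binding : bindings.entrySet()) {
      Map<String, Object> args = binding.getValue();
      long expected = 0;
      long matched = 0;
      for (Map.Entry<String, Object> arg : args.entrySet()) {
        if (arg.getKey().startsWith("x-")) {
          continue; // x-match is a matching option, not a routing header
        }
        expected++;
        if (arg.getValue().equals(headers.get(arg.getKey()))) {
          matched++;
        }
      }
      boolean any = "any".equals(args.get("x-match"));
      if (any ? matched > 0 : matched == expected) {
        targets.add(binding.getKey());
      }
    }
    return targets;
  }

  /** Recreates the three bindings from Step-4. */
  static InMemoryHeadersExchange tutorialBindings() {
    InMemoryHeadersExchange exchange = new InMemoryHeadersExchange();
    exchange.bind("HealthQ", Map.of("x-match", "any", "h1", "Header1", "h2", "Header2"));
    exchange.bind("SportsQ", Map.of("x-match", "all", "h1", "Header1", "h2", "Header2"));
    exchange.bind("EducationQ", Map.of("x-match", "any", "h1", "Header1", "h2", "Header2"));
    return exchange;
  }

  public static void main(String[] args) {
    InMemoryHeadersExchange exchange = tutorialBindings();
    // First message from Step-6: h1 and h3
    System.out.println(exchange.route(Map.of("h1", "Header1", "h3", "Header3")));
    // prints [HealthQ, EducationQ]
    // Second message from Step-6: h1, h2 and h3
    System.out.println(exchange.route(Map.of("h1", "Header1", "h2", "Header2", "h3", "Header3")));
    // prints [HealthQ, SportsQ, EducationQ]
  }
}
```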

Conclusion:

I hope this has given you a practical guide to the Headers Exchange and its implementation. Make sure you understand how it works, specifically how the routing headers are matched, before applying it in your projects. I would love to hear your feedback in the comments below if you have any.

Code example