Headers Exchange Type is the most powerful exchange type in RabbitMQ – AMQP. Headers exchanges route a message based on the message headers matching, they ignore routing keys.
The diagram here represents 3 Queues (HealthQ, SportsQ, and EducationQ) are bound to the Headers Exchange (my-header-exchange) with JSON like headers. We will discuss each of these headers below in detail.
The flow of a message in Headers Exchange
- One or more Queues bind (linked) to a Headers Exchange using header properties( H ).
- A Producer sends a message to this Exchange with a Header property (MH).
- If MH matches with H, the message is forwarded to the Queue. The Headers matching algorithm is discussed next.
- The consumers listening to the Queue receives the message and processes it.
The Headers matching algorithm
- There are 2 types of headers matching allowed which are any (similar to logical OR) or all (similar to logical AND).
- They are represented in the bindings as
{ "x-match", "any" ..}
or { “x-match”, “all” ..}. - The x-match = any means, a message sent to the Exchange should contain at least one of the headers that Queue is linked with, then the message will be routed to the Queue.
- On the other hand, if a queue is bound with headers has
x-match = all
, messages that have all of its listed headers will be forwarded to the Queue. - You can see in the above diagram if a message with header
{"h1": "Header1"}
is sent tomy-header-exchange
, it will be routed to bothHealthQ
andEducationQ
. - Only when the message has both the
h1
andh2
headers with correct values, it will be forwarded toSportsQ
, HealthQ and EducationQ as well.
Implementation of Headers Exchange
We will follow the same steps we did for other exchanges implementation.
Step-1
Create an instance of the Connection
class as a singleton object if you have not done it yet. It is safe to share the same single instance of the Connection object and that is the recommended approach.
package com.amqp.exchanges.all;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManager {
private static Connection connection = null;
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
Step-2
Create/declare a Headers Exchange with the name my-header-exchange
. We will make use of the Channel.exchangeDeclare
method as shown below. Create a class as HeadersExchange
and add the below code as a method in it.
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-header-exchange
channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true);
channel.close();
}
Step-3
Declare the Queues which will be linked to this exchange. We will create 3 queues as HealthQ
, SportsQ
and EducationQ
. Will set the durable=ture
, autoDelete=false
, exclusive=false
and arguments=null
as shown below.
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues
channel.queueDeclare("HealthQ", true, false, false, null);
channel.queueDeclare("SportsQ", true, false, false, null);
channel.queueDeclare("EducationQ", true, false, false, null);
channel.close();
}
Step-4
Till now we have declared the Headers Exchange and Queues. It is time to set the bindings between them using respective routing headers. Look at the routing headers for each of the Queues, HealthQ with any of h1 = Header1
or h2 = Header2
, SportsQ with both h1 = Header1
and h2 = Header2
, and EducationQ with any of h1 = Header1
or h2 = Header2
. The protocol of any of the header or both of the header matching is defined in x-match
header property.
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey, headers) - routingKey != null
Map<String, Object> healthArgs = new HashMap<>();
healthArgs.put("x-match", "any"); //Match any of the header
healthArgs.put("h1", "Header1");
healthArgs.put("h2", "Header2");
channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs);
Map<String, Object> sportsArgs = new HashMap<>();
sportsArgs.put("x-match", "all"); //Match all of the header
sportsArgs.put("h1", "Header1");
sportsArgs.put("h2", "Header2");
channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs);
Map<String, Object> educationArgs = new HashMap<>();
educationArgs.put("x-match", "any"); //Match any of the header
educationArgs.put("h1", "Header1");
educationArgs.put("h2", "Header2");
channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs);
channel.close();
}
Step-5
Now you need the consumers listening to each of the Queues to see what messages they receive. As you can see there are 2 callbacks, the first one is DeliverCallback
– with the parameters (consumerTag, message)
and the second one is CancelCallback
with the parameter – consumerTag
. I have also added the message.getEnvelope()
to give you an idea to obtain the exchange name and other properties from the received message.
public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
}
Step-6
Publish messages with different header properties. Observe the headers and compare the output. The first message is sent with h1
and h3
and the second message is sent with h1
, h2
and h3
.
public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Header Exchange example 1";
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h3", "Header3");
BasicProperties properties = new BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
message = "Header Exchange example 2";
headerMap.put("h2", "Header2");
properties = new BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
channel.close();
}
Step-7
Let’s put all together and invoke each of these methods inside a public static void main(String[] args)
of the HeadersExchange class. I will use two separate threads to asynchronously run the publish and subscribe methods.
public static void main(String[] args) throws IOException, TimeoutException {
HeadersExchange.declareQueues();
HeadersExchange.declareExchange();
HeadersExchange.declareBindings();
//Threads created to publish-subscribe asynchronously
Thread subscribe = new Thread() {
@Override
public void run() {
try {
HeadersExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
Thread publish = new Thread() {
@Override
public void run() {
try {
HeadersExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
subscribe.start();
publish.start();
}
}
Output:
=========== Health Queue ========== amq.ctag-jQKFauizf0BhnEHToOBlBQ HealthQ: Header Exchange example 1 Envelope(deliveryTag=1, redeliver=false, exchange=my-header-exchange, routingKey=) =========== Health Queue ========== amq.ctag-jQKFauizf0BhnEHToOBlBQ HealthQ: Header Exchange example 2 Envelope(deliveryTag=2, redeliver=false, exchange=my-header-exchange, routingKey=) ============ Sports Queue ========== amq.ctag-QK-momZTsJNXrLsZ4EtPBw SportsQ: Header Exchange example 2 Envelope(deliveryTag=3, redeliver=false, exchange=my-header-exchange, routingKey=) ============ Education Queue ========== amq.ctag-S9ti35xtL_2-azL6kfgNFg EducationQ: Header Exchange example 1 Envelope(deliveryTag=4, redeliver=false, exchange=my-header-exchange, routingKey=) ============ Education Queue ========== amq.ctag-S9ti35xtL_2-azL6kfgNFg EducationQ: Header Exchange example 2 Envelope(deliveryTag=5, redeliver=false, exchange=my-header-exchange, routingKey=)
As you can see the first message reaches HealthQ and EducationQ as it has only h1
. The second message reaches all the Queues as it meets x-match = any
as well as x-match = all
.
Conclusion:
I hope to have given you a practical guide on the Headers Exchange and its implementation in RabbitMQ using AMQP. You need to understand its working and its application in your projects, specifically the way routing headers matching works. I would love to hear your feedback in the comments below if you have any.