This article focuses on the Fanout Exchange type (amq.fanout) in RabbitMQ. You will learn to bind a Queue with a Fanout Exchange using an empty routing key. You have already learned about Exchanges and Queues in the Elements of RabbitMQ. The diagram here explains a producer sends a message to my-fanout-exchange
, and it is forwarded to all 3 Queues (MobileQ
, ACQ
and LightQ
) bound to this exchange with no routing key. The job of a Fanout Exchange is to brainlessly forward the messages sent to it to all the Queues linked to it.
The flow of a message in Fanout Exchange
- One or more Queues bind to the Fanout exchange with no routing keys.
- A publisher sends the Exchange a message.
- The Exchange then forwards the message to the Queues unconditionally.
Implementation of Fanout Exchange
Step-1
Create an instance of the Connection
class as a singleton object. It is safe to share the same single instance of the Connection object and this is the recommended approach.
package com.amqp.exchanges.all;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManager {
private static Connection connection = null;
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
Step-2
Create/declare a Fanout Exchange with the name my-fanout-exchange
. We will make use of the Channel.exchangeDeclare
method as shown below. Create a class with the name FanoutExchange
and add the below code as a method in it.
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
channel.close();
}
Step-3
Declare the Queues which will be linked to this exchange with empty routing keys. We will create 3 queues as MobileQ
, ACQ
and LightQ
using the Channel.queueDeclare
method as shown below.
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues
channel.queueDeclare("MobileQ", true, false, false, null);
channel.queueDeclare("ACQ", true, false, false, null);
channel.queueDeclare("LightQ", true, false, false, null);
channel.close();
}
Step-4
Till now we have declared the Fanout Exchange and Queues. It is time to link the Queues with empty routing keys. You should not set the keys as null
, instead use the empty String ""
as below.
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey) - routingKey != null
channel.queueBind("MobileQ", "my-fanout-exchange", "");
channel.queueBind("ACQ", "my-fanout-exchange", "");
channel.queueBind("LightQ", "my-fanout-exchange", "");
channel.close();
}
Step-5
Now you need the consumers listening to each of the Queues to see what messages they receive. As you can see there are 2 callbacks, the first one is DeliverCallback
– with the parameters (consumerTag, message)
and the second one is CancelCallback
with the parameter – consumerTag
.
public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("LightQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("ACQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("MobileQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
}
Step-6
Let’s publish the message to my-fanout-exchange
using channel.basicPublish()
.
public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Main Power is ON";
channel.basicPublish("my-fanout-exchange", "", null, message.getBytes());
channel.close();
}
Step-7
Finally, we will put all together into work. We will invoke each of the methods inside a public static void main()
as shown below. I will use two separate threads to asynchronously run the publish and subscribe methods.
public static void main(String[] args) throws IOException, TimeoutException {
FanoutExchange.declareQueues();
FanoutExchange.declareExchange();
FanoutExchange.declareBindings();
//Threads created to publish-subscribe asynchronously
Thread subscribe = new Thread() {
@Override
public void run() {
try {
FanoutExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
Thread publish = new Thread() {
@Override
public void run() {
try {
FanoutExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
subscribe.start();
publish.start();
}
Output:
amq.ctag-TH1MQlvM7o8aoVTH-E5g0w LightQ: Main Power is ON amq.ctag-EQ4HZrsj7SqzQhfS2lAFEw ACQ: Main Power is ON amq.ctag-y1Qns3gFs8fj_t6sq6qtBw MobileQ: Main Power is ON
As you can see in the output, a fanout exchange forwards the message unconditionally to all the linked Queues. This is useful when you want to broadcast a message to all the Queues, kind of a publish/subscribe model.
Conclusion:
I hope to have given you a good inside of Fanout Exchange and its implementation in RabbitMQ using AMQP. It is useful when you want to forward/broadcast a message unconditionally. I would love to hear your feedback in the comments below. Check out the code on GitHub.