This article focuses on the Fanout Exchange type (amq.fanout) in RabbitMQ. You will learn to bind a Queue to a Fanout Exchange using an empty routing key. You have already learned about Exchanges and Queues in the Elements of RabbitMQ. The diagram shows a producer sending a message to my-fanout-exchange, which forwards it to all 3 Queues (MobileQ, ACQ and LightQ) bound to this exchange with no routing key. The job of a Fanout Exchange is to blindly forward every message it receives to all the Queues bound to it, without looking at the routing key.
The flow of a message in Fanout Exchange
- One or more Queues bind to the Fanout exchange with no routing keys.
- A publisher sends the Exchange a message.
- The Exchange then forwards the message to the Queues unconditionally.
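The three steps above can be sketched as a small in-memory model. This is only an illustration of the routing rule, not how RabbitMQ is implemented, and it does not talk to a broker. The class FanoutModel and its methods bind, publish and messagesIn are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model only; it does not connect to RabbitMQ.
final class FanoutModel {

    // Queue name -> messages delivered to that queue, kept in binding order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Step 1: a queue binds to the exchange. The routing key is accepted but never stored.
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Steps 2 and 3: a publisher sends a message and every bound queue gets a copy.
    // Returns the number of queues that received the message.
    int publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
        return queues.size();
    }

    // Messages currently held by a queue; empty for a queue that was never bound.
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("MobileQ", "");
        exchange.bind("ACQ", "");
        exchange.bind("LightQ", "");

        int copies = exchange.publish("this-key-is-ignored", "Main Power is ON");
        System.out.println(copies + " queues received the message");
        System.out.println("LightQ holds " + exchange.messagesIn("LightQ"));
    }
}
```

Note that publish never reads its routingKey argument, which is exactly what sets a fanout exchange apart from the other exchange types.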
Implementation of Fanout Exchange
Step-1
Create an instance of the Connection class as a singleton object. It is safe to share a single Connection instance across the application, and this is the recommended approach.
package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private static Connection connection = null;

    // Lazily creates the shared Connection; synchronized so that
    // concurrent callers cannot create two connections.
    public static synchronized Connection getConnection() {
        if (connection == null) {
            try {
                // Default factory settings: localhost:5672 with the guest account
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connection = connectionFactory.newConnection();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }
}
Step-2
Create/declare a Fanout Exchange with the name my-fanout-exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class with the name FanoutExchange and add the code below as a method in it.
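// Imports needed by the FanoutExchange class
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Declare my-fanout-exchange - (exchange, type, durable)
    channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
    channel.close();
}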
Step-3
Declare the Queues that will be bound to this exchange with empty routing keys. We will create 3 queues, MobileQ, ACQ and LightQ, using the Channel.queueDeclare method as shown below.
public static void declareQueues() throws IOException, TimeoutException {
    // Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Create the Queues - (queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("MobileQ", true, false, false, null);
    channel.queueDeclare("ACQ", true, false, false, null);
    channel.queueDeclare("LightQ", true, false, false, null);
    channel.close();
}
Step-4
So far we have declared the Fanout Exchange and the Queues. It is time to bind the Queues to the exchange with empty routing keys. Do not pass null as the routing key; use the empty String "" instead, as below.
public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Create bindings - (queue, exchange, routingKey) - routingKey != null
    channel.queueBind("MobileQ", "my-fanout-exchange", "");
    channel.queueBind("ACQ", "my-fanout-exchange", "");
    channel.queueBind("LightQ", "my-fanout-exchange", "");
    channel.close();
}
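If the routing key reaches your binding code from configuration or another method, a tiny guard can keep a null from ever being passed to queueBind. This is a minimal sketch of that idea; RoutingKeys and orEmpty are hypothetical names and are not part of the RabbitMQ client.

```java
// Hypothetical helper, not part of the RabbitMQ Java client.
final class RoutingKeys {

    private RoutingKeys() {
        // Utility class, no instances
    }

    // Maps a missing (null) routing key to the empty String and leaves any other key untouched.
    static String orEmpty(String routingKey) {
        return routingKey == null ? "" : routingKey;
    }

    public static void main(String[] args) {
        String fromConfig = null; // e.g. a setting that was never filled in
        System.out.println("key=[" + orEmpty(fromConfig) + "]");
        System.out.println("key=[" + orEmpty("lights.on") + "]");
    }
}
```

With such a helper, a binding call could be written as channel.queueBind("MobileQ", "my-fanout-exchange", RoutingKeys.orEmpty(key)).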
Step-5
Now you need consumers listening to each of the Queues to see what messages they receive. As you can see below, each consumer is given 2 callbacks: the first is DeliverCallback, with the parameters (consumerTag, message), and the second is CancelCallback, with the parameter consumerTag.
public static void subscribeMessage() throws IOException, TimeoutException {
    // The channel stays open so that the consumers keep receiving messages
    Channel channel = ConnectionManager.getConnection().createChannel();

    // basicConsume(queue, autoAck, deliverCallback, cancelCallback)
    channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
        System.out.println(consumerTag);
        System.out.println("LightQ: " + new String(message.getBody()));
    }), consumerTag -> {
        System.out.println(consumerTag);
    });

    channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
        System.out.println(consumerTag);
        System.out.println("ACQ: " + new String(message.getBody()));
    }), consumerTag -> {
        System.out.println(consumerTag);
    });

    channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
        System.out.println(consumerTag);
        System.out.println("MobileQ: " + new String(message.getBody()));
    }), consumerTag -> {
        System.out.println(consumerTag);
    });
}
Step-6
Let's publish a message to my-fanout-exchange using channel.basicPublish().
public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    String message = "Main Power is ON";
    // basicPublish(exchange, routingKey, properties, body) - empty routing key
    channel.basicPublish("my-fanout-exchange", "", null, message.getBytes());
    channel.close();
}
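The publisher turns the text into bytes with message.getBytes() and the consumers turn it back with new String(message.getBody()), both of which rely on the platform default charset. If the publisher and consumer might run on machines with different defaults, naming the charset explicitly is a safer choice. A minimal sketch, where MessageCodec, encode and decode are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that fixes the charset on both sides of the exchange.
final class MessageCodec {

    private MessageCodec() {
        // Utility class, no instances
    }

    // Publisher side: text to message body
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: message body back to text
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Main Power is ON");
        System.out.println(body.length + " bytes on the wire");
        System.out.println("Decoded: " + decode(body));
    }
}
```

In the methods above, this would mean passing MessageCodec.encode(message) to basicPublish and printing MessageCodec.decode(message.getBody()) in each DeliverCallback.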
Step-7
Finally, we will put it all together. We will invoke each of the methods inside a public static void main() as shown below. I will use two separate threads to run the publish and subscribe methods asynchronously.
public static void main(String[] args) throws IOException, TimeoutException {
    FanoutExchange.declareQueues();
    FanoutExchange.declareExchange();
    FanoutExchange.declareBindings();

    // Threads created to publish-subscribe asynchronously
    Thread subscribe = new Thread() {
        @Override
        public void run() {
            try {
                FanoutExchange.subscribeMessage();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    };

    Thread publish = new Thread() {
        @Override
        public void run() {
            try {
                FanoutExchange.publishMessage();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    };

    subscribe.start();
    publish.start();
}
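The two threads above start in no guaranteed order. That is fine here, because the Queues are declared and bound before anything is published, so a message simply waits in its Queue until a consumer shows up. If you would rather have the consumers registered before the message goes out, one option is a CountDownLatch. The sketch below uses plain Runnable stand-ins instead of the real subscribe and publish methods; StartOrder and run are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical coordinator: the publish task waits until the subscribe task has finished.
final class StartOrder {

    private StartOrder() {
        // Utility class, no instances
    }

    // Runs both tasks on their own threads and returns the order in which they completed.
    static List<String> run(Runnable subscribe, Runnable publish) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch subscribed = new CountDownLatch(1);

        Thread subscriber = new Thread(() -> {
            try {
                subscribe.run();
                events.add("subscribed");
            } finally {
                // Release the publisher even if subscribing failed
                subscribed.countDown();
            }
        });

        Thread publisher = new Thread(() -> {
            try {
                subscribed.await(); // block until the subscriber is done
                publish.run();
                events.add("published");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        publisher.start();
        subscriber.start();
        try {
            subscriber.join();
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> events = run(
                () -> System.out.println("consumers registered"),
                () -> System.out.println("message published"));
        System.out.println(events);
    }
}
```

In the real program, the two Runnable arguments would wrap FanoutExchange.subscribeMessage() and FanoutExchange.publishMessage() together with their exception handling.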
Output:
amq.ctag-TH1MQlvM7o8aoVTH-E5g0w
LightQ: Main Power is ON
amq.ctag-EQ4HZrsj7SqzQhfS2lAFEw
ACQ: Main Power is ON
amq.ctag-y1Qns3gFs8fj_t6sq6qtBw
MobileQ: Main Power is ON
As you can see in the output, a fanout exchange forwards the message unconditionally to all the Queues bound to it. This is useful when you want to broadcast a message to every Queue, as in a publish/subscribe model.
Conclusion:
I hope I have given you a good insight into the Fanout Exchange and its implementation. It is useful when you want to forward/broadcast a message unconditionally. I would love to hear your feedback in the comments below. Check out the code on GitHub.