This article focuses on the Fanout Exchange type in RabbitMQ (the broker pre-declares one such exchange named amq.fanout). You will learn how to bind a Queue to a Fanout Exchange using an empty routing key. Exchanges and Queues were introduced in the Elements of RabbitMQ. In the diagram, a producer sends a message to my-fanout-exchange, and the exchange forwards it to all 3 Queues (MobileQ, ACQ and LightQ) bound to it; the routing key plays no part. The job of a Fanout Exchange is simply to forward every message it receives to all the Queues bound to it, without inspecting the routing key.

The flow of a message in Fanout Exchange

  • One or more Queues bind to the Fanout exchange with no routing keys.
  • A publisher sends the Exchange a message.
  • The Exchange then forwards the message to the Queues unconditionally.
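
The three steps above can be sketched as a toy in-memory model. This is only an illustration of the routing rule, not how the RabbitMQ broker or Java client is implemented; the class FanoutModel and its methods bind, publish and messagesIn are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of fanout routing; NOT the RabbitMQ client or broker.
class FanoutModel {
  // Queue name -> messages delivered to that queue (hypothetical structure).
  private final Map<String, List<String>> queues = new LinkedHashMap<>();

  // Bind a queue to this exchange; a fanout binding needs no meaningful key.
  void bind(String queueName) {
    queues.putIfAbsent(queueName, new ArrayList<>());
  }

  // The routing key is accepted but deliberately ignored:
  // every bound queue receives its own copy of the message.
  void publish(String routingKey, String message) {
    for (List<String> queue : queues.values()) {
      queue.add(message);
    }
  }

  // Messages currently held by a queue; empty if the queue was never bound.
  List<String> messagesIn(String queueName) {
    return queues.getOrDefault(queueName, List.of());
  }

  public static void main(String[] args) {
    FanoutModel exchange = new FanoutModel();
    exchange.bind("MobileQ");
    exchange.bind("ACQ");
    exchange.bind("LightQ");

    exchange.publish("any-key-is-ignored", "Main Power is ON");

    for (String name : List.of("MobileQ", "ACQ", "LightQ")) {
      System.out.println(name + ": " + exchange.messagesIn(name));
    }
  }
}
```

Whatever routing key is passed to publish, all three queues end up holding the message, which is the behaviour the rest of this article reproduces against a real broker.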

Implementation of Fanout Exchange

Step-1
Create a single shared Connection instance (a singleton). Connection is an interface in the RabbitMQ Java client, and you obtain an instance from a ConnectionFactory. It is safe to share one Connection across the application, and this is the recommended approach.

Java
package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {
  private static Connection connection = null;
 
  // synchronized so that two threads cannot both create a connection
  public static synchronized Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connection = connectionFactory.newConnection();
      } catch (IOException | TimeoutException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}
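
As an alternative way to get a lazily created, thread-safe singleton, the initialization-on-demand holder idiom can be used. The sketch below is a minimal illustration with a stand-in class: FakeConnection and LazyConnectionHolder are hypothetical names, not part of the RabbitMQ client. With a real Connection, the checked exceptions thrown by newConnection() would have to be handled inside the holder's initializer, which this sketch does not show.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Lazy, thread-safe singleton via the initialization-on-demand holder idiom.
class LazyConnectionHolder {
  private LazyConnectionHolder() {}

  // The JVM initializes Holder exactly once, on first access,
  // so no explicit locking is needed in get().
  private static class Holder {
    static final FakeConnection INSTANCE = new FakeConnection();
  }

  static FakeConnection get() {
    return Holder.INSTANCE;
  }

  public static void main(String[] args) throws InterruptedException {
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(LazyConnectionHolder::get);
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    System.out.println("instances created: " + FakeConnection.CREATED.get());
  }
}

// Hypothetical stand-in for an expensive shared resource;
// it is NOT com.rabbitmq.client.Connection.
class FakeConnection {
  static final AtomicInteger CREATED = new AtomicInteger();

  FakeConnection() {
    CREATED.incrementAndGet();
  }
}
```

Even with eight threads calling get() at the same time, the counter shows that only one instance is ever constructed.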

Step-2
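Declare a Fanout Exchange with the name my-fanout-exchange using the Channel.exchangeDeclare method, as shown below. Create a class named FanoutExchange and add the following code to it as a method. The class needs imports for com.rabbitmq.client.Channel and com.rabbitmq.client.BuiltinExchangeType, along with java.io.IOException and java.util.concurrent.TimeoutException.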

Java
  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    // Declare my-fanout-exchange - (exchange, type, durable)
    channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
    channel.close();
  }

Step-3
Declare the Queues that will be bound to this exchange with empty routing keys. We will create 3 queues named MobileQ, ACQ and LightQ using the Channel.queueDeclare method, as shown below.

Java
  public static void declareQueues() throws IOException, TimeoutException {

    // Create a channel; do not share a Channel instance between threads
    Channel channel = ConnectionManager.getConnection().createChannel();

    // Create the Queues - (queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("MobileQ", true, false, false, null);
    channel.queueDeclare("ACQ", true, false, false, null);
    channel.queueDeclare("LightQ", true, false, false, null);
    channel.close();
  }

Step-4
So far we have declared the Fanout Exchange and the Queues. Now it is time to bind the Queues to the exchange with empty routing keys. Do not pass null as the routing key; use the empty String "" instead, as shown below.

Java
  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    // Create bindings - (queue, exchange, routingKey); routingKey must not be null
    channel.queueBind("MobileQ", "my-fanout-exchange", "");
    channel.queueBind("ACQ", "my-fanout-exchange", "");
    channel.queueBind("LightQ", "my-fanout-exchange", "");
    channel.close();
  }

Step-5
Now you need consumers listening on each of the Queues to see which messages they receive. The basicConsume call takes two callbacks: the first is a DeliverCallback with the parameters (consumerTag, message), and the second is a CancelCallback with the parameter consumerTag. The second argument, true, turns on automatic acknowledgement. The channel is not closed in this method because the consumers keep using it.

Java
  public static void subscribeMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("LightQ: " + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("ACQ: " + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
 
    channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println("MobileQ: " + new String(message.getBody()));
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
  }

Step-6
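Let’s publish a message to my-fanout-exchange using channel.basicPublish(). The arguments are (exchange, routingKey, properties, body); the routing key is the empty String and the properties are null.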

Java
  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    String message = "Main Power is ON";
    channel.basicPublish("my-fanout-exchange", "", null, message.getBytes());
    channel.close();
  }
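
One detail worth illustrating: message.getBytes() in the publisher and new String(message.getBody()) in the consumers both rely on the JVM's default charset, which on Java versions before 18 depends on the platform. A minimal sketch of encoding and decoding the body with an explicit charset follows. MessageCodec, encode and decode are hypothetical helper names made up for this example, and choosing UTF-8 is an assumption, not something the original code specifies.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps publisher and consumer on the same charset.
class MessageCodec {
  // Turn the text into the byte[] body that would be passed to basicPublish.
  static byte[] encode(String message) {
    return message.getBytes(StandardCharsets.UTF_8);
  }

  // Turn a received body back into text, using the same charset.
  static String decode(byte[] body) {
    return new String(body, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] body = encode("Main Power is ON");
    System.out.println(decode(body));
  }
}
```

Under this assumption the publisher would pass MessageCodec.encode(message) as the body, and each consumer would call MessageCodec.decode(message.getBody()).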

Step-7
Finally, we will put it all together. We will invoke each of the methods from a public static void main(String[] args) method, as shown below. I will use two separate threads to run the publish and subscribe methods asynchronously.

Java
  public static void main(String[] args) throws IOException, TimeoutException {
    FanoutExchange.declareQueues();
    FanoutExchange.declareExchange();
    FanoutExchange.declareBindings();
    
    //Threads created to publish-subscribe asynchronously
    Thread subscribe = new Thread() {
      @Override
      public void run() {
        try {
          FanoutExchange.subscribeMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };
  
    Thread publish = new Thread() {
      @Override
      public void run() {
        try {
          FanoutExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };
 
    subscribe.start();
    publish.start();
  }
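
The two threads above start without any coordination. In this article that is fine, because the Queues are declared and bound before either thread runs, so a message published early simply waits in the queues. If you want the publisher to start only after the subscriber has registered, one option is a CountDownLatch. The sketch below shows the idea with placeholder tasks; OrderedStart and its run method are hypothetical names, and the two events.add calls stand in for subscribeMessage() and publishMessage().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Sketch: make the publishing thread wait until the subscribing thread is ready.
class OrderedStart {
  // Runs two placeholder tasks on separate threads and returns the order they ran in.
  static List<String> run() {
    List<String> events = new CopyOnWriteArrayList<>();
    CountDownLatch subscribed = new CountDownLatch(1);

    Thread subscribe = new Thread(() -> {
      events.add("subscribed");   // placeholder for FanoutExchange.subscribeMessage()
      subscribed.countDown();     // signal that the consumers are registered
    });

    Thread publish = new Thread(() -> {
      try {
        subscribed.await();       // block until the subscriber has signalled
        events.add("published");  // placeholder for FanoutExchange.publishMessage()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Start the publisher first on purpose; the latch still enforces the order.
    publish.start();
    subscribe.start();
    try {
      subscribe.join();
      publish.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for threads", e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [subscribed, published]
  }
}
```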

Output:

amq.ctag-TH1MQlvM7o8aoVTH-E5g0w
LightQ: Main Power is ON
amq.ctag-EQ4HZrsj7SqzQhfS2lAFEw
ACQ: Main Power is ON
amq.ctag-y1Qns3gFs8fj_t6sq6qtBw
MobileQ: Main Power is ON

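As you can see in the output, a fanout exchange forwards the message unconditionally to all the Queues bound to it. The consumer tags (amq.ctag-...) are generated by the server, so they will differ on each run. This behaviour is useful when you want to broadcast a message to all the Queues, as in a publish/subscribe model.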

Conclusion:

I hope this has given you a good insight into the Fanout Exchange and its implementation in RabbitMQ using AMQP. It is useful when you want to forward or broadcast a message unconditionally. I would love to hear your feedback in the comments below. Check out the code on GitHub.


Last Updated: May 22nd, 2024 | Categories: RabbitMQ
