In this article, you will learn about Exchanges, Queues, and Bindings in detail. In the previous article, you learned how to send a simple message to a Queue named Queue-1. Although I did not specify any Exchange to send and receive that message, RabbitMQ used its default Exchange to do the work.

Exchanges in AMQP – RabbitMQ

  • Exchanges are first-class citizens in AMQP. They can be created from the management UI as well as programmatically.
  • Exchanges take the message from the producer and route it to zero or more queues.
  • When declaring an Exchange, 3 important properties need to be taken care of:
    • Name – The name of the Exchange.
    • Durability – durable means the exchange will survive a server restart.
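    • Auto-delete – If set, the exchange is deleted automatically once it is no longer in use, that is, when the last queue is unbound from it.
  • There are 4 Exchange types in AMQP. Each is listed below with the name of the exchange that RabbitMQ pre-declares for it: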
    • Direct exchange type – amq.direct
    • Fan-out exchange type – amq.fanout
    • Topic exchange type – amq.topic
    • Headers exchange type – amq.match
  • When no exchange is specified, RabbitMQ uses its default exchange.
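To make the last point concrete, here is a minimal in-memory sketch of how the default exchange behaves: every queue is implicitly bound to it under the queue's own name, so the routing key is treated as a queue name. This is only a model for illustration. The class and method names (DefaultExchangeSketch, declareQueue, publishToDefaultExchange, depth) are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of the default exchange; this is not RabbitMQ client code. */
final class DefaultExchangeSketch {
  // queue name -> messages waiting in that queue
  private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

  /** Declaring a queue implicitly binds it to the default exchange under its own name. */
  void declareQueue(String name) {
    queues.putIfAbsent(name, new ArrayDeque<String>());
  }

  /** Publishing to the default exchange: the routing key is looked up as a queue name. */
  boolean publishToDefaultExchange(String routingKey, String body) {
    Queue<String> target = queues.get(routingKey);
    if (target == null) {
      return false; // no queue with that name, so the message is not routed
    }
    target.add(body);
    return true;
  }

  /** Number of messages currently waiting in the named queue. */
  int depth(String queueName) {
    Queue<String> queue = queues.get(queueName);
    return queue == null ? 0 : queue.size();
  }

  public static void main(String[] args) {
    DefaultExchangeSketch broker = new DefaultExchangeSketch();
    broker.declareQueue("Queue-1");
    System.out.println(broker.publishToDefaultExchange("Queue-1", "hello"));      // true
    System.out.println(broker.publishToDefaultExchange("no-such-queue", "lost")); // false
    System.out.println(broker.depth("Queue-1"));                                  // 1
  }
}
```

This is why the previous article could send a message to Queue-1 without declaring any exchange or binding.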

1. Declare an Exchange programmatically

An exchange can be created programmatically as well as from the management UI. We will create it programmatically first. As mentioned earlier, we need to specify 3 properties (Name, Durability, Auto-delete) along with the exchange type while creating an Exchange. The exchangeDeclare overload used here takes the durability flag explicitly and leaves auto-delete as false.

The code below creates a direct Exchange named my-direct-exchange. I will explain the different types of Exchanges a little later; for now, focus on how an exchange is created.

package com.amqp.exchanges;

import com.amqp.basic.queue.CommonConfigs;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class CreateExchange {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // newConnection(String) treats its argument as a connection name,
    // so the broker URI is set on the factory instead
    factory.setUri(CommonConfigs.AMQP_URL);

    // try-with-resources closes the channel first, then the connection
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
      // Declare a durable direct exchange (auto-delete stays false)
      channel.exchangeDeclare("my-direct-exchange", BuiltinExchangeType.DIRECT, true);
    }
  }
}

package com.amqp.basic.queue;

public class CommonConfigs {
  public static final String DEFAULT_QUEUE = "Queue-1";
  public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";
}
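If the AMQP_URL constant looks opaque, the following sketch splits it into its parts using only java.net.URI from the JDK. It is meant purely to show what each segment of the URL stands for. The class and method names (AmqpUrlSketch, describe) are hypothetical, and the RabbitMQ client does its own parsing, which may differ in detail.

```java
import java.net.URI;

/** Splits an AMQP URL into its parts with java.net.URI; this is not RabbitMQ client code. */
final class AmqpUrlSketch {
  static String describe(String url) {
    URI uri = URI.create(url);
    String userInfo = uri.getUserInfo(); // "user:password", or null if absent
    String user = null;
    String password = null;
    if (userInfo != null) {
      String[] parts = userInfo.split(":", 2);
      user = parts[0];
      password = parts.length > 1 ? parts[1] : null;
    }
    // The trailing path is where a virtual host name would go
    return "scheme=" + uri.getScheme()
        + " user=" + user
        + " password=" + password
        + " host=" + uri.getHost()
        + " port=" + uri.getPort()
        + " path=" + uri.getPath();
  }

  public static void main(String[] args) {
    // prints: scheme=amqp user=guest password=guest host=localhost port=5672 path=/
    System.out.println(describe("amqp://guest:guest@localhost:5672/"));
  }
}
```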

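It is safe to declare the same exchange twice. If the exchange already exists with the same properties, RabbitMQ simply ignores the second request. Redeclaring it with different properties (for example, a different type or durability) fails with a channel error.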
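The redeclaration rule can be modelled in a few lines. The sketch below is an in-memory illustration, not broker or client code: it accepts a repeated declaration with identical properties and rejects one whose properties differ. I am assuming here that the broker compares type, durability, and auto-delete; the names ExchangeRegistrySketch and declare are hypothetical, and the exception type is my own choice for the model.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of exchange redeclaration; this is not RabbitMQ client code. */
final class ExchangeRegistrySketch {
  // exchange name -> the properties it was first declared with
  private final Map<String, List<Object>> exchanges = new HashMap<>();

  /** Returns true if the exchange was created, false if an identical one already existed. */
  boolean declare(String name, String type, boolean durable, boolean autoDelete) {
    List<Object> requested = Arrays.<Object>asList(type, durable, autoDelete);
    List<Object> existing = exchanges.putIfAbsent(name, requested);
    if (existing == null) {
      return true; // first declaration creates the exchange
    }
    if (!existing.equals(requested)) {
      // The real broker reports this case as a PRECONDITION_FAILED channel error
      throw new IllegalStateException(
          "PRECONDITION_FAILED: '" + name + "' already exists with different properties");
    }
    return false; // same properties: nothing to do
  }

  public static void main(String[] args) {
    ExchangeRegistrySketch registry = new ExchangeRegistrySketch();
    System.out.println(registry.declare("my-direct-exchange", "direct", true, false)); // true
    System.out.println(registry.declare("my-direct-exchange", "direct", true, false)); // false
    try {
      registry.declare("my-direct-exchange", "fanout", true, false);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```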

There are several methods available to create an Exchange in rabbitmq-java-client. Below is the list of methods available in the Channel.java interface to create an Exchange programmatically.

public interface Channel extends ShutdownNotifier, AutoCloseable {
...
  /* Actively declare a non-autodelete exchange with no extra arguments */
  Exchange.DeclareOk exchangeDeclare(String exchange, 
                    String type, 
                    boolean durable) throws IOException;

  /* Actively declare a non-autodelete exchange with no extra arguments  */
  Exchange.DeclareOk exchangeDeclare(String exchange,
                    BuiltinExchangeType type,
                    boolean durable) throws IOException;

  /* Declare an exchange. */
  Exchange.DeclareOk exchangeDeclare(String exchange, 
                    String type, 
                    boolean durable, 
                    boolean autoDelete, 
                    Map<String, Object> arguments) throws IOException;

  /*  Declare an exchange.*/
  Exchange.DeclareOk exchangeDeclare(String exchange, 
                    BuiltinExchangeType type, 
                    boolean durable, 
                    boolean autoDelete,
                    Map<String, Object> arguments) throws IOException;

  /* Declare an exchange, via an interface that allows the complete set of arguments. */
  Exchange.DeclareOk exchangeDeclare(String exchange,
                    String type, 
                    boolean durable,
                    boolean autoDelete, 
                    boolean internal,
                    Map<String, Object> arguments) throws IOException;

  /* Declare an exchange, via an interface that allows the complete set of arguments. */
  Exchange.DeclareOk exchangeDeclare(String exchange,
                    BuiltinExchangeType type,
                    boolean durable,
                    boolean autoDelete,
                    boolean internal,
                    Map<String, Object> arguments) throws IOException;

  /**
  * Like {@link Channel#exchangeDeclare(String, String, boolean, boolean, java.util.Map)} but
  * sets nowait parameter to true and returns nothing (as there will be no response from
  * the server).
  */
  void exchangeDeclareNoWait(String exchange,
                    String type,
                    boolean durable,
                    boolean autoDelete,
                    boolean internal,
                    Map<String, Object> arguments) throws IOException;

  /**
  * Like {@link Channel#exchangeDeclare(String, String, boolean, boolean, java.util.Map)} but
  * sets nowait parameter to true and returns nothing (as there will be no response from
  * the server).
  */
  void exchangeDeclareNoWait(String exchange,
                    BuiltinExchangeType type,
                    boolean durable,
                    boolean autoDelete,
                    boolean internal,
                    Map<String, Object> arguments) throws IOException;

  /* Declare an exchange passively; that is, check if the named exchange exists. */
  Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
...
}

Similarly, the Channel interface has several methods to delete an exchange.

  /*  Delete an exchange */
  Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused)
                                   throws IOException;

  /* Delete an exchange, without regard for whether it is in use or not */
  Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

  void exchangeDeleteNoWait(String exchange, boolean ifUnused)
                                   throws IOException; 

2. Create an Exchange in management-UI

You can also create an exchange in the management-UI. Log in to the management-UI and go to the Exchanges tab. Enter direct-exchange-ui as the Name, select Direct as the Type and Durable as the Durability, then click on Add Exchange.

Direct Exchange

Queues in AMQP – RabbitMQ

  • Queues in AMQP behave like queues in other messaging or task-queuing systems.
  • Queues store messages in FIFO (First-In-First-Out) order.
  • A message Queue has properties such as:
    • Name – Defines the name of the Queue.
    • Durability – If set as Durable, the Queue survives a server restart. Messages in it survive only if they are also published as persistent.
    • Exclusive – If set, the Queue can be used by only one connection and is deleted as soon as that connection is closed.
    • Auto-delete – If set as true, the Queue is deleted once the last Consumer has unsubscribed.
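As a quick illustration of the FIFO point above, the sketch below uses a plain java.util.ArrayDeque as a stand-in for a message queue and drains it in arrival order. It only models the ordering and says nothing about durability, exclusivity, or auto-delete. The class and method names (FifoQueueSketch, drain) and the message texts are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Models FIFO delivery order with a JDK queue; this is not RabbitMQ client code. */
final class FifoQueueSketch {
  /** Removes every message from the queue and returns them in delivery order. */
  static List<String> drain(Queue<String> queue) {
    List<String> delivered = new ArrayList<>();
    String next;
    while ((next = queue.poll()) != null) {
      delivered.add(next);
    }
    return delivered;
  }

  public static void main(String[] args) {
    Queue<String> lightQ = new ArrayDeque<>();
    lightQ.add("on");
    lightQ.add("dim");
    lightQ.add("off");
    System.out.println(drain(lightQ)); // [on, dim, off]
  }
}
```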

1. Declaration of Queues programmatically

The code below declares 3 Queues programmatically. Later, we will learn how to bind these Queues to the Exchange.

It is fine to issue the same Queue declaration more than once. If the Queue already exists with the same properties, the subsequent requests have no effect.

package com.amqp.exchanges;

import com.amqp.basic.queue.CommonConfigs;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class CreateQueues {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // newConnection(String) treats its argument as a connection name,
    // so the broker URI is set on the factory instead
    factory.setUri(CommonConfigs.AMQP_URL);

    // try-with-resources closes the channel first, then the connection
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
      // Declare the queues: (name, durable, exclusive, autoDelete, arguments)
      channel.queueDeclare("MobileQ", true, false, false, null);
      channel.queueDeclare("ACQ", true, false, false, null);
      channel.queueDeclare("LightQ", true, false, false, null);
    }
  }
}

After executing the above code, log in to the RabbitMQ management-UI and check the Queues tab. You will find the 3 Queues you just created.

Similar to Exchanges, there are several methods in the Channel interface to declare Queues.

public interface Channel extends ShutdownNotifier, AutoCloseable {
...

  /* Actively declare a server-named exclusive, autodelete, non-durable queue. */
  Queue.DeclareOk queueDeclare() throws IOException;

  /* Declare a queue with the parameters */
  Queue.DeclareOk queueDeclare(String queue, 
                    boolean durable, 
                    boolean exclusive, 
                    boolean autoDelete,
                    Map<String, Object> arguments) throws IOException;

  /* Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
     flag to true and returns no result (as there will be no response from the server). */
  void queueDeclareNoWait(String queue, 
                    boolean durable, 
                    boolean exclusive, 
                    boolean autoDelete,
                    Map<String, Object> arguments) throws IOException;
...
}

2. Create a Queue in management-UI

You can also add a Queue from the management-UI. Log in to the management-UI with your username and password (both default to guest) and go to the Queues tab. Fill in details such as Name, Durability, and Auto delete, then click on Add queue.

Create a Queue in RabbitMQ

Binding a Queue with an Exchange

So far you have learned about Exchanges and Queues. Bindings are the routing rules that connect Exchanges to Queues. Messages are never published directly to a Queue.

Instead, the producer sends messages to an exchange. Exchanges are message routing agents, living in a virtual host (vhost) within the RabbitMQ server. Exchanges accept messages from the producer application and route them to message queues with the help of header attributes, bindings, and routing keys.

A binding is a link that connects a queue to an exchange. The routing key is a message attribute. Depending on its type, the exchange may compare the routing key of a message with the routing keys of its bindings to decide which queue(s) should receive the message.
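For a direct exchange, that comparison is an exact match on the routing key. The sketch below models it as a lookup table from routing key to bound queue names, using the queue names and routing keys from this article. It is an in-memory illustration only; DirectExchangeSketch, bind, and route are hypothetical names, not RabbitMQ client API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** In-memory model of direct-exchange routing; this is not RabbitMQ client code. */
final class DirectExchangeSketch {
  // routing key -> names of the queues bound with that key
  private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

  /** Records a binding between a queue and this exchange under the given routing key. */
  void bind(String queue, String routingKey) {
    bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<String>()).add(queue);
  }

  /** Returns the queues that would receive a message published with this routing key. */
  Set<String> route(String routingKey) {
    return bindings.getOrDefault(routingKey, Collections.<String>emptySet());
  }

  public static void main(String[] args) {
    DirectExchangeSketch exchange = new DirectExchangeSketch();
    exchange.bind("MobileQ", "personalDevice");
    exchange.bind("ACQ", "homeAppliance");
    exchange.bind("LightQ", "homeAppliance");

    System.out.println(exchange.route("homeAppliance"));  // [ACQ, LightQ]
    System.out.println(exchange.route("personalDevice")); // [MobileQ]
    System.out.println(exchange.route("unknown"));        // []
  }
}
```

Note that one routing key can map to several queues, and a key with no binding maps to none, which matches the "zero or more queues" behaviour described earlier.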

package com.amqp.exchanges;

import com.amqp.basic.queue.CommonConfigs;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class CreateBindings {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    // newConnection(String) treats its argument as a connection name,
    // so the broker URI is set on the factory instead
    connectionFactory.setUri(CommonConfigs.AMQP_URL);

    // try-with-resources closes the channel first, then the connection
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
      //Create bindings - (queue, exchange, routingKey)
      channel.queueBind("MobileQ", "my-direct-exchange", "personalDevice");
      channel.queueBind("ACQ", "my-direct-exchange", "homeAppliance");
      channel.queueBind("LightQ", "my-direct-exchange", "homeAppliance");
    }
  }
}

There are several methods in the Channel interface to register queue binds and unbinds as listed below.

public interface Channel extends ShutdownNotifier, AutoCloseable {
...

  /* Bind a queue to an exchange, with no extra arguments */
  Queue.BindOk queueBind(String queue, 
                    String exchange, 
                    String routingKey) throws IOException;

  /* Bind a queue to an exchange. */
  Queue.BindOk queueBind(String queue, 
                    String exchange,
                    String routingKey, 
                    Map<String, Object> arguments) throws IOException;

  /**
  * Same as {@link Channel#queueBind(String, String, String, java.util.Map)} but sets nowait
  * parameter to true and returns void (as there will be no response
  * from the server).
  */
  void queueBindNoWait(String queue,
                    String exchange, 
                    String routingKey,
                    Map<String, Object> arguments) throws IOException;

  /**
  * Unbinds a queue from an exchange, with no extra arguments.
  */
  Queue.UnbindOk queueUnbind(String queue, 
                    String exchange, 
                    String routingKey) throws IOException;

  /**
  * Unbind a queue from an exchange.
  */
  Queue.UnbindOk queueUnbind(String queue, 
                    String exchange, 
                    String routingKey, 
                    Map<String, Object> arguments) throws IOException;

...
}

You can verify the bindings in the management-UI. Either click on my-direct-exchange under the Exchanges tab, or click on the individual Queues under the Queues tab.

Produce and Consume a message in RabbitMQ

You have learned to create Exchanges, Queues, and Bindings to route messages. So far you have created a direct exchange named my-direct-exchange and 3 queues: ACQ, LightQ, and MobileQ. ACQ and LightQ are bound to my-direct-exchange with the routing key homeAppliance, and MobileQ is bound to my-direct-exchange with the routing key personalDevice.

Typical Publish and Subscribe messages in RabbitMQ

Let us write the code to send a message to the Exchange. Based on the routing key, the message will be forwarded to one or more Queues. The subscribers listening to those Queue(s) will then receive the message and process it. Remember to run the TestConsumer code first and then the TestPublisher code, so that when you send a message there is an active consumer listening to the queue LightQ.

package com.amqp.exchanges;

import com.amqp.basic.queue.CommonConfigs;
import com.rabbitmq.client.*;

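import java.nio.charset.StandardCharsets;

public class TestConsumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // newConnection(String) treats its argument as a connection name,
    // so the broker URI is set on the factory instead
    factory.setUri(CommonConfigs.AMQP_URL);
    // Left open on purpose: the consumer keeps listening until the process is stopped
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // Called for every message delivered to this consumer
    DeliverCallback deliverCallback = (consumerTag, message) -> {
      System.out.println(consumerTag);
      System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
    };

    // Called if the broker cancels the consumer, for example when the queue is deleted
    CancelCallback cancelCallback = consumerTag -> {
      System.out.println(consumerTag);
    };
    // autoAck = true: messages are acknowledged as soon as they are delivered
    channel.basicConsume("LightQ", true, deliverCallback, cancelCallback);
  }
}

package com.amqp.exchanges;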

import com.amqp.basic.queue.CommonConfigs;
import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class TestPublisher {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // newConnection(String) treats its argument as a connection name,
    // so the broker URI is set on the factory instead
    factory.setUri(CommonConfigs.AMQP_URL);

    // try-with-resources closes the channel first, then the connection
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
      //Basic publish
      String message = "Turn on home appliances";
      // publish with (exchange, routingKey, properties, messageBody)
      channel.basicPublish("my-direct-exchange", "homeAppliance", null,
          message.getBytes(StandardCharsets.UTF_8));
    }
  }
}

Output:

amq.ctag-cubuZxWsPr-BnQuFL9ZHEA
Turn on home appliances

In the code above, we sent a message to the direct exchange my-direct-exchange with the routing key homeAppliance, so the message is forwarded to 2 queues, LightQ and ACQ. The active consumer listening to LightQ receives the message and prints it on the console, while the copy in ACQ waits there until a consumer reads it.
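If you want to trace this flow without a running broker, the in-memory sketch below replays it: one publish with the routing key homeAppliance places a copy in both ACQ and LightQ, and consuming from LightQ leaves the ACQ copy untouched. It is a simplified model under my own assumptions (no acknowledgements, no persistence); PublishConsumeSketch and its methods are hypothetical names, not RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** In-memory replay of the publish/consume flow; this is not RabbitMQ client code. */
final class PublishConsumeSketch {
  private final Map<String, Deque<String>> queues = new HashMap<>();  // queue -> messages
  private final Map<String, Set<String>> bindings = new HashMap<>();  // routing key -> queues

  void declareQueue(String queue) {
    queues.putIfAbsent(queue, new ArrayDeque<String>());
  }

  void bind(String queue, String routingKey) {
    if (!queues.containsKey(queue)) {
      throw new IllegalArgumentException("queue not declared: " + queue);
    }
    bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<String>()).add(queue);
  }

  /** Copies the message into every queue bound with this key; returns how many received it. */
  int publish(String routingKey, String body) {
    Set<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
    for (String queue : targets) {
      queues.get(queue).add(body);
    }
    return targets.size();
  }

  /** Takes the oldest message from the queue, or returns null if it is empty or unknown. */
  String consume(String queue) {
    Deque<String> messages = queues.get(queue);
    return messages == null ? null : messages.poll();
  }

  int depth(String queue) {
    Deque<String> messages = queues.get(queue);
    return messages == null ? 0 : messages.size();
  }

  public static void main(String[] args) {
    PublishConsumeSketch broker = new PublishConsumeSketch();
    for (String queue : new String[] {"MobileQ", "ACQ", "LightQ"}) {
      broker.declareQueue(queue);
    }
    broker.bind("MobileQ", "personalDevice");
    broker.bind("ACQ", "homeAppliance");
    broker.bind("LightQ", "homeAppliance");

    System.out.println(broker.publish("homeAppliance", "Turn on home appliances")); // 2
    System.out.println(broker.consume("LightQ")); // Turn on home appliances
    System.out.println(broker.depth("ACQ"));      // 1
    System.out.println(broker.depth("MobileQ"));  // 0
  }
}
```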

Conclusion:

I have given you details about Exchanges, Queues, and Bindings in AMQP (RabbitMQ). There are 4 types of Exchanges to learn, which will be covered in the next tutorials. Make sure you understand the role routing keys play in binding Queues to Exchanges.