Point-to-Point (PTP) messaging in JMS is built on Queues. A client sends a message to a specific Queue, and a consumer listening on that Queue receives it. Each message has one sender and is delivered to exactly one consumer. Before you continue, make sure you have gone through the previous tutorials on the JMS Message Model and on how to send and receive a message in JMS.

Use a Queue for PTP messaging

You know by now that Point-to-Point messaging is all about using a Queue. JMS provides javax.jms.Queue to represent the queue object. I will use the JMS 2.0 API, i.e. javax.jms.JMSProducer and javax.jms.JMSConsumer, to send and receive messages.

I have already shown you several examples that use a JMS Queue. As a refresher, the code below walks through the steps for using a Queue with the JMS 2.0 API.

Java
package lab00.simple.helloworld;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestHelloWorld {
  public static void main(String[] args) {

    ConnectionFactory connectionFactory = null;
    Queue queue = null;

    try {
      InitialContext initialContext = new InitialContext();
  
      //Step-1 Create ConnectionFactory
      connectionFactory
          = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
 
      //Step-2 Get the Destination
      queue = (Queue) initialContext.lookup("jms/PTPQueue");
    } catch (NamingException e) {
      e.printStackTrace();
      return; // nothing to do without the ConnectionFactory and Queue
    }

    //Step-3 Create JMSContext
    try (JMSContext jmsContext = connectionFactory.createContext()) {
 
      //Step-4a Create a Text Message and send
      TextMessage textMessage = jmsContext.createTextMessage("Message using JMS 2.0");
      JMSProducer jmsProducer = jmsContext.createProducer().send(queue, textMessage);

      //Step-4b Receive the message
      TextMessage message = (TextMessage) jmsContext.createConsumer(queue).receive();
      System.out.println(message.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

NOTE: In a Point-to-Point messaging system, a message is removed from the Queue once it is read (received). In other words, each message can be read only once.
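The consume-once behaviour can be pictured with a plain in-memory queue. The sketch below is not JMS code: it assumes java.util.concurrent.LinkedBlockingQueue as a stand-in for a JMS Queue, and the names ConsumeOnceSketch and deliver are made up for illustration. Two consumers take turns reading from the same queue, and each message ends up with exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a JMS Queue (assumption: this is NOT the JMS API).
class ConsumeOnceSketch {

  // Drains the queue, alternating between two consumers, and returns
  // what each consumer received (index 0 = consumer A, index 1 = consumer B).
  static List<List<String>> deliver(BlockingQueue<String> queue) {
    List<String> consumerA = new ArrayList<>();
    List<String> consumerB = new ArrayList<>();
    boolean turnA = true;
    String message;
    // poll() removes the head of the queue, so a message is handed out only once
    while ((message = queue.poll()) != null) {
      (turnA ? consumerA : consumerB).add(message);
      turnA = !turnA;
    }
    List<List<String>> result = new ArrayList<>();
    result.add(consumerA);
    result.add(consumerB);
    return result;
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("Message 1");
    queue.add("Message 2");
    queue.add("Message 3");

    List<List<String>> received = deliver(queue);
    System.out.println("Consumer A: " + received.get(0));
    System.out.println("Consumer B: " + received.get(1));
    System.out.println("Left in queue: " + queue.size());
  }
}
```

No message shows up in both lists, and nothing is left in the queue once every message has been received.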

Asynchronous Message Listeners in JMS

JMS allows you to assign an asynchronous message listener to a JMSConsumer. Listeners are available for both Queues and Topics. You can use a lambda expression, as shown below.

Java
@Test
public void testAsyncConsumer() throws InterruptedException {
  try(JMSContext jmsContext = connectionFactory.createContext()){
    JMSConsumer consumer = jmsContext.createConsumer(queue);
    consumer.setMessageListener(message -> {
      try {
        System.out.println(message.getBody(String.class));
      } catch (JMSException e) {
        e.printStackTrace();
      }
    });

    JMSProducer producer = jmsContext.createProducer().send(queue, "Message 1");
    producer.send(queue, "Message 2");
    producer.send(queue, "Message 3");

    //Give the listener time to receive the messages before closing
    Thread.sleep(1000);

    //Close the Consumer
    consumer.close();
  }
}

Output

Message 1
Message 2
Message 3

NOTE: Prefer asynchronous message listeners wherever possible. In this tutorial I have used synchronous message processing for brevity, but the asynchronous approach is usually the better choice in real projects.
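Because a listener runs on a separate thread, the calling thread needs some way to wait until the expected messages have arrived before it closes the consumer. One common approach is a java.util.concurrent.CountDownLatch. The sketch below is plain Java rather than JMS: a single-thread executor plays the role of the provider's delivery thread, and AsyncDeliverySketch, deliverAsync and receiveAll are hypothetical names. In real JMS code, the lambda passed to setMessageListener could call latch.countDown() in the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java model of asynchronous delivery (assumption: this is NOT the JMS API).
class AsyncDeliverySketch {

  // Hands each message to the listener on a separate thread, in order.
  static ExecutorService deliverAsync(List<String> messages, Consumer<String> listener) {
    ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
    for (String message : messages) {
      deliveryThread.submit(() -> listener.accept(message));
    }
    return deliveryThread;
  }

  // Delivers the messages asynchronously and waits up to 5 seconds for all of them.
  static List<String> receiveAll(List<String> messages) {
    List<String> received = Collections.synchronizedList(new ArrayList<String>());
    CountDownLatch latch = new CountDownLatch(messages.size());

    ExecutorService deliveryThread = deliverAsync(messages, message -> {
      received.add(message);
      latch.countDown(); // one more message has been processed
    });

    try {
      if (!latch.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for messages");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    } finally {
      deliveryThread.shutdown(); // comparable to closing the consumer
    }
    return new ArrayList<>(received);
  }

  public static void main(String[] args) {
    for (String message : receiveAll(Arrays.asList("Message 1", "Message 2", "Message 3"))) {
      System.out.println(message);
    }
  }
}
```

A latch with a timeout returns as soon as the last message arrives, whereas a fixed sleep always waits the full duration and can still be too short on a slow machine.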

TemporaryQueue

A javax.jms.TemporaryQueue is a unique temporary Queue object created for the duration of a Connection. Only the connection that created it can consume from it. It is deleted when that connection is closed, or earlier if you call delete(), which throws a JMSException while receivers are still using the queue. Below we will discuss the use of a TemporaryQueue in a typical request-response scenario, where the receiver replies to the sender with another message.

The code below demonstrates how to create a TemporaryQueue and link it to a message.

Java
TemporaryQueue tempReplyQ = jmsContext.createTemporaryQueue();

//Send the message
Message message = jmsContext.createTextMessage("Temp Queue demo");
message.setJMSReplyTo(tempReplyQ);

The TemporaryQueue interface adds a single delete() method to Queue.

Java
public interface TemporaryQueue extends Queue {
    void delete() throws JMSException;
}

QueueBrowser in PTP messaging

A javax.jms.QueueBrowser is created from a JMSContext or Session. You can use QueueBrowser to explore the messages in a Queue without removing them.

Java
  @Test
  public void testQueueBrowser() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message1 = jmsContext.createTextMessage("Start your day with a glass of Water!");
      TextMessage message2 = jmsContext.createTextMessage("Remember to do 10 mins stretching");

      producer.send(queue, message1);
      producer.send(queue, message2);

      QueueBrowser qBrowser = jmsContext.createBrowser(queue);
      Enumeration msgEnum = qBrowser.getEnumeration();
      while (msgEnum.hasMoreElements()) {
        TextMessage browsedMsg = (TextMessage) msgEnum.nextElement();
        System.out.println("Browsed message: " + browsedMsg.getText());
      }

      JMSConsumer consumer = jmsContext.createConsumer(queue);
      for (int i = 0; i < 2; i++) {
        System.out.println("Received message: " + consumer.receiveBody(String.class));
      }
    }
  }

Output

Browsed message: Start your day with a glass of Water!
Browsed message: Remember to do 10 mins stretching
Received message: Start your day with a glass of Water!
Received message: Remember to do 10 mins stretching
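The difference between browsing and consuming can also be pictured with an in-memory queue. This is only an analogy, not JMS code: java.util.ArrayDeque stands in for the JMS Queue, and BrowseVersusConsumeSketch, browse and consumeAll are hypothetical names. Note that Queue in this sketch is java.util.Queue, not javax.jms.Queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory analogy for QueueBrowser vs. JMSConsumer (assumption: NOT the JMS API).
class BrowseVersusConsumeSketch {

  // Looks at every message without removing it, like a QueueBrowser.
  static List<String> browse(Queue<String> queue) {
    return new ArrayList<>(queue); // copies head-to-tail; the queue itself is untouched
  }

  // Removes and returns every message, like a consumer calling receive().
  static List<String> consumeAll(Queue<String> queue) {
    List<String> received = new ArrayList<>();
    String message;
    while ((message = queue.poll()) != null) {
      received.add(message);
    }
    return received;
  }

  public static void main(String[] args) {
    Queue<String> queue = new ArrayDeque<>();
    queue.add("Start your day with a glass of Water!");
    queue.add("Remember to do 10 mins stretching");

    System.out.println("Browsed: " + browse(queue) + ", still queued: " + queue.size());
    System.out.println("Received: " + consumeAll(queue) + ", still queued: " + queue.size());
  }
}
```

Browsing leaves both messages in place, which is why the consumer in the test above can still receive them afterwards.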

Reliability of Queue

A Queue is typically created by an administrator and exists for a long time. It holds messages even when no consumer is currently receiving them, which makes PTP messaging quite reliable even without any special measures.

Message Request Reply in JMS

There are use cases where a message receiver needs to reply to the sender. There are two ways to implement a request-reply scenario in a Point-to-Point messaging system.

  1. By making use of a TemporaryQueue.
  2. By using another Queue as a reply queue.

1. Message request-reply using TemporaryQueue

The code below uses a TemporaryQueue, and the message receiver replies on this Queue. A TemporaryQueue is scoped to the connection behind the JMSContext that created it: it is deleted when that connection is closed, unless you delete it earlier.

Java
//ConnectionFactory & Queue creation skipped for brevity
try(JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();

      //Use temporary Queue to send and receive messages.
      TemporaryQueue replyQueue = jmsContext.createTemporaryQueue();
      TextMessage message = jmsContext.createTextMessage("Sender message - Hi there!");
      message.setJMSReplyTo(replyQueue);
      producer.send(queue, message);
      System.out.println(message.getJMSMessageID());

      //Message received
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage messageReceived = (TextMessage) consumer.receive();
      System.out.println(messageReceived.getText());

      //Reply
      JMSProducer replyProducer = jmsContext.createProducer();
      TextMessage replyMessage = jmsContext.createTextMessage("Reply message - Hi, all well here!");
      replyMessage.setJMSCorrelationID(messageReceived.getJMSMessageID());
      replyProducer.send(messageReceived.getJMSReplyTo(), replyMessage);
 
      //Receive and process reply
      JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
      TextMessage replyReceived = (TextMessage) replyConsumer.receive();
      System.out.println(replyReceived.getText());
      System.out.println(replyReceived.getJMSCorrelationID());

      //Close the consumer first: delete() fails while receivers still use the queue
      replyConsumer.close();
      replyQueue.delete();
    }

Output

Sender message - Hi there!
Reply message - Hi, all well here!

As shown on line 8 (message.setJMSReplyTo(replyQueue)), the temporary queue needs to be set in the message header. We have talked about headers in the JMS Message Model. The receiver reads the message and finds the queue to reply on via getJMSReplyTo(), as shown on line 21.

NOTE: Some JMS providers require special privileges to create a TemporaryQueue. Ask your JMS admin to grant them if needed.
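The reply-to pattern itself does not depend on JMS: the request simply carries the destination for its reply. The sketch below models that in plain Java, assuming java.util.concurrent.LinkedBlockingQueue as a stand-in for both the request queue and the temporary reply queue. ReplyToSketch, Request and requestReply are hypothetical names, and the replyTo field plays the role of the JMSReplyTo header.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of request-reply (assumption: this is NOT the JMS API).
class ReplyToSketch {

  // A request that carries its own reply destination, like the JMSReplyTo header.
  static final class Request {
    final String body;
    final BlockingQueue<String> replyTo;

    Request(String body, BlockingQueue<String> replyTo) {
      this.body = body;
      this.replyTo = replyTo;
    }
  }

  // Sends one request and waits up to 5 seconds for the reply; returns null on timeout.
  static String requestReply(String body) {
    BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    // Private to this requester, like a TemporaryQueue
    BlockingQueue<String> temporaryReplyQueue = new LinkedBlockingQueue<>();

    // Receiver: knows only the request queue and learns where to reply from the request
    Thread receiver = new Thread(() -> {
      try {
        Request request = requestQueue.take();
        request.replyTo.put("Reply to: " + request.body);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    receiver.start();

    try {
      requestQueue.put(new Request(body, temporaryReplyQueue));
      return temporaryReplyQueue.poll(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(requestReply("Hi there!"));
  }
}
```

The receiver never needs to know the reply queue in advance, which is what makes a per-requester temporary queue practical.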

2. Use another Queue as a reply Queue

Instead of creating a temporary Queue, you can use two regular queues: one for the request message and a second one for replying to it. In the code below, queue and replyQueue play the same roles as the request queue and the temporary reply queue in the previous example.

Java
package lab02.message.requestresponse;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class MessageRequestResponse {
  public static void main(String[] args) throws JMSException, InterruptedException {

    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();
    Queue replyQueue = CommonSettings.getDefaultReplyQueue();

    Thread messageproducer = new Thread() {
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSProducer producer = jmsContext.createProducer();
          //Send the message
          Message message = jmsContext.createTextMessage("Order placed successfully");
          message.setJMSReplyTo(replyQueue);
          producer.send(queue, message);
          sleep(2000);
          JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
          TextMessage replyMessage = (TextMessage) replyConsumer.receive();
          System.out.println("Received reply: " + replyMessage.getText());
        } catch (JMSException | InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    };

    Thread messageConsumer = new Thread() {
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          //Receive message
          Thread.sleep(1000);
          JMSConsumer consumer = jmsContext.createConsumer(queue);
          Message msg = consumer.receive();
          System.out.println("Received message: " + msg.getBody(String.class));
          //Reply message
          jmsContext.createProducer().send(msg.getJMSReplyTo(), "Order will be dispatched soon!");
        } catch (JMSException | InterruptedException e) {
          e.printStackTrace();
        }
      }
    };

    messageproducer.start();
    messageConsumer.start();
  }
}

Output

Received message: Order placed successfully
Received reply: Order will be dispatched soon!

NOTE: When there are multiple replies on the reply queue, it becomes difficult to link each reply to its original message: how would you know which replyMessage belongs to which message? The solution is to use JMSCorrelationID.

Use JMSCorrelationID in JMS Point-to-point Request Response Messaging

The example below demonstrates the use of JMSCorrelationID. A message is first sent to the queue; the receiver picks it up and processes it. The received message carries details such as the JMSReplyTo queue. The receiver sends the reply to replyQueue and sets its JMSCorrelationID to the received message's ID, as shown in the call replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID()).

Java
package lab02.message.requestresponse;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

// MessageId and CorrelationId test
public class MessageIdCorrelationId {
  public static void main(String[] args) throws JMSException, InterruptedException {
    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();
    Queue replyQueue = CommonSettings.getDefaultReplyQueue();

    try (JMSContext jmsContext = connectionFactory.createContext()) {

      //Message listener on replyQueue
      JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
      replyConsumer.setMessageListener(msg -> {
        try {
          System.out.println("Reply message: " + msg.getBody(String.class));
          System.out.println("Reply MessageID: " + msg.getJMSMessageID());
          System.out.println("Reply CorrelationID: " + msg.getJMSCorrelationID());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });

      //Message1
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message = jmsContext.createTextMessage("Order placed successfully");
      message.setJMSReplyTo(replyQueue);
      producer.send(queue, message);
      System.out.println("Message1 " + message.getJMSMessageID());

      //Receive Message
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage receivedMsg = (TextMessage) consumer.receive();
      System.out.println("Message received: " + receivedMsg.getText());

      //Reply Message
      TextMessage replyMessage = jmsContext.createTextMessage("Order Acknowledged");
      replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID());
      jmsContext.createProducer().send(receivedMsg.getJMSReplyTo(), replyMessage);
 
      //Remember to close the consumer, otherwise an exception will be thrown
      replyConsumer.close();
    }
  }
}

Output

Message1 ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
Message received: Order placed successfully
Reply message: Order Acknowledged
Reply MessageID: ID:18-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524648
Reply CorrelationID: ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
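With several requests in flight, the requester still has to look up which request a reply belongs to. One simple way, sketched here in plain Java, is a map of pending requests keyed by message ID. CorrelationSketch, Reply, requestSent and match are hypothetical names, the IDs are made-up strings, and the correlationId field stands in for the value returned by getJMSCorrelationID().

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of matching replies to requests (assumption: NOT the JMS API).
class CorrelationSketch {

  // Minimal reply: a body plus the ID of the request it answers.
  static final class Reply {
    final String correlationId;
    final String body;

    Reply(String correlationId, String body) {
      this.correlationId = correlationId;
      this.body = body;
    }
  }

  // Requests still waiting for a reply, keyed by their message ID.
  private final Map<String, String> pendingRequests = new HashMap<>();

  // Remember a request right after it has been sent.
  void requestSent(String messageId, String body) {
    pendingRequests.put(messageId, body);
  }

  // Returns the body of the request this reply answers, or null if the ID is unknown.
  String match(Reply reply) {
    return pendingRequests.remove(reply.correlationId);
  }

  public static void main(String[] args) {
    CorrelationSketch requester = new CorrelationSketch();
    requester.requestSent("ID:1", "Order 1 placed");
    requester.requestSent("ID:2", "Order 2 placed");

    // Replies may arrive in a different order than the requests were sent
    Reply second = new Reply("ID:2", "Order 2 acknowledged");
    Reply first = new Reply("ID:1", "Order 1 acknowledged");
    System.out.println(second.body + " answers: " + requester.match(second));
    System.out.println(first.body + " answers: " + requester.match(first));
  }
}
```

Removing the entry on a match keeps the map from growing and lets a duplicate or stray reply be detected, because match then returns null.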

That covers JMS point-to-point messaging using a Queue. You have also learned how to process messages asynchronously in JMS. Please share your feedback on this article in the comments below.

Last Updated: April 3rd, 2024 | Categories: Java™, JMS
