In this article, you will learn the JMS pub-sub (publish-subscribe) messaging model. As you read in the JMS introduction article, in a pub/sub model the client sends a message to multiple receivers through an intermediary called a Topic. The sender is often referred to as the Publisher and the receivers as Subscribers.

JMS pub/sub messaging example

Below is a simple code example that demonstrates the publish/subscribe messaging model. I have created two threads, publisher and subscriber1, and created subscriber2 by passing subscriber1 to a new Thread so that it runs the same code. So basically I have one message publisher and two message subscribers. Link to GitHub repo.

Java
package lab03.message.pubsub;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class SimplePubSubExample {
  private static ConnectionFactory connectionFactory = null;
  private static Topic defaultTopic = null;
  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultTopic = CommonSettings.getDefautTopic();
  }

  public static void main(String[] args) {
    Thread publisher = new Thread(){
      @Override
      public void run(){
        try(JMSContext jmsContext = connectionFactory.createContext()) {
          Thread.sleep(1000); // give the subscribers time to subscribe before publishing
          JMSProducer producer = jmsContext.createProducer();
          TextMessage message = jmsContext.createTextMessage("World needs to worry about the Climate changes");
          producer.send(defaultTopic, message);
        } catch (InterruptedException ex){
          ex.printStackTrace();
        }
      }
    };
  
    Thread subscriber1 = new Thread(){
      @Override
      public void run(){
        try(JMSContext jmsContext = connectionFactory.createContext()) {
          JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
          System.out.println("Message received: " + consumer.receive().getBody(String.class));
        } catch (JMSException e){
          e.printStackTrace();
        }
      }
    };
  
    Thread subscriber2 = new Thread(subscriber1);
    publisher.start();
    subscriber1.start();
    subscriber2.start();
  }
}
Java
package labxx.common.settings;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class CommonSettings {
  private static ConnectionFactory CONNECTION_FACTORY = null;
  private static Queue PTP_QUEUE = null;
  private static Topic PUB_SUB_TOPIC = null;
  private static Queue DEFAULT_REPLY_QUEUE = null;

  static {
    try {
      InitialContext initialContext = new InitialContext();
      CONNECTION_FACTORY = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
      PTP_QUEUE = (Queue) initialContext.lookup("jms/PTPQueue");
      DEFAULT_REPLY_QUEUE = (Queue) initialContext.lookup("jms/ReplyQueue");
      PUB_SUB_TOPIC = (Topic) initialContext.lookup("jms/PubSubTopic");
    } catch (NamingException e) {
      e.printStackTrace();
    }
  }

  public static ConnectionFactory getConnectionFactory() {
    return CONNECTION_FACTORY;
  }
  public static Queue getDefaultQueue() {
    return PTP_QUEUE;
  }
  public static Queue getDefaultReplyQueue() {
    return DEFAULT_REPLY_QUEUE;
  }
  public static Topic getDefautTopic() {
    return PUB_SUB_TOPIC;
  }
}

Output

Message received: World needs to worry about the Climate changes
Message received: World needs to worry about the Climate changes

NOTE: There is typically some latency in all pub/sub systems, so the code you write should take the delay into consideration.
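To picture why the publisher in the example waits before sending, here is a minimal sketch of the fan-out idea in plain Java. It is a toy in-memory model, not the JMS API: the names ToyTopic, subscribe and publish are made up for illustration, and a real broker does far more. It only shows that a message is copied to every subscriber registered at publish time, and that a subscriber that arrives late misses what was published earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory stand-in for a topic (hypothetical, NOT part of the JMS API).
class ToyTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    // Registers a subscriber and returns the inbox that will collect its messages.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    // Copies the message to every subscriber that is registered right now.
    void publish(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }
}

class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        List<String> early1 = topic.subscribe();
        List<String> early2 = topic.subscribe();
        topic.publish("Update 1");

        // This subscriber registers after "Update 1" was published.
        List<String> late = topic.subscribe();
        topic.publish("Update 2");

        System.out.println("early1 = " + early1); // early1 = [Update 1, Update 2]
        System.out.println("early2 = " + early2); // early2 = [Update 1, Update 2]
        System.out.println("late   = " + late);   // late   = [Update 2]
    }
}
```

In the JMS example above, the one-second sleep in the publisher plays the role of "subscribe first, publish later".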

Problem with a normal message Subscriber

The problem with a normal message subscriber (like the one used in the example above) is that it is not durable. If the consumer goes down (gets closed) for some reason, then after it resumes it will not receive the messages that were sent to the Topic in the meantime.

Let us look at the code below: the publisher sends 6 messages and the consumer receives only one. Once the consumer is closed and recreated, it does not get the remaining messages from the topic, so its receive() calls keep waiting. Link to GitHub repo.

Java
package lab03.message.pubsub;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class NormalConsumerProblem {
  private static ConnectionFactory connectionFactory = null;
  private static Topic defaultTopic = null;

  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultTopic = CommonSettings.getDefautTopic();
  }

  public static void main(String[] args) {
    Thread publisher = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSProducer producer = jmsContext.createProducer();
          Thread.sleep(1000);
          for (int i = 1; i < 7; i++) {
            producer.send(defaultTopic, "Update " + i);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
  
    //Normal Consumer
    Thread consumer = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
          System.out.println(consumer.receive().getBody(String.class));
          Thread.sleep(2000);
          consumer.close();
          consumer = jmsContext.createConsumer(defaultTopic);
          for (int i = 1; i < 6; i++) {
            System.out.println(consumer.receive().getBody(String.class));
          }
        } catch (JMSException | InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    publisher.start();
    consumer.start();
  }
}

Output

Update 1

In a real-world scenario, you need a more reliable way to subscribe to topics. JMS provides the durable consumer (DurableConsumer) to overcome this problem.

DurableConsumer example in JMS pub-sub

  • When all messages from a topic must be received, a durable subscriber (DurableConsumer) should be used.
  • JMS ensures that messages published while a durable subscriber is inactive are retained by JMS and delivered when the subscriber subsequently becomes active.
  • Non-durable subscribers are used when the consumer can afford to miss messages while it is inactive.
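The difference between the two kinds of subscriber can be sketched in a few lines of plain Java. This is a toy model under stated assumptions, not the JMS API: ToySubscription and its methods are hypothetical names, and the "broker" is reduced to a single backlog queue. It only illustrates the retention rule from the list above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one topic subscription (hypothetical, NOT the JMS API).
class ToySubscription {
    private final boolean durable;
    private final Deque<String> backlog = new ArrayDeque<>();
    private boolean active = true;

    ToySubscription(boolean durable) {
        this.durable = durable;
    }

    // Called by the toy "broker" for every message published to the topic.
    void deliver(String message) {
        if (active || durable) {
            backlog.add(message); // kept until the consumer reads it
        }
        // Otherwise the subscriber is non-durable and inactive: the message is dropped.
    }

    // The consumer goes away; a non-durable subscription also loses unread messages.
    void close() {
        active = false;
        if (!durable) {
            backlog.clear();
        }
    }

    void reopen() {
        active = true;
    }

    // Returns and removes everything waiting for the consumer.
    List<String> drain() {
        List<String> out = new ArrayList<>(backlog);
        backlog.clear();
        return out;
    }
}

class ToyDurabilityDemo {
    // Receives "Update 1", goes offline while "Update 2".."Update 6" are published, then returns.
    static List<String> run(boolean durable) {
        ToySubscription sub = new ToySubscription(durable);
        sub.deliver("Update 1");
        sub.drain();
        sub.close();
        for (int i = 2; i <= 6; i++) {
            sub.deliver("Update " + i);
        }
        sub.reopen();
        return sub.drain();
    }

    public static void main(String[] args) {
        System.out.println("non-durable: " + run(false)); // non-durable: []
        System.out.println("durable:     " + run(true));  // durable:     [Update 2, Update 3, Update 4, Update 5, Update 6]
    }
}
```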

The code below demonstrates the usage of DurableConsumer. Unlike the previous example, the consumer here receives all the messages from the topic. Link to GitHub.

Java
package lab03.message.pubsub;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class DurableConsumerExample {
  private static ConnectionFactory connectionFactory = null;
  private static Topic defaultTopic = null;

  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultTopic = CommonSettings.getDefautTopic();
  }

  public static void main(String[] args) {
    Thread publisher = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSProducer producer = jmsContext.createProducer();
          Thread.sleep(1000);
          for (int i = 1; i < 7; i++) {
            producer.send(defaultTopic, "Update " + i);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
  
    //Durable Consumer
    Thread durableConsumer = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          jmsContext.setClientID("exampleApp"); // important: an unshared durable subscription needs a client ID
          JMSConsumer consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer");
          System.out.println(consumer.receive().getBody(String.class));
          Thread.sleep(2000);
          consumer.close();
          consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer");
          for (int i = 1; i < 6; i++) {
            System.out.println(consumer.receive().getBody(String.class));
          }
        } catch (JMSException | InterruptedException e) {
          e.printStackTrace();
        }
      }
    };

    publisher.start();
    durableConsumer.start();
  }
}

Output

Update 1
Update 2
Update 3
Update 4
Update 5
Update 6

NOTE: It is important to set the ClientID; this is usually the name of the subscribing client app. Also, remember to pass the same subscription name ("logConsumer") in both createDurableConsumer calls, so that the recreated consumer picks up the same subscription.

SharedConsumer to load balance

In JMS 2.0, multiple consumers can share a single Topic subscription and divide the message consumption between them. Occasionally, you might need to load balance the messages coming from a topic. This is when you can use a SharedConsumer to distribute the message consumption load.

The example below shows the creation of shared consumers; remember to assign the same subscription name ("sharedSubscriber") in both createSharedConsumer calls. Link to code on GitHub.

Java
package lab03.message.pubsub;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class SharedConsumerExample {
  private static ConnectionFactory connectionFactory = null;
  private static Topic defaultTopic = null;

  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultTopic = CommonSettings.getDefautTopic();
  }

  public static void main(String[] args) {
    Thread publisher = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSProducer producer = jmsContext.createProducer();
          Thread.sleep(1000);
          for (int i = 1; i < 7; i++) {
            producer.send(defaultTopic, "Update " + i);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
   
    //Shared Consumer
    Thread sharedConsumer = new Thread() {
      @Override
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSConsumer sharedConsumer1 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber");
          JMSConsumer sharedConsumer2 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber");
          for (int i = 0; i < 3; i++) {
            System.out.println("Shared Consumer1: " + sharedConsumer1.receive().getBody(String.class));
            System.out.println("Shared Consumer2: " + sharedConsumer2.receive().getBody(String.class));
          }
          Thread.sleep(3000);
          sharedConsumer1.close();
          sharedConsumer2.close();
        } catch (JMSException | InterruptedException e) {
          e.printStackTrace();
        }
      }
    };

    publisher.start();
    sharedConsumer.start();
  }
}

Output

Shared Consumer1: Update 1
Shared Consumer2: Update 2
Shared Consumer1: Update 3
Shared Consumer2: Update 4
Shared Consumer1: Update 5
Shared Consumer2: Update 6

Additionally, you can use a SharedDurableConsumer (created with createSharedDurableConsumer), which combines the abilities of DurableConsumer and SharedConsumer.
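The load-balancing idea behind a shared subscription can be pictured with a small plain-Java sketch. It is a toy model, not the JMS API: ToySharedSubscription is a hypothetical name, and the strict round-robin policy is my assumption for illustration. A real JMS provider decides for itself how to split messages between the consumers, so the neat alternation in the output above should not be relied on.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a shared subscription (hypothetical, NOT the JMS API).
class ToySharedSubscription {
    private final List<List<String>> consumers = new ArrayList<>();
    private int next = 0;

    // Adds a consumer to the shared subscription and returns its inbox.
    List<String> addConsumer() {
        List<String> inbox = new ArrayList<>();
        consumers.add(inbox);
        return inbox;
    }

    // Each message goes to exactly ONE consumer.
    // Round-robin is an assumption of this sketch, not a JMS guarantee.
    void deliver(String message) {
        if (consumers.isEmpty()) {
            return;
        }
        consumers.get(next).add(message);
        next = (next + 1) % consumers.size();
    }
}

class ToySharedDemo {
    public static void main(String[] args) {
        ToySharedSubscription shared = new ToySharedSubscription();
        List<String> consumer1 = shared.addConsumer();
        List<String> consumer2 = shared.addConsumer();
        for (int i = 1; i < 7; i++) {
            shared.deliver("Update " + i);
        }
        System.out.println("consumer1 = " + consumer1); // consumer1 = [Update 1, Update 3, Update 5]
        System.out.println("consumer2 = " + consumer2); // consumer2 = [Update 2, Update 4, Update 6]
    }
}
```

Compare this with the fan-out of a normal subscriber, where every consumer would have received all six messages.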

Async message Subscription

I have discussed async subscription in the JMS Point-to-point Messaging article. You can use the same listener pattern to subscribe to a Topic asynchronously. The code below shows a simple example of an async message subscription. Code link to GitHub.

Java
package lab03.message.pubsub;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class AsyncPubSubExample {
  private static ConnectionFactory connectionFactory = null;
  private static Topic defaultTopic = null;

  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultTopic = CommonSettings.getDefautTopic();
  }

  public static void main(String[] args) throws InterruptedException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
      consumer.setMessageListener(msg -> {
        try {
          System.out.println(msg.getBody(String.class));
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });
      for (int i = 1; i < 7; i++) {
        producer.send(defaultTopic, "Message " + i);
      }
      Thread.sleep(1000);
      consumer.close();
    }
  }
}

Output

Message 1
Message 2
Message 3
Message 4
Message 5
Message 6
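One design note on the example above: it uses Thread.sleep(1000) to give the listener time to run before the consumer is closed. A fixed sleep is either too long or, on a slow system, too short. A possible alternative is to count the expected messages down with a CountDownLatch. The sketch below shows only that waiting pattern in plain Java; it does not use JMS, and LatchListenerSketch, deliverAndAwait and the single delivery thread are assumptions made for illustration. In the JMS example, the countDown() call would go inside the message listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Waiting for an async listener with a latch instead of a fixed sleep (sketch, no JMS involved).
class LatchListenerSketch {

    // Hands each message to a listener on a background thread and waits
    // until all of them were handled or the timeout expires.
    static List<String> deliverAndAwait(List<String> messages, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allReceived = new CountDownLatch(messages.size());

        // Stand-in for the message listener: record the message, then count down.
        Consumer<String> listener = msg -> {
            received.add(msg);
            allReceived.countDown();
        };

        // A single thread keeps the delivery order, like one listener thread would.
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            for (String message : messages) {
                deliveryThread.execute(() -> listener.accept(message));
            }
            if (!allReceived.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out before all messages arrived");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            deliveryThread.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> result = deliverAndAwait(
                java.util.Arrays.asList("Message 1", "Message 2", "Message 3"), 5000);
        System.out.println(result); // [Message 1, Message 2, Message 3]
    }
}
```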

That covers the JMS pub-sub messaging model; you should now have a clear idea of how a Topic is used in JMS pub/sub. You learned about different consumers like DurableConsumer, SharedConsumer, and SharedDurableConsumer, and also about async message processing. Please share your thoughts in the comments below.

Last Updated: April 3rd, 2024 | Categories: Java™, JMS