In this article, you will learn the JMS pub-sub (publish-subscribe) messaging model. As you read in the JMS introduction article, in a Pub/Sub model, the client sends a message to multiple receivers via an intermediator called Topic. The sender is often referred as Publisher and the receiver as Subscribers.
JMS pub/sub messaging example
Below is a simple code example that demonstrates the working of the Publish/Subscribe messaging model. I have created 2 main threads, publisher
and subscriber1
. Cloned the subscriber1 to subscriber2
. So basically I have one message publisher and 2 message subscribers. Link to GitHub repo.
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class SimplePubSubExample {
private static ConnectionFactory connectionFactory = null;
private static Topic defaultTopic = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultTopic = CommonSettings.getDefautTopic();
}
public static void main(String[] args) {
Thread publisher = new Thread(){
@Override
public void run(){
try(JMSContext jmsContext = connectionFactory.createContext()) {
Thread.sleep(1000);
JMSProducer producer = jmsContext.createProducer();
TextMessage message = jmsContext.createTextMessage("World needs to worry about the Climate changes");
producer.send(defaultTopic, message);
} catch (InterruptedException ex){
ex.printStackTrace();
}
}
};
Thread subscriber1 = new Thread(){
@Override
public void run(){
try(JMSContext jmsContext = connectionFactory.createContext()) {
JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
System.out.println("Message received: " + consumer.receive().getBody(String.class));
} catch (JMSException e){
e.printStackTrace();
}
}
};
Thread subscriber2 = new Thread(subscriber1);
publisher.start();
subscriber1.start();
subscriber2.start();
}
}
package labxx.common.settings;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class CommonSettings {
private static ConnectionFactory CONNECTION_FACTORY = null;
private static Queue PTP_QUEUE = null;
private static Topic PUB_SUB_TOPIC = null;
private static Queue DEFAULT_REPLY_QUEUE = null;
static {
try {
InitialContext initialContext = new InitialContext();
CONNECTION_FACTORY = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
PTP_QUEUE = (Queue) initialContext.lookup("jms/PTPQueue");
DEFAULT_REPLY_QUEUE = (Queue) initialContext.lookup("jms/ReplyQueue");
PUB_SUB_TOPIC = (Topic) initialContext.lookup("jms/PubSubTopic");
} catch (NamingException e) {
e.printStackTrace();
}
}
public static ConnectionFactory getConnectionFactory() {
return CONNECTION_FACTORY;
}
public static Queue getDefaultQueue() {
return PTP_QUEUE;
}
public static Queue getDefaultReplyQueue() {
return DEFAULT_REPLY_QUEUE;
}
public static Topic getDefautTopic() {
return PUB_SUB_TOPIC;
}
}
Output
Message received: World needs to worry about the Climate changes Message received: World needs to worry about the Climate changes
NOTE: There is typically some latency in all pub/sub systems, the code you write should take the delay into consideration.
Problem with a normal message Subscriber
The problem with a normal message Subscriber (like the one used in the above example), it is not durable. Meaning, if the consumer goes down (gets closed) for some reason, it will not be able to receive the previous messages from the Topic
after it resumes.
Let us look at the below code, the publisher sends 7 messages and the consumer only receives one message. Once the consumer is closed and recreated, it does not get the messages from the topic. Link to GitHub repo.
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class NormalConsumerProblem {
private static ConnectionFactory connectionFactory = null;
private static Topic defaultTopic = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultTopic = CommonSettings.getDefautTopic();
}
public static void main(String[] args) {
Thread publisher = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
Thread.sleep(1000);
for (int i = 1; i < 7; i++) {
producer.send(defaultTopic, "Update " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//Normal Consumer
Thread consumer = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
System.out.println(consumer.receive().getBody(String.class));
Thread.sleep(2000);
consumer.close();
consumer = jmsContext.createConsumer(defaultTopic);
for (int i = 1; i < 6; i++) {
System.out.println(consumer.receive().getBody(String.class));
}
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
publisher.start();
consumer.start();
}
}
Output
Update 1
In the real-world scenario, you need a more reliable way to subscribe to the topics. JMS provides DurableConsumer
to overcome this problem.
DurableConsumer example in JMS pub-sub
- When all messages from a topic must be received, durable subscriber (
DurableConsumer
) should be used. - JMS ensures that messages published while a durable subscriber is inactive are retained by JMS and delivered when the subscriber subsequently becomes active.
- Non-durable subscribers are used when the consumer can afford to miss messages when it is inactive.
Below code demonstrates the usage of DurableConsumer
. Unlike the previous example, the consumer here receives all the messages from the topic
. Link to GitHub.
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class DurableConsumerExample {
private static ConnectionFactory connectionFactory = null;
private static Topic defaultTopic = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultTopic = CommonSettings.getDefautTopic();
}
public static void main(String[] args) {
Thread publisher = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
Thread.sleep(1000);
for (int i = 1; i < 7; i++) {
producer.send(defaultTopic, "Update " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//Durable Consumer
Thread durableConsumer = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
jmsContext.setClientID("exampleApp");//important
JMSConsumer consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer");
System.out.println(consumer.receive().getBody(String.class));
Thread.sleep(2000);
consumer.close();
consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer");
for (int i = 1; i < 6; i++) {
System.out.println(consumer.receive().getBody(String.class));
}
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
publisher.start();
durableConsumer.start();
}
}
Output
Update 1 Update 2 Update 3 Update 4 Update 5 Update 6
NOTE: It is important to set ClientID, this is usually the name of the subscribing client-app. Also, remember to set the
durableConsumer
name as shown in line-38 & line-42.
SharedConsumer to load balance
In JMS 2.0, multiple subscribers can listen to a Topic and they can distribute the message consumption task. Occasionally, you might need to load balance the multiple messages coming from a topic. This is when you can use SharedConsumer
to distribute the message consumption load.
Below example shows the creation of sharedConsumer
, remember to assign the same subscriber name as shown in line-37, line-38. Link to code on GitHub.
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class SharedConsumerExample {
private static ConnectionFactory connectionFactory = null;
private static Topic defaultTopic = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultTopic = CommonSettings.getDefautTopic();
}
public static void main(String[] args) {
Thread publisher = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
Thread.sleep(1000);
for (int i = 1; i < 7; i++) {
producer.send(defaultTopic, "Update " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//Shared Consumer
Thread sharedConsumer = new Thread() {
@Override
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSConsumer sharedConsumer1 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber");
JMSConsumer sharedConsumer2 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber");
for (int i = 0; i < 3; i++) {
System.out.println("Shared Consumer1: " + sharedConsumer1.receive().getBody(String.class));
System.out.println("Shared Consumer2: " + sharedConsumer2.receive().getBody(String.class));
}
Thread.sleep(3000);
sharedConsumer1.close();
sharedConsumer2.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
publisher.start();
sharedConsumer.start();
}
}
Output
Shared Consumer1: Update 1 Shared Consumer2: Update 2 Shared Consumer1: Update 3 Shared Consumer2: Update 4 Shared Consumer1: Update 5 Shared Consumer2: Update 6
Additionally, you can also use SharedDurableConsumer
which gives you combined abilities from DurableConsumer
and SharedConsumer
.
Async message Subscription
I have discussed the async subscription in the JMS Point-to-point Messaging article. You can use the same listener pattern to subscribe to a Topic asynchronously. The below code shows a simple example of an async message subscription. Code link to GitHub.
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class AsyncPubSubExample {
private static ConnectionFactory connectionFactory = null;
private static Topic defaultTopic = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultTopic = CommonSettings.getDefautTopic();
}
public static void main(String[] args) throws InterruptedException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
JMSConsumer consumer = jmsContext.createConsumer(defaultTopic);
consumer.setMessageListener(msg -> {
try {
System.out.println(msg.getBody(String.class));
} catch (JMSException e) {
e.printStackTrace();
}
});
for (int i = 1; i < 7; i++) {
producer.send(defaultTopic, "Message " + i);
}
Thread.sleep(1000);
consumer.close();
}
}
}
Output
Message 1 Message 2 Message 3 Message 4 Message 5 Message 6
This is all as part of JMS pub-sub message model, you have got a clear idea about the use of Topic in jms pub/sub. You learned about different consumers like DurableConsumer
, SharedConsumer
, SharedDurableConsumer
and also async message processing. Please share your thoughts in the comments below.