JMS ensures guaranteed message delivery through JMS Message Acknowledgement (Acknowledge modes). If a session is transacted, message acknowledgment is handled automatically by the commit()
method. Otherwise, the recovery is handled by the rollback()
method. In this article, we will assume that the session is not transacted. This is an advance topic, make sure you have a basic understanding of JMS. You can refer Send and Receive a message using JMS if you are new to JMS.
Message Acknowledgement is used when the session is not transacted. The message acknowledgment is a protocol that is established between the JMS Client
(JMS Producer & JMS Consumer) and the JMS Server
.
JMS provides three acknowledgment options and the recovery is handled manually.
- AUTO_ACKNOWLEDGE – JMS Session automatically acknowledges the client’s receipt of the messages. No duplicate messages delivered.
- CLIENT_ACKNOWLEDGE – JMS clients have to acknowledge by calling the message’s
acknowledge()
method. - DUPS_OK_ACKNOWLEDGE – JMS Server can send duplicate messages to the JMSConsumer without being worried. The Client app should be able to handle/process the duplicate messages in this mode.
1. JMS AUTO_ACKNOWLEDGE messaging in action
This is the default Acknowledgement mode that gets set when you create a Session
or JMSContext
. You can manually specify the AUTO_ACKNOWLEDGE
mode as below.
JMSContext jmsContext = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
In this mode, the JMS session automatically acknowledges a client’s receipt of a message when it has either successfully returned from a call to receive or the message listener it has called to process the message successfully returns. The below code shows the usage of this mode. Link to GitHub code repo.
package lab06.message.acknowledgement;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class AutoAcknowledgeExample {
public static void main(String[] args) {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Thread messageproducer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
JMSProducer producer = jmsContext.createProducer();
//Send the message
Message message = jmsContext.createTextMessage("This is an AUTO_ACKNOWLEDGEMENT message");
producer.send(queue, message);
}
}
};
Thread messageConsumer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("Received message: " + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
messageConsumer.start();
messageproducer.start();
}
}
package labxx.common.settings;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class CommonSettings {
private static ConnectionFactory CONNECTION_FACTORY = null;
private static Queue PTP_QUEUE = null;
private static Topic PUB_SUB_TOPIC = null;
private static Queue DEFAULT_REPLY_QUEUE = null;
static {
try {
InitialContext initialContext = new InitialContext();
CONNECTION_FACTORY = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
PTP_QUEUE = (Queue) initialContext.lookup("jms/PTPQueue");
DEFAULT_REPLY_QUEUE = (Queue) initialContext.lookup("jms/ReplyQueue");
PUB_SUB_TOPIC = (Topic) initialContext.lookup("jms/PubSubTopic");
} catch (NamingException e) {
e.printStackTrace();
}
}
public static ConnectionFactory getConnectionFactory() {
return CONNECTION_FACTORY;
}
public static Queue getDefaultQueue() {
return PTP_QUEUE;
}
public static Queue getDefaultReplyQueue() {
return DEFAULT_REPLY_QUEUE;
}
public static Topic getDefautTopic() {
return PUB_SUB_TOPIC;
}
}
Output
Received message: This is an AUTO_ACKNOWLEDGEMENT message
2. JMS CLIENT_ACKNOWLEDGE messaging in action
When you set the session mode to CLIENT_ACKNOWLEDGE
, the client acknowledges the message by calling it’s acknowledge method. Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been delivered by its session.
The below example demonstrates the same points discussed above. Link to GitHub code repo.
package lab06.message.acknowledgement;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class ClientAcknowledgeExample {
public static void main(String[] args) throws InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Thread messageproducer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
JMSProducer producer = jmsContext.createProducer();
//Send the message
Message message = jmsContext.createTextMessage("This is a CLIENT_ACKNOWLEDGE message");
producer.send(queue, message);
message.acknowledge(); //This is Optional
}catch (JMSException e) {
e.printStackTrace();
}
}
};
Thread messageConsumer1 = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive(3000);
System.out.println("Received message: " + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
Thread messageConsumer2 = new Thread(messageConsumer1);
Thread messageConsumer3 = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("Received message: " + msg.getText());
Thread.sleep(500);
msg.acknowledge(); //Important
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
messageproducer.start();
messageConsumer1.start();
messageConsumer3.start();
Thread.sleep(1000);
messageConsumer2.start();
}
}
Output
Received message: This is a CLIENT_ACKNOWLEDGE message Received message: This is a CLIENT_ACKNOWLEDGE message Exception in thread "Thread-10" java.lang.NullPointerException at lab06.message.acknowledgement.ClientAcknowledgeExample$2.run(ClientAcknowledgeExample.java:33) at java.lang.Thread.run(Thread.java:748)
As you can see in the above output, for the message producer it is not needed to acknowledge the message. But as a good practice, you should always acknowledge (line-19).
On the other hand, messageConsumer1
receives the message but does not acknowledge, therefore the message is not removed from the JMS Server. As a result, messageConsumer3
is able to receive the same message, but this time it acknowledges the message, so it gets removed from the JMS server. Due to this messageConsumer2
does not receive the message and it throws the NullPointerException
when timeout occurs.
3. JMS DUPS_OK_ACKNOWLEDGE messaging in action
In this mode, the JMS Server is told to deliver messages more than once to a consumer. The advantage here is, the server does not have the overhead of taking care of duplicate message delivery. The client application needs to be tolerated or smart enough to deal with the duplication of messages.
NOTE: This does not guarantee a great performance, use this mode only with proper testing and cautions. Otherwise just stick to the
AUTO_ACKNOWLEDGE
mode.
Here is the link to GitHub code repo.
package lab06.message.acknowledgement;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class DupsOkAcknowledgeExample {
public static void main(String[] args) {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Thread messageproducer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
//Send the message
Message message = jmsContext.createTextMessage("This is an DUPS_OK_ACKNOWLEDGE message");
producer.send(queue, message);
}
}
};
Thread messageConsumer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext(JMSContext.DUPS_OK_ACKNOWLEDGE)) {
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("Received message: " + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
messageConsumer.start();
messageproducer.start();
}
}
Output
Received message: This is an DUPS_OK_ACKNOWLEDGE message
In the above code example, acknowledge mode is not specified in line-14, so the consumer is using AUTO_ACKNOWLEDGE mode. It is possible to use different acknowledge modes in producers and subscribers.
In reality, you would most likely stick to AUTO_ACKNOWLEDGE mode. But, it is always good to have depth knowledge of other modes as well. This is all as part of JMS Message Acknowledgement using which it ensures guaranteed message delivery.