Quite often messages delivered to the receiver need to be filtered based on certain criteria. JMS provides message selectors that allow a JMS Consumer to specify the messages it is interested in, based on the message header. In this article, you will learn JMS Message Selectors to Filter Messages.
This is an advance tutorial, please make sure you have a basic understanding of JMS before proceeding with this article. Check out the JMS Point to Point messaging Model article if you are new to JMS.
Why Message Selectors?
When a message is broadcasted to many receivers, it is useful to place criteria into the Subscription to register specific interests. So only the interested-messages are delivered to the subscribers.
- A Message Selector is a
Stringwith syntax based on SQL92 conditional expression. - Only messages whose headers and properties match the selector are delivered.
- Message selectors can not reference the message body values.
- Message Selector is evaluated from left to right.
- Selector literals and operators are usually written in Upper Case, however, they are case insensitive.
1. Message Filter by Properties
Like I mentioned before, you can only filter messages based on Properties or Header, not based on the actual message content. The below example shows, only the level log is delivered to the consumer. Link to GitHub code repo.
package lab04.message.filtering;
import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;
public class MessageSelectorExample {
private static ConnectionFactory connectionFactory = null;
private static Queue defaultQueue = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultQueue = CommonSettings.getDefaultQueue();
}
@Test
public void messageFilterOnProperties() throws JMSException, InterruptedException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
//NOTE - If you keep "logLevel = DEBUG", it will not work!
JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
consumer.setMessageListener(msg -> {
System.out.println(msg);
try {
LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
System.out.println(event);
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer();
//send event1
ObjectMessage objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event1);
objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event2
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event2);
objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event3
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event3);
objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event4
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event4);
objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
Thread.sleep(2000);
consumer.close();
}
}
}package lab04.message.filtering;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.StringJoiner;
public class LogEvent implements Serializable {
//Timestamp is the primary Key
private LocalDateTime id;
//Log Body
private String body;
//Unique Server Id
private String machineId;
private LogLevel logLevel;
public LogEvent() {
super();
}
public LogEvent(String body, String machineId, LogLevel logLevel) {
this.id = LocalDateTime.now();
this.body = body;
this.machineId = machineId;
this.logLevel = logLevel;
}
public LocalDateTime getId() {
return id;
}
public void setId(LocalDateTime id) {
this.id = id;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getMachineId() {
return machineId;
}
public void setMachineId(String machineId) {
this.machineId = machineId;
}
public LogLevel getLogLevel() {
return logLevel;
}
public void setLogLevel(LogLevel logLevel) {
this.logLevel = logLevel;
}
@Override
public String toString() {
return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("body='" + body + "'")
.add("machineId='" + machineId + "'")
.add("logLevel=" + logLevel)
.toString();
}
}
package lab04.message.filtering;
public enum LogLevel {
TRACE, DEBUG, INFO, WARN, ERROR
}
Output
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544
getJMSTimestamp(): 1577423107544
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=DEBUG}
LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log',
machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]
2. Message Filter by Header
You have seen message filtration based on JMS properties. Now, look at the below example to understand message filtration based on Header. A message with ERROR level log or priority between 5 to 9 is delivered to the consumer.
package lab04.message.filtering;
import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;
public class MessageSelectorExample {
private static ConnectionFactory connectionFactory = null;
private static Queue defaultQueue = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultQueue = CommonSettings.getDefaultQueue();
}
@Test
public void messageFilterOnHeader() throws JMSException, InterruptedException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
consumer.setMessageListener(msg -> {
System.out.println(msg);
try {
LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
System.out.println(event);
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer();
//send event1
ObjectMessage objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event1);
objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event2
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event2);
objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event3
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event3);
objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
producer.setPriority(5);
producer.send(defaultQueue, objectMessage);
//Send event4
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event4);
objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
//Reset to normal priority
producer.setPriority(4);
producer.send(defaultQueue, objectMessage);
Thread.sleep(2000);
consumer.close();
}
}
}Output
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655
getJMSTimestamp(): 1577428536655
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=ERROR}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log',
machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661
getJMSTimestamp(): 1577428536661
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 5
Properties: {JMSXDeliveryCount=1, logLevel=INFO}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log',
machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]
Rules for Selector
Like any query language, selectors are made up of Literals, Identifiers, Expressions, and Operators. I will discuss the rules for each of them below for reference.
1. Rules for Literals
- A
Stringliteral is enclosed in single quotes, and a single quote represented by doubled single quote. Example,logLevel = 'ERROR',name='Ram''s'. - A numeric value is presented as it is with optional
+or-sign. Example,range BETWEEN -200 AND +30. You can also just mention30instead of+30. - Numbers can also be represented in scientific notation, such as
-7E3,6.2etc. - Java
LongandDoubleare also supported. - Boolean values can be represented as
TRUEandFALSE.
2. Rules for Identifiers
- Rules applied for Java identifiers holds true for JMS selector identifiers as well. Which means
_and$can be used in the identifiers. - Identifiers can not have the Java keywords.
- Identifiers can not have names like
NULL,TRUE,FALSE,NOT,AND,OR,BETWEEN,LIKE,IN,IS, orESCAPE. - Identifiers are case sensitive .
- Any name beginning with
JMSXis a JMS defined property. - Any name beginning with
JMS_is a provider-specific property. - Any name (identifier) does not contain
JMS, is an application-defined property.
3. Rules for Operators
- The standard rule for bracketing
()applies here. - Logical operators are in the precedence order :
NOT,AND,OR. - Comparison operators in the precedence order:
=,>,>=,<,<=,<>(Not equal operator). - Arithmetic Operators are in the precedence order:
- Unary Operator -
+,-. - Multiplication and Division -
*,/. - Addition and Subtraction -
+,-
- Unary Operator -
You leaned JMS message Selectors and how it is used to filter messages that are delivered to the clients. It is very useful in real-world applications, specifically from a performance point of view. Your application can delegate a lot of processing load to the JMS providers and make sure only the necessary messages are delivered to it.