Messages delivered to a receiver often need to be filtered on certain criteria. JMS provides message selectors that let a JMS consumer specify which messages it is interested in, based on the message headers and properties. In this article, you will learn how to use JMS message selectors to filter messages.
This is an advanced tutorial, so make sure you have a basic understanding of JMS before proceeding. Check out the JMS Point to Point messaging Model article if you are new to JMS.
Why Message Selectors?
When a message is broadcast to many receivers, it is useful to attach criteria to the subscription to register specific interests, so that only the messages of interest are delivered to the subscribers.
- A message selector is a String whose syntax is based on the SQL92 conditional expression syntax.
- Only messages whose headers and properties match the selector are delivered.
- Message selectors cannot reference message body values.
- A message selector is evaluated from left to right within each precedence level.
- Selector keywords and operators (such as AND, BETWEEN, TRUE) are usually written in upper case, but they are case insensitive. Identifiers and string literal values remain case sensitive.
1. Message Filter by Properties
As mentioned above, you can filter messages only on properties or headers, not on the actual message content. In the example below, only the DEBUG-level log message is delivered to the consumer.
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;

import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {

    private static ConnectionFactory connectionFactory = null;
    private static Queue defaultQueue = null;

    static {
        connectionFactory = CommonSettings.getConnectionFactory();
        defaultQueue = CommonSettings.getDefaultQueue();
    }

    @Test
    public void messageFilterOnProperties() throws JMSException, InterruptedException {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
            LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
            LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
            LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
            LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);

            // NOTE: the string literal must be in single quotes.
            // With "logLevel = DEBUG" (no quotes), it will not work!
            JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
            consumer.setMessageListener(msg -> {
                System.out.println(msg);
                try {
                    LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
                    System.out.println(event);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });

            JMSProducer producer = jmsContext.createProducer();

            // Send event1
            ObjectMessage objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event1);
            objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Send event2
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event2);
            objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Send event3
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event3);
            objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Send event4
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event4);
            objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Give the listener time to receive the matching message
            Thread.sleep(2000);
            consumer.close();
        }
    }
}

package lab04.message.filtering;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.StringJoiner;

public class LogEvent implements Serializable {

    // Timestamp is the primary key
    private LocalDateTime id;
    // Log body
    private String body;
    // Unique server id
    private String machineId;
    private LogLevel logLevel;

    public LogEvent() {
        super();
    }

    public LogEvent(String body, String machineId, LogLevel logLevel) {
        this.id = LocalDateTime.now();
        this.body = body;
        this.machineId = machineId;
        this.logLevel = logLevel;
    }

    public LocalDateTime getId() {
        return id;
    }

    public void setId(LocalDateTime id) {
        this.id = id;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public String getMachineId() {
        return machineId;
    }

    public void setMachineId(String machineId) {
        this.machineId = machineId;
    }

    public LogLevel getLogLevel() {
        return logLevel;
    }

    public void setLogLevel(LogLevel logLevel) {
        this.logLevel = logLevel;
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]")
                .add("id=" + id)
                .add("body='" + body + "'")
                .add("machineId='" + machineId + "'")
                .add("logLevel=" + logLevel)
                .toString();
    }
}

package lab04.message.filtering;

public enum LogLevel {
    TRACE, DEBUG, INFO, WARN, ERROR
}

Output
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544
getJMSTimestamp(): 1577423107544
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=DEBUG}
LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log', machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]
2. Message Filter by Header
You have seen message filtering based on JMS properties. The example below shows message filtering based on a header. A message with an ERROR-level log or a priority between 5 and 9 is delivered to the consumer.
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;

import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {

    private static ConnectionFactory connectionFactory = null;
    private static Queue defaultQueue = null;

    static {
        connectionFactory = CommonSettings.getConnectionFactory();
        defaultQueue = CommonSettings.getDefaultQueue();
    }

    @Test
    public void messageFilterOnHeader() throws JMSException, InterruptedException {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
            LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
            LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
            LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
            LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);

            JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
            consumer.setMessageListener(msg -> {
                System.out.println(msg);
                try {
                    LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
                    System.out.println(event);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });

            JMSProducer producer = jmsContext.createProducer();

            // Send event1
            ObjectMessage objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event1);
            objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Send event2
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event2);
            objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
            producer.send(defaultQueue, objectMessage);

            // Send event3 with priority 5
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event3);
            objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
            producer.setPriority(5);
            producer.send(defaultQueue, objectMessage);

            // Send event4
            objectMessage = jmsContext.createObjectMessage();
            objectMessage.setObject(event4);
            objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
            // Reset to normal priority
            producer.setPriority(4);
            producer.send(defaultQueue, objectMessage);

            // Give the listener time to receive the matching messages
            Thread.sleep(2000);
            consumer.close();
        }
    }
}
Output
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655
getJMSTimestamp(): 1577428536655
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=ERROR}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log', machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]

Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661
getJMSTimestamp(): 1577428536661
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 5
Properties: {JMSXDeliveryCount=1, logLevel=INFO}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log', machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]
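
To see why only the ERROR and INFO events come through, the selector can be mirrored as a plain Java predicate. This is a minimal sketch, not JMS code: it does not talk to a broker, and SelectorDemo, Msg, matches, and delivered are hypothetical names used only for illustration. It assumes that messages sent without an explicit priority carry priority 4, as the output above shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: mimics the selector
//   logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9
// in plain Java. In a real application the JMS provider evaluates the selector.
final class SelectorDemo {

    // Minimal stand-in for the two values the selector looks at.
    static final class Msg {
        final String logLevel;
        final int priority;

        Msg(String logLevel, int priority) {
            this.logLevel = logLevel;
            this.priority = priority;
        }
    }

    // BETWEEN is inclusive on both ends, so priorities 5 and 9 both match.
    static boolean matches(Msg m) {
        return "ERROR".equals(m.logLevel) || (m.priority >= 5 && m.priority <= 9);
    }

    // Returns the log levels of the messages that would pass the filter.
    static List<String> delivered(List<Msg> sent) {
        List<String> result = new ArrayList<>();
        for (Msg m : sent) {
            if (matches(m)) {
                result.add(m.logLevel);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Msg> sent = Arrays.asList(
                new Msg("ERROR", 4),  // event1: matches on logLevel
                new Msg("DEBUG", 4),  // event2: no match
                new Msg("INFO", 5),   // event3: matches on priority
                new Msg("WARN", 4));  // event4: priority reset to 4, no match
        System.out.println(delivered(sent)); // prints [ERROR, INFO]
    }
}
```

The WARN event is not delivered because the producer priority is reset to 4 before it is sent.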
Rules for Selector
Like any query language, selectors are made up of Literals, Identifiers, Expressions, and Operators. I will discuss the rules for each of them below for reference.
1. Rules for Literals
- A String literal is enclosed in single quotes, and a single quote inside the literal is represented by a doubled single quote. Examples: logLevel = 'ERROR', name = 'Ram''s'.
- A numeric literal is written as is, with an optional + or - sign. Example: range BETWEEN -200 AND +30. You can also write 30 instead of +30.
- Numbers can also be written with a decimal point or in scientific notation, such as 6.2 or -7E3.
- Numeric literals cover the ranges of Java long and double.
- Boolean literals are TRUE and FALSE.
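
When a selector is assembled from a runtime value, the quoting rule for String literals is easy to get wrong. The following is a minimal sketch of a helper that applies it; SelectorLiterals and quote are hypothetical names, not part of the JMS API.

```java
// Hypothetical helper, not part of the JMS API.
final class SelectorLiterals {

    // Wraps a value in single quotes and doubles any single quote inside it,
    // which is how a String literal is written in a selector.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println("logLevel = " + quote("ERROR")); // prints logLevel = 'ERROR'
        System.out.println("name = " + quote("Ram's"));     // prints name = 'Ram''s'
    }
}
```

The resulting string can be passed as the selector argument of createConsumer, as in the examples earlier in this article.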
2. Rules for Identifiers
- The rules for Java identifiers also hold for JMS selector identifiers, which means _ and $ can be used in identifiers.
- Identifiers cannot be Java keywords.
- Identifiers cannot be named NULL, TRUE, FALSE, NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
- Identifiers are case sensitive.
- Any name beginning with JMSX is a JMS-defined property.
- Any name beginning with JMS_ is a provider-specific property.
- Any name that does not begin with JMS is an application-defined property.
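
Some of these rules can be checked before a property name is ever used in a selector. The sketch below is one possible way to do that in plain Java; SelectorIdentifiers, isValid, and kind are hypothetical names. It covers only the character rules and the reserved selector words (treated as case insensitive, which is an assumption of this sketch), and it labels remaining JMS-prefixed names such as JMSPriority as headers.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, not part of the JMS API.
final class SelectorIdentifiers {

    // Reserved selector words that cannot be used as identifiers.
    private static final Set<String> RESERVED = new HashSet<>(Arrays.asList(
            "NULL", "TRUE", "FALSE", "NOT", "AND", "OR",
            "BETWEEN", "LIKE", "IN", "IS", "ESCAPE"));

    // Checks the Java identifier character rules and the reserved-word list.
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        // Assumption: reserved words are rejected regardless of case.
        if (RESERVED.contains(name.toUpperCase(Locale.ROOT))) {
            return false;
        }
        if (!Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    // Classifies a name by its prefix, following the list above.
    static String kind(String name) {
        if (name.startsWith("JMSX")) {
            return "JMS-defined property";
        }
        if (name.startsWith("JMS_")) {
            return "provider-specific property";
        }
        if (name.startsWith("JMS")) {
            return "JMS header";
        }
        return "application-defined property";
    }

    public static void main(String[] args) {
        System.out.println(isValid("logLevel"));        // prints true
        System.out.println(isValid("2fast"));           // prints false
        System.out.println(kind("JMSXDeliveryCount"));  // prints JMS-defined property
        System.out.println(kind("logLevel"));           // prints application-defined property
    }
}
```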
3. Rules for Operators
- The standard rule for bracketing with () applies here.
- Logical operators, in precedence order: NOT, AND, OR.
- Comparison operators, in precedence order: =, >, >=, <, <=, <> (not equal).
- Arithmetic operators, in precedence order:
  - Unary operators: +, -
  - Multiplication and division: *, /
  - Addition and subtraction: +, -
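
The effect of logical operator precedence can be illustrated in plain Java, because Java's !, && and || have the same relative precedence as NOT, AND and OR. This is only an analogy, not selector evaluation; PrecedenceDemo and its methods are hypothetical names.

```java
// Illustration only: shows how "a OR b AND c" is grouped.
final class PrecedenceDemo {

    // AND binds tighter than OR, so this is a OR (b AND c).
    static boolean selectorOrder(boolean a, boolean b, boolean c) {
        return a || b && c;
    }

    // What a strictly left-to-right reading would compute: (a OR b) AND c.
    static boolean naiveLeftToRight(boolean a, boolean b, boolean c) {
        return (a || b) && c;
    }

    public static void main(String[] args) {
        System.out.println(selectorOrder(true, false, false));    // prints true
        System.out.println(naiveLeftToRight(true, false, false)); // prints false
    }
}
```

When the intended grouping differs from the precedence order, add brackets to the selector explicitly.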
You learned about JMS message selectors and how they are used to filter the messages delivered to clients. They are very useful in real-world applications, particularly from a performance point of view: your application can delegate much of the filtering load to the JMS provider and make sure only the necessary messages are delivered to it.