Messages delivered to a receiver often need to be filtered by some criteria. JMS provides message selectors, which let a JMS consumer specify the messages it is interested in based on the message headers and properties. In this article, you will learn how to use JMS message selectors to filter messages.

This is an advanced tutorial, so make sure you have a basic understanding of JMS before proceeding. If you are new to JMS, check out the JMS Point to Point messaging Model article first.

Why Message Selectors?

When a message is broadcast to many receivers, it is useful to attach criteria to the subscription to register specific interests, so that only the messages of interest are delivered to each subscriber.

  • A message selector is a String whose syntax is based on a subset of the SQL92 conditional expression syntax.
  • Only messages whose headers and properties match the selector are delivered.
  • Message selectors cannot reference values in the message body.
  • A message selector is evaluated from left to right within each precedence level.
  • Predefined selector literals and operator names are usually written in upper case; however, they are case insensitive.

1. Message Filter by Properties

As mentioned above, you can filter messages only on properties or headers, not on the actual message content. In the example below, only the DEBUG-level log is delivered to the consumer. Link to GitHub code repo.

Java
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {
  private static ConnectionFactory connectionFactory = null;
  private static Queue defaultQueue = null;
  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultQueue = CommonSettings.getDefaultQueue();
  }

  @Test
  public void messageFilterOnProperties() throws JMSException, InterruptedException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
      LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
      LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
      LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);

      //NOTE - The string literal must be in single quotes. With "logLevel = DEBUG",
      //DEBUG is read as an identifier (a property name), so it will not work!
      JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
 
      consumer.setMessageListener(msg -> {
        System.out.println(msg);
        try {
          LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
          System.out.println(event);
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });

      JMSProducer producer = jmsContext.createProducer();

      //send event1
      ObjectMessage objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event1);
      objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event2
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event2);
      objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event3
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event3);
      objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
 
      //Send event4
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event4);
      objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
      Thread.sleep(2000);
      consumer.close();
    }
  }
}
Java
package lab04.message.filtering;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.StringJoiner;

public class LogEvent implements Serializable {
  //Timestamp is the primary Key
  private LocalDateTime id;

  //Log Body
  private String body;

  //Unique Server Id
  private String machineId;
  private LogLevel logLevel;
  public LogEvent() {
    super();
  }

  public LogEvent(String body, String machineId, LogLevel logLevel) {
    this.id = LocalDateTime.now();
    this.body = body;
    this.machineId = machineId;
    this.logLevel = logLevel;
  }
  public LocalDateTime getId() {
    return id;
  }
  public void setId(LocalDateTime id) {
    this.id = id;
  }
  public String getBody() {
    return body;
  }
  public void setBody(String body) {
    this.body = body;
  }
  public String getMachineId() {
    return machineId;
  }
  public void setMachineId(String machineId) {
    this.machineId = machineId;
  }
  public LogLevel getLogLevel() {
    return logLevel;
  }
  public void setLogLevel(LogLevel logLevel) {
    this.logLevel = logLevel;
  }
  @Override
  public String toString() {
    return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]")
        .add("id=" + id)
        .add("body='" + body + "'")
        .add("machineId='" + machineId + "'")
        .add("logLevel=" + logLevel)
        .toString();
  }
}
Java
package lab04.message.filtering;

public enum LogLevel {
  TRACE, DEBUG, INFO, WARN, ERROR
}

Output

Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544
getJMSTimestamp():	1577423107544
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	4
Properties:		{JMSXDeliveryCount=1, logLevel=DEBUG}
LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log',
machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]

2. Message Filter by Header

You have seen message filtering based on JMS properties. Now look at the example below to understand filtering based on a header. A message with an ERROR-level log or a priority between 5 and 9 is delivered to the consumer.

Java
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {
  private static ConnectionFactory connectionFactory = null;
  private static Queue defaultQueue = null;
  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultQueue = CommonSettings.getDefaultQueue();
  }

  @Test
  public void messageFilterOnHeader() throws JMSException, InterruptedException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
      LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
      LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
      LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
      JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
    
      consumer.setMessageListener(msg -> {
        System.out.println(msg);
        try {
          LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
          System.out.println(event);
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });
 
      JMSProducer producer = jmsContext.createProducer();
 
      //send event1
      ObjectMessage objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event1);
      objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
   
      //Send event2
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event2);
      objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event3
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event3);
      objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
      producer.setPriority(5);
      producer.send(defaultQueue, objectMessage);

      //Send event4
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event4);
      objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());

      //Reset to normal priority
      producer.setPriority(4);
      producer.send(defaultQueue, objectMessage);
      Thread.sleep(2000);
      consumer.close();
    }
  }
}

Output

Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655
getJMSTimestamp():	1577428536655
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	4
Properties:		{JMSXDeliveryCount=1, logLevel=ERROR}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log',
machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]
Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661
getJMSTimestamp():	1577428536661
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	5
Properties:		{JMSXDeliveryCount=1, logLevel=INFO}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log',
machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]
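
To see why exactly two of the four messages come through, the selector can be modeled as a plain Java predicate. This is only a sketch: SelectorModel, Msg, matches, and delivered are hypothetical names, no JMS provider is involved, and the priorities are the ones used in the example above (4 by default, 5 for the INFO event).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the selector
//   logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9
// It models only the two values the selector reads; no JMS provider is used.
final class SelectorModel {

  // Minimal view of a message: one property and one header field.
  static final class Msg {
    final String logLevel; // the "logLevel" string property
    final int priority;    // the JMSPriority header

    Msg(String logLevel, int priority) {
      this.logLevel = logLevel;
      this.priority = priority;
    }
  }

  // BETWEEN is inclusive at both ends, so 5 and 9 both match.
  static boolean matches(Msg m) {
    return "ERROR".equals(m.logLevel) || (m.priority >= 5 && m.priority <= 9);
  }

  // Returns the log levels of the messages the selector would let through.
  static List<String> delivered(List<Msg> sent) {
    List<String> result = new ArrayList<>();
    for (Msg m : sent) {
      if (matches(m)) {
        result.add(m.logLevel);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Msg> sent = new ArrayList<>();
    sent.add(new Msg("ERROR", 4)); // event1, default priority
    sent.add(new Msg("DEBUG", 4)); // event2, default priority
    sent.add(new Msg("INFO", 5));  // event3, sent after setPriority(5)
    sent.add(new Msg("WARN", 4));  // event4, sent after setPriority(4)
    System.out.println(delivered(sent)); // prints [ERROR, INFO]
  }
}
```

The ERROR event matches on the property, and the INFO event matches on the header, which is why those two appear in the output.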

Rules for Selector

Like any query language, selectors are made up of Literals, Identifiers, Expressions, and Operators. I will discuss the rules for each of them below for reference.

1. Rules for Literals

  • A string literal is enclosed in single quotes, and a single quote inside it is represented by a doubled single quote. Examples: logLevel = 'ERROR', name = 'Ram''s'.
  • A numeric literal is written as is, with an optional + or - sign. Example: range BETWEEN -200 AND +30. You can also write 30 instead of +30.
  • Numbers can also be written with a decimal point or in scientific notation, such as 6.2 or -7E3.
  • Numeric literals support the ranges of Java long (exact values) and Java double (approximate values).
  • Boolean values can be represented as TRUE and FALSE.
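
The quoting rule for string literals is easy to get wrong when a selector is assembled from runtime values. Below is a minimal sketch of a helper that applies it; SelectorLiterals, quote, and equalsString are hypothetical names and not part of the JMS API.

```java
// Hypothetical helper for writing selector string literals; not part of the JMS API.
final class SelectorLiterals {

  // Wraps a Java string in single quotes and doubles any embedded single quote.
  static String quote(String value) {
    return "'" + value.replace("'", "''") + "'";
  }

  // Builds "<property> = '<value>'" with the value quoted as a string literal.
  static String equalsString(String property, String value) {
    return property + " = " + quote(value);
  }

  public static void main(String[] args) {
    System.out.println(equalsString("logLevel", "ERROR")); // logLevel = 'ERROR'
    System.out.println(equalsString("name", "Ram's"));     // name = 'Ram''s'
  }
}
```

The resulting String can be passed as the selector argument when creating a consumer, as in the examples earlier in this article.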

2. Rules for Identifiers

  • The rules for Java identifiers also hold for JMS selector identifiers, which means _ and $ can be used in identifiers.
  • Identifiers cannot be Java keywords.
  • Identifiers cannot be NULL, TRUE, FALSE, NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
  • Identifiers are case sensitive.
  • Any name beginning with JMSX is a JMS defined property.
  • Any name beginning with JMS_ is a provider-specific property.
  • Any name that does not begin with JMS is an application-defined property.
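
The identifier rules above can be sketched as a small checker. SelectorIdentifiers, isValid, and kind are hypothetical names, not part of the JMS API. Two assumptions are made here: reserved words are compared case-insensitively, and a name that starts with JMS but matches neither prefix is treated as a header field. Java keywords are not checked.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical checker for the identifier rules listed above; not part of the JMS API.
final class SelectorIdentifiers {

  // Words that cannot be used as identifiers. Comparing them case-insensitively
  // is an assumption of this sketch.
  private static final Set<String> RESERVED = new HashSet<>(Arrays.asList(
      "NULL", "TRUE", "FALSE", "NOT", "AND", "OR",
      "BETWEEN", "LIKE", "IN", "IS", "ESCAPE"));

  // True if the name follows the Java identifier character rules and is not reserved.
  static boolean isValid(String name) {
    if (name == null || name.isEmpty()) {
      return false;
    }
    if (RESERVED.contains(name.toUpperCase(Locale.ROOT))) {
      return false;
    }
    if (!Character.isJavaIdentifierStart(name.charAt(0))) {
      return false;
    }
    for (int i = 1; i < name.length(); i++) {
      if (!Character.isJavaIdentifierPart(name.charAt(i))) {
        return false;
      }
    }
    return true;
  }

  // Classifies a name by its prefix, following the last three rules above.
  static String kind(String name) {
    if (name.startsWith("JMSX")) {
      return "JMS-defined property";
    }
    if (name.startsWith("JMS_")) {
      return "provider-specific property";
    }
    if (!name.startsWith("JMS")) {
      return "application-defined property";
    }
    return "JMS header field"; // assumption: e.g. JMSPriority
  }

  public static void main(String[] args) {
    System.out.println(isValid("logLevel"));       // true
    System.out.println(isValid("log-level"));      // false
    System.out.println(kind("JMSXDeliveryCount")); // JMS-defined property
    System.out.println(kind("logLevel"));          // application-defined property
  }
}
```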

3. Rules for Operators

  • The standard rule for bracketing () applies here.
  • Logical operators, in precedence order: NOT, AND, OR.
  • Comparison operators: =, >, >=, <, <=, <> (not equal).
  • Arithmetic Operators are in the precedence order:
    • Unary Operator – +, -.
    • Multiplication and Division – *, /.
    • Addition and Subtraction – +, -
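
Java's own operators follow the same relative ordering (! before && before ||, and unary sign before * and / before + and -), so the precedence rules above can be illustrated with ordinary Java expressions. This is an analogy rather than a selector evaluation: PrecedenceDemo and its methods are hypothetical names, and only non-null values are modeled.

```java
// Models selector precedence with Java operators that share the same relative ordering.
// Hypothetical illustration only; no selector is parsed or evaluated here.
final class PrecedenceDemo {

  // Selector form: a OR b AND NOT c   (no parentheses)
  static boolean implicit(boolean a, boolean b, boolean c) {
    return a || b && !c;
  }

  // The grouping the precedence rules produce: a OR (b AND (NOT c))
  static boolean grouped(boolean a, boolean b, boolean c) {
    return a || (b && (!c));
  }

  // A different grouping, which needs explicit parentheses: (a OR b) AND NOT c
  static boolean regrouped(boolean a, boolean b, boolean c) {
    return (a || b) && !c;
  }

  public static void main(String[] args) {
    // With a = true, b = false, c = true the two groupings disagree.
    System.out.println(implicit(true, false, true));  // true
    System.out.println(grouped(true, false, true));   // true
    System.out.println(regrouped(true, false, true)); // false

    // Arithmetic: -2 + 3 * 4 is read as (-2) + (3 * 4).
    System.out.println(-2 + 3 * 4);   // 10
    System.out.println((-2 + 3) * 4); // 4
  }
}
```

When a selector mixes AND and OR, adding parentheses makes the intended grouping explicit.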

You learned about JMS message selectors and how they are used to filter the messages that are delivered to clients. They are very useful in real-world applications, especially from a performance point of view. Your application can delegate much of the filtering load to the JMS provider and make sure only the necessary messages are delivered to it.

By |Last Updated: April 3rd, 2024|Categories: Java™, JMS|
