1、前言java
消息發送到Broker,消費者經過Destination能夠訂閱消費某個特定的通道內的消息。一些特殊狀況下,須要消費者對消息過濾下再進行消費,也就是篩選出某些特定消息。ActiveMQ提供了SQL92表達式語法的自定義消息篩選功能。很是方便快捷的可以開發出具備消息篩選功能的應用。apache
ActiveMQ 支持:session
>
,>=
,<
,<=
,BETWEEN
,=
.=
,<>
,IN
.IS NULL
或則 IS NOT NULL
.AND
, 邏輯OR
, 邏輯NOT
.常數類型:app
NULL
,特別的常量。TRUE
,FALSE
2、程序案例tcp
生產者:性能
package com.cfang.prebo.activemq.selector; import java.util.Scanner; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { Scanner scanner = new Scanner(System.in); connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); producer = session.createProducer(destination); while(true) { String line = scanner.nextLine(); if("exit".equals(line)) { break; } message = session.createTextMessage(line); message.setIntProperty("applicationName", line.length()); message.setStringProperty("result", "RT"); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer != null){ // 回收消息發送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){ // 回收會話對象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){ // 回收鏈接對象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
如上,生產者還能夠設置更多的條件,ActiveMQ也提供了全基本類型的 setXXXXXProperty方法去設置條件。spa
消費者:code
package com.cfang.prebo.activemq.selector; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerA { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); consumer = session.createConsumer(destination,"applicationName=2 and result='RT'"); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println(message); } }); } catch (Exception e) { e.printStackTrace(); } finally { } } }
如上,消費者就只消費 applicationName = 2 且 result = 'RT' 的消息。對象
3、小結blog
一、提供了篩選功能,能夠減小 destination 的數量。能夠用於實現特定機器,特定消息(灰度?)。
二、若是同時兩個消費者的話,一個異常不能消費了,那麼消息就會產生積壓。對另外一個正常的消費者而言,性能會降低,消費時間可能會變長。