ActiveMQ消息選擇器Selector

1、前言java

  消息發送到Broker,消費者經過Destination能夠訂閱消費某個特定的通道內的消息。一些特殊狀況下,須要消費者對消息過濾下再進行消費,也就是篩選出某些特定消息。ActiveMQ提供了SQL92表達式語法的自定義消息篩選功能。很是方便快捷的可以開發出具備消息篩選功能的應用。apache

  ActiveMQ 支持:session

  1. 數字表達式: >,>=,<,<=,BETWEEN,=.
  2. 字符表達式:=,<>,IN.
  3. IS NULL 或則 IS NOT NULL.
  4. 邏輯AND, 邏輯OR, 邏輯NOT.

  常數類型:app

  1. 數字:3.1415926, 5。
  2. 字符: ‘a’,必須帶有單引號。
  3. NULL,特別的常量。
  4. 布爾類型: TRUEFALSE

2、程序案例tcp

  生產者:性能

package com.cfang.prebo.activemq.selector;

import java.util.Scanner;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        Message message = null;
        try {
            Scanner scanner = new Scanner(System.in);
            connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
            connection = connectionFactory.createConnection(null, null);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TP_Q_TEST_SELECTOR00");
            producer = session.createProducer(destination);
            while(true) {
                String line = scanner.nextLine();
                if("exit".equals(line)) {
                    break;
                }
                message = session.createTextMessage(line);
                message.setIntProperty("applicationName", line.length());
                message.setStringProperty("result", "RT");
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(producer != null){ // 回收消息發送者
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session != null){ // 回收會話對象
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(connection != null){ // 回收鏈接對象
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

  如上,生產者還能夠設置更多的條件,ActiveMQ也提供了全基本類型的 setXXXXXProperty方法去設置條件。spa

  消費者:code

package com.cfang.prebo.activemq.selector;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerA {

	public static void main(String[] args) {
		ConnectionFactory connectionFactory = null;
		Connection connection = null;
		Session session = null;
		Destination destination = null;
		MessageConsumer consumer = null;
		try {
			connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
			connection = connectionFactory.createConnection(null, null);
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("TP_Q_TEST_SELECTOR00");
			consumer = session.createConsumer(destination,"applicationName=2 and result='RT'");
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(Message message) {
					System.out.println(message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			
		}
	}
}

  如上,消費者就只消費  applicationName = 2 且  result = 'RT' 的消息。對象

3、小結blog

  一、提供了篩選功能,能夠減小 destination 的數量。能夠用於實現特定機器,特定消息(灰度?)。

  二、若是同時兩個消費者的話,一個異常不能消費了,那麼消息就會產生積壓。對另外一個正常的消費者而言,性能會降低,消費時間可能會變長。

相關文章
相關標籤/搜索