消息隊列:快速上手ActiveMQ消息隊列的JMS方式使用(兩種模式:Topic和Queue的消息推送和訂閱)

一、實現功能

希望使用一套API,實現兩種模式下的消息發送和接收功能,方便業務程序調用

1、發送Topic

2、發送Queue

3、接收Topic

4、接收Queue

二、接口設計

根據功能設計公共調用接口

/**
 * Message distribution contract used to send data to, and receive data from,
 * a message queue in either topic (publish/subscribe) or queue (point-to-point) mode.
 *
 * @author eguid
 */
public interface MsgDistributeInterface {

	/**
	 * Sends a whole byte array to a topic.
	 *
	 * @param topicName name of the topic
	 * @param data payload bytes
	 * @return success flag reported by the implementation
	 */
	boolean sendTopic(String topicName, byte[] data);

	/**
	 * Sends a slice of a byte array to a topic.
	 *
	 * @param topicName name of the topic
	 * @param data payload bytes
	 * @param offset index of the first byte to send
	 * @param length number of bytes to send
	 * @return success flag reported by the implementation
	 */
	boolean sendTopic(String topicName, byte[] data, int offset, int length);

	/**
	 * Sends a whole byte array to a queue.
	 *
	 * @param queueName name of the queue
	 * @param data payload bytes
	 * @return success flag reported by the implementation
	 */
	boolean sendQueue(String queueName, byte[] data);

	/**
	 * Sends a slice of a byte array to a queue.
	 *
	 * @param queueName name of the queue
	 * @param data payload bytes
	 * @param offset index of the first byte to send
	 * @param length number of bytes to send
	 * @return success flag reported by the implementation
	 */
	boolean sendQueue(String queueName, byte[] data, int offset, int length);

	/**
	 * Registers a listener for messages arriving on a queue.
	 *
	 * @param queueName name of the queue
	 * @param listener callback invoked for each received message
	 * @throws JMSException if the consumer cannot be set up
	 */
	void receiveQueue(String queueName, MessageListener listener) throws JMSException;

	/**
	 * Subscribes a listener to a topic.
	 *
	 * @param topicName name of the topic
	 * @param listener callback invoked for each received message
	 * @throws JMSException if the subscription cannot be set up
	 */
	void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}

三、基於ActiveMQ的接口實現

/**
 * ActiveMQ-based message producer/consumer implementation of {@link MsgDistributeInterface}.
 * Both constructors call {@link #init()}, which creates and starts the broker connection,
 * so a {@link JMSException} raised during that setup propagates out of construction.
 * 
 * @author eguid
 *
 */
public class ActiveMQImpl implements MsgDistributeInterface {

	private String userName;
	private String password;
	private String brokerURL;
	private boolean persistentMode;// true selects DeliveryMode.PERSISTENT for producers, false NON_PERSISTENT
	// Connection factory, created in init() from userName/password/brokerURL
	ConnectionFactory connectionFactory;
	// Broker connection created and started in init(); the receive methods create their sessions from it
	Connection connection;
	// Non-transacted, auto-acknowledge session created in init(); used to build messages and producers.
	// NOTE(review): this one session is used by every sending thread — confirm that is safe for the JMS provider.
	Session session;

	// Per-thread cached producer for topic (publish/subscribe) sends
	ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
	// Per-thread cached producer for queue (point-to-point) sends
	ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();

	/**
	 * Creates an instance with persistent delivery enabled and connects to the broker.
	 *
	 * @param userName broker user name
	 * @param password broker password
	 * @param brokerURL broker URL
	 * @throws JMSException if {@link #init()} fails
	 */
	public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
		// Delegate with persistentMode = true.
		this(userName, password, brokerURL, true);
	}
	
	/**
	 * Creates an instance, stores the connection settings and connects to the broker.
	 *
	 * @param userName broker user name
	 * @param password broker password
	 * @param brokerURL broker URL
	 * @param persistentMode whether producers use persistent delivery
	 * @throws JMSException if {@link #init()} fails
	 */
	public ActiveMQImpl(String userName, String password, String brokerURL, boolean persistentMode)
			throws JMSException {
		this.persistentMode = persistentMode;
		this.brokerURL = brokerURL;
		this.password = password;
		this.userName = userName;
		// All settings are in place; open the connection now.
		init();
	}

	/**
	 * Creates the connection factory, opens and starts a connection, and creates the
	 * non-transacted, auto-acknowledge session used for sending.
	 * <p>
	 * If starting the connection or creating the session fails, the freshly created
	 * connection is closed before the exception is rethrown, and the {@code connection}
	 * field is left unchanged.
	 *
	 * @throws JMSException if the connection or session cannot be created
	 */
	public void init() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
		Connection newConnection = connectionFactory.createConnection();
		try {
			newConnection.start();
			// Sending session: not transacted, messages are acknowledged automatically.
			session = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			// Do not leak the half-initialised connection.
			try {
				newConnection.close();
			} catch (JMSException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
		connection = newConnection;
	}

	/**
	 * Sends the whole array to the given topic.
	 *
	 * @param topicName name of the topic
	 * @param data payload bytes
	 * @return result of the offset/length overload
	 */
	@Override
	public boolean sendTopic(String topicName, byte[] data) {
		final int wholeLength = data.length;
		return sendTopic(topicName, data, 0, wholeLength);
	}

	/**
	 * Sends {@code length} bytes of {@code data}, starting at {@code offset}, to the given topic.
	 *
	 * @param topicName name of the topic
	 * @param data payload bytes
	 * @param offset index of the first byte to send
	 * @param length number of bytes to send
	 * @return result of the internal send
	 */
	@Override
	public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
		final boolean toTopic = true;
		return send(toTopic, topicName, data, offset, length);
	}

	/**
	 * Sends the whole array to the given queue.
	 *
	 * @param queueName name of the queue
	 * @param data payload bytes
	 * @return result of the offset/length overload
	 */
	@Override
	public boolean sendQueue(String queueName, byte[] data) {
		final int wholeLength = data.length;
		return sendQueue(queueName, data, 0, wholeLength);
	}

	/**
	 * Sends {@code length} bytes of {@code data}, starting at {@code offset}, to the given queue.
	 *
	 * @param queueName name of the queue
	 * @param data payload bytes
	 * @param offset index of the first byte to send
	 * @param length number of bytes to send
	 * @return result of the internal send
	 */
	@Override
	public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
		final boolean toTopic = false;
		return send(toTopic, queueName, data, offset, length);
	}

	/**
	 * Sends a slice of a byte array as a bytes message to a topic or a queue.
	 * 
	 * @param type destination kind: {@code true} for a topic, {@code false} for a queue
	 * @param name topic or queue name
	 * @param data payload bytes
	 * @param offset index of the first byte to send
	 * @param length number of bytes to send
	 * @return {@code true} if the message was handed to the producer without error,
	 *         {@code false} if a {@link JMSException} occurred
	 */
	private boolean send(boolean type, String name, byte[] data, int offset, int length) {
		try {
			MessageProducer messageProducer = getMessageProducer(name, type);
			BytesMessage msg = createBytesMsg(data, offset, length);
			System.err.println(Thread.currentThread().getName()+"發送消息");
			messageProducer.send(msg);
			// Reaching this point means the send completed without an exception.
			return true;
		} catch (JMSException e) {
			// Report the failure instead of dropping it silently; callers still get false.
			System.err.println(Thread.currentThread().getName() + " failed to send message to " + name + ": " + e);
			return false;
		}
	}
	
	/**
	 * Subscribes to a topic with a built-in listener that prints each received message
	 * to {@code System.err}.
	 * <p>
	 * The consumer session is non-transacted with automatic acknowledgement: a transacted
	 * session would need an explicit commit, which nothing here performs.
	 *
	 * @param topicName name of the topic to subscribe to
	 * @throws JMSException if the session or consumer cannot be created
	 */
	public void receive(String topicName) throws JMSException {
		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		try {
			Topic topic = consumerSession.createTopic(topicName);
			MessageConsumer consumer = consumerSession.createConsumer(topic);
			consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					// No cast needed: toString() is dispatched on the runtime message type.
					System.err.println(Thread.currentThread().getName()+"收到消息:"+message.toString());
				}
			});
		} catch (JMSException e) {
			// Do not leak the session when the consumer could not be set up.
			try {
				consumerSession.close();
			} catch (JMSException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
	}
	/**
	 * Builds a bytes message from a slice of a byte array.
	 * 
	 * @param data source bytes
	 * @param offset index of the first byte to copy
	 * @param length number of bytes to copy
	 * @return the populated bytes message
	 * @throws JMSException if the message cannot be created or written
	 */
	private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
		final BytesMessage bytesMessage = session.createBytesMessage();
		bytesMessage.writeBytes(data, offset, length);
		return bytesMessage;
	}
	
	/**
	 * Builds an object message carrying a serializable payload.
	 * Despite the method name, this creates an {@link ObjectMessage}, not a map message.
	 *
	 * @param obj serializable payload
	 * @return the object message wrapping {@code obj}
	 * @throws JMSException if the message cannot be created
	 */
	private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
		return session.createObjectMessage(obj);
	}
	
	/**
	 * Builds a text message.
	 *
	 * @param text message body
	 * @return the text message containing {@code text}
	 * @throws JMSException if the message cannot be created
	 */
	private TextMessage createTextMsg(String text) throws JMSException {
		return session.createTextMessage(text);
	}

	
	/**
	 * Returns the producer for the requested destination kind.
	 * 
	 * @param name topic name or queue name
	 * @param type {@code true} for a topic producer, {@code false} for a queue producer
	 * @return the matching message producer
	 * @throws JMSException if the producer cannot be obtained
	 */
	private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
		if (type) {
			return getTopicProducer(name);
		}
		return getQueueProducer(name);
	}

	/**
	 * Returns the calling thread's producer for the given queue, creating it when needed.
	 * <p>
	 * One producer is cached per thread. The cached producer is reused only when it is
	 * bound to the requested queue; if the thread asks for a different queue, the stale
	 * producer is closed and replaced, so messages are never sent to a previously used queue.
	 *
	 * @param queueName name of the queue
	 * @return a producer bound to {@code queueName}
	 * @throws JMSException if the queue or producer cannot be created
	 */
	private MessageProducer getQueueProducer(String queueName) throws JMSException {
		Queue queue = session.createQueue(queueName);
		MessageProducer messageProducer = queueThreadLocal.get();
		if (messageProducer != null) {
			Object cachedDestination = messageProducer.getDestination();
			if (cachedDestination instanceof Queue
					&& queue.getQueueName().equals(((Queue) cachedDestination).getQueueName())) {
				return messageProducer;
			}
			// Cached producer targets another queue: drop it before creating a new one.
			queueThreadLocal.remove();
			messageProducer.close();
		}
		messageProducer = session.createProducer(queue);
		// PERSISTENT delivery vs. NON_PERSISTENT, selected by persistentMode.
		messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
		queueThreadLocal.set(messageProducer);
		return messageProducer;
	}
	
	/**
	 * Returns the calling thread's producer for the given topic, creating it when needed.
	 * <p>
	 * One producer is cached per thread. The cached producer is reused only when it is
	 * bound to the requested topic; if the thread asks for a different topic, the stale
	 * producer is closed and replaced, so messages are never published to a previously used topic.
	 *
	 * @param topicName name of the topic
	 * @return a producer bound to {@code topicName}
	 * @throws JMSException if the topic or producer cannot be created
	 */
	private MessageProducer getTopicProducer(String topicName) throws JMSException {
		Topic topic = session.createTopic(topicName);
		MessageProducer messageProducer = topicThreadLocal.get();
		if (messageProducer != null) {
			Object cachedDestination = messageProducer.getDestination();
			if (cachedDestination instanceof Topic
					&& topic.getTopicName().equals(((Topic) cachedDestination).getTopicName())) {
				return messageProducer;
			}
			// Cached producer targets another topic: drop it before creating a new one.
			topicThreadLocal.remove();
			messageProducer.close();
		}
		messageProducer = session.createProducer(topic);
		// PERSISTENT delivery vs. NON_PERSISTENT, selected by persistentMode.
		messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
		topicThreadLocal.set(messageProducer);
		return messageProducer;
	}
	
	/**
	 * Returns the stored broker password.
	 *
	 * @return the password currently held by this instance
	 */
	public String getPassword() {
		return this.password;
	}

	/**
	 * Replaces the stored broker password. The value is only read when
	 * {@link #init()} builds the connection factory, so an already open
	 * connection is not affected by this call.
	 *
	 * @param password the new password
	 */
	public void setPassword(String password) {
		this.password = password;
	}

	/**
	 * Registers a listener for messages arriving on a queue, using a dedicated session.
	 * <p>
	 * The session is non-transacted with automatic acknowledgement: the listener has no
	 * access to the session, so a transacted session could never be committed and the
	 * consumed messages would never be acknowledged.
	 *
	 * @param queueName name of the queue
	 * @param listener callback invoked for each received message
	 * @throws JMSException if the session or consumer cannot be created
	 */
	@Override
	public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		try {
			Queue queue = consumerSession.createQueue(queueName);
			MessageConsumer consumer = consumerSession.createConsumer(queue);
			consumer.setMessageListener(listener);
		} catch (JMSException e) {
			// Do not leak the session when the consumer could not be set up.
			try {
				consumerSession.close();
			} catch (JMSException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
	}

	/**
	 * Subscribes a listener to a topic, using a dedicated session.
	 * <p>
	 * The session is non-transacted with automatic acknowledgement: the listener has no
	 * access to the session, so a transacted session could never be committed.
	 *
	 * @param topicName name of the topic
	 * @param listener callback invoked for each received message
	 * @throws JMSException if the session or consumer cannot be created
	 */
	@Override
	public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
		final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		try {
			Topic topic = consumerSession.createTopic(topicName);
			MessageConsumer consumer = consumerSession.createConsumer(topic);
			consumer.setMessageListener(listener);
		} catch (JMSException e) {
			// Do not leak the session when the consumer could not be set up.
			try {
				consumerSession.close();
			} catch (JMSException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
	}

四、測試一下Topic和Queue

/**
 * Demo entry point: starts six topic publisher threads, two topic subscriber threads,
 * two queue publisher threads and two queue receiver threads against a local broker.
 *
 * @param args unused
 * @throws JMSException if the messaging client cannot be initialised
 */
public static void main(String[] args) throws JMSException{
	// Construction throws immediately when initialisation fails.
	MsgDistributeInterface  producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
	Test testMq = new Test();
	try {
		Thread.sleep(1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}

	// Topic publisher threads 1-6.
	for (int publisher = 0; publisher < 6; publisher++) {
		new Thread(testMq.new ProductorMq(producter)).start();
	}

	// Topic subscriber threads 1-2.
	for (int subscriber = 0; subscriber < 2; subscriber++) {
		Runnable subscribeTask = new Runnable() {
			@Override
			public void run() {
				try {
					producter.receiveTopic("eguid-topic", new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg = (BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"訂閱主題消息:"+msg.toString());
						}
					});
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		};
		new Thread(subscribeTask).start();
	}

	// Queue publisher threads 1-2.
	for (int publisher = 0; publisher < 2; publisher++) {
		new Thread(testMq.new  QueueProductor(producter)).start();
	}

	// Queue receiver threads 1-2.
	for (int receiver = 0; receiver < 2; receiver++) {
		Runnable receiveTask = new Runnable() {
			@Override
			public void run() {
				try {
					producter.receiveQueue("eguid-queue", new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg = (BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"收到隊列消息:"+msg.toString());
						}
					});
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		};
		new Thread(receiveTask).start();
	}
}

    /**
     * Runnable that publishes a greeting to the topic "eguid-topic" every two seconds
     * until its thread is interrupted.
     * <p>
     * The sender is typed as {@link MsgDistributeInterface} so that it accepts the
     * instance created in {@code main}.
     */
    private class ProductorMq implements Runnable{
        MsgDistributeInterface producter;

        /**
         * @param producter sender used to publish the topic messages
         */
        public ProductorMq(MsgDistributeInterface producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                String wang = Thread.currentThread().getName()+"Hello eguid! This is topic.";
                // Explicit charset so the payload does not depend on the platform default.
                producter.sendTopic("eguid-topic", wang.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    /**
     * Runnable that sends a greeting to the queue "eguid-queue" every two seconds
     * until its thread is interrupted.
     * <p>
     * The sender is typed as {@link MsgDistributeInterface} so that it accepts the
     * instance created in {@code main}.
     */
    private class QueueProductor implements Runnable{
        MsgDistributeInterface producter;

        /**
         * @param producter sender used to send the queue messages
         */
        public QueueProductor(MsgDistributeInterface producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                String eguid = Thread.currentThread().getName()+"Hello eguid! This is queue.";
                // Explicit charset so the payload does not depend on the platform default.
                producter.sendQueue("eguid-queue", eguid.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
-------------------End--------------------
相關文章
相關標籤/搜索