希望使用一套API,實現兩種模式(Topic與Queue)下的消息發送和接收功能,方便業務程序調用。
一、發送Topic
二、發送Queue
三、接收Topic
四、接收Queue
根據以上功能設計公共調用接口:
/**
 * Data distribution contract used to send data to, and receive data from, a message queue.
 *
 * @author eguid
 */
public interface MsgDistributeInterface {

    /**
     * Sends a byte array to a topic.
     *
     * @param topicName the topic
     * @param data      the payload
     * @return implementation-defined success indicator
     */
    boolean sendTopic(String topicName, byte[] data);

    /**
     * Sends a slice of a byte array to a topic.
     *
     * @param topicName the topic
     * @param data      the payload
     * @param offset    offset into {@code data}
     * @param length    number of bytes
     * @return implementation-defined success indicator
     */
    boolean sendTopic(String topicName, byte[] data, int offset, int length);

    /**
     * Sends a byte array to a queue.
     *
     * @param queueName the queue name
     * @param data      the payload
     * @return implementation-defined success indicator
     */
    boolean sendQueue(String queueName, byte[] data);

    /**
     * Sends a slice of a byte array to a queue.
     *
     * @param queueName the queue name
     * @param data      the payload
     * @param offset    offset into {@code data}
     * @param length    number of bytes
     * @return implementation-defined success indicator
     */
    boolean sendQueue(String queueName, byte[] data, int offset, int length);

    /**
     * Receives messages from a queue.
     *
     * @param queueName the queue name
     * @param listener  callback handed the received messages
     * @throws JMSException declared by the contract; raised by the implementation
     */
    void receiveQueue(String queueName, MessageListener listener) throws JMSException;

    /**
     * Subscribes to a topic.
     *
     * @param topicName the topic name
     * @param listener  callback handed the received messages
     * @throws JMSException declared by the contract; raised by the implementation
     */
    void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}
/** * 基於activeMQ的消息生產者/消費者實現(初始化該對象時即初始化鏈接消息隊列,若是沒法鏈接到消息隊列,當即拋出異常) * * @author eguid * */ public class ActiveMQImpl implements MsgDistributeInterface { private String userName; private String password; private String brokerURL; private boolean persistentMode;//持久化模式 //鏈接工廠 ConnectionFactory connectionFactory; //發送消息的線程 Connection connection; // 事務管理 Session session; //存放各個線程訂閱模式生產者 ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>(); //存放各個線程隊列模式生產者 ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>(); public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException { this(userName, password, brokerURL, true); } public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException { this.userName = userName; this.password = password; this.brokerURL = brokerURL; this.persistentMode=persistentMode; init(); } public void init() throws JMSException { try { // 建立一個連接工廠 connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL); // 從工廠中建立一個連接 connection = connectionFactory.createConnection(); // 開啓連接 connection.start(); // 建立一個事務(訂閱模式,事務採用自動確認方式) session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { throw e; } } @Override public boolean sendTopic(String topicName, byte[] data) { return sendTopic(topicName, data, 0, data.length); } @Override public boolean sendTopic(String topicName, byte[] data, int offset, int length) { return send(true, topicName, data, offset, length); } @Override public boolean sendQueue(String queueName, byte[] data) { return sendQueue(queueName, data, 0, data.length); } @Override public boolean sendQueue(String queueName, byte[] data, int offset, int length) { return send(false, queueName, data, offset, length); } /** * 發送數據 * * @param name * @param data * @param offset * @param length * @param type * -類型 * @return */ private boolean 
send(boolean type, String name, byte[] data, int offset, int length) { try { MessageProducer messageProducer = getMessageProducer(name, type); BytesMessage msg = createBytesMsg(data, offset, length); System.err.println(Thread.currentThread().getName()+"發送消息"); // 發送消息 messageProducer.send(msg); } catch (JMSException e) { return false; } return false; } public void receive(String topicName) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic =session.createTopic(topicName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString()); } }); } /** * 建立字節數組消息 * * @param data * @param offset * @param length * @return * @throws JMSException */ private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException { BytesMessage msg = session.createBytesMessage(); msg.writeBytes(data, offset, length); return msg; } /** * 建立對象序列化消息 * @param obj * @return * @throws JMSException */ private ObjectMessage createMapMsg(Serializable obj) throws JMSException { // MapMessage msg = session.createMapMessage();//key-value形式的消息 ObjectMessage msg = session.createObjectMessage(obj); return msg; } /** * 建立字符串消息 * @param text * @return * @throws JMSException */ private TextMessage createTextMsg(String text) throws JMSException { TextMessage msg = session.createTextMessage(text); return msg; } /** * 獲取建立者 * * @param name -名稱(主題名稱和隊列名稱) * @param type -類型(true:topic,false:queue) * @return * @throws JMSException */ private MessageProducer getMessageProducer(String name, boolean type) throws JMSException { return type?getTopicProducer(name):getQueueProducer(name); } /** * 建立或獲取隊列 * @param queueName * @return * @throws JMSException */ private MessageProducer getQueueProducer(String queueName) 
throws JMSException { MessageProducer messageProducer = null; if ((messageProducer = queueThreadLocal.get()) == null) { Queue queue = session.createQueue(queueName); messageProducer = session.createProducer(queue); //是否持久化(1-不持久化(若是沒有消費者,消息就也會自動失效),2-持久化(若是沒有消費者進行消費,消息隊列也會緩存消息等待消費者進行消費)) messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); queueThreadLocal.set(messageProducer); } return messageProducer; } /** * 建立或獲取主題 * @param topicName * @return * @throws JMSException */ private MessageProducer getTopicProducer(String topicName) throws JMSException { MessageProducer messageProducer = null; if ((messageProducer = topicThreadLocal.get()) == null) { Topic topic = session.createTopic(topicName); messageProducer = session.createProducer(topic); //是否持久化(1-不持久化(若是沒有消費者,消息就也會自動失效),2-持久化(若是沒有消費者進行消費,消息隊列也會緩存消息等待消費者進行消費)) messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); topicThreadLocal.set(messageProducer); } return messageProducer; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public void receiveQueue(String queueName,MessageListener listener) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue topic =session.createQueue(queueName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(listener); } @Override public void receiveTopic(String topicName,MessageListener listener) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic =session.createTopic(topicName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(listener); }
public static void main(String[] args) throws JMSException{ //若是建立失敗會當即拋出異常 MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616"); Test testMq = new Test(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 6 new Thread(testMq.new ProductorMq(producter)).start(); //訂閱接收線程Thread 1 new Thread(new Runnable() { @Override public void run() { try { producter.receiveTopic("eguid-topic",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"訂閱主題消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //訂閱接收線程Thread 2 new Thread(new Runnable() { @Override public void run() { try { producter.receiveTopic("eguid-topic",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"訂閱主題消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //隊列消息生產線程Thread-1 new Thread(testMq.new QueueProductor(producter)).start(); //隊列消息生產線程Thread-2 new Thread(testMq.new QueueProductor(producter)).start(); //隊列接收線程Thread 1 new Thread(new Runnable() { @Override public void run() { try { producter.receiveQueue("eguid-queue",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到隊列消息:"+msg.toString()); } }); } catch 
(JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //隊列接收線程Thread2 new Thread(new Runnable() { @Override public void run() { try { producter.receiveQueue("eguid-queue",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到隊列消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } private class ProductorMq implements Runnable{ Jtt809MsgProducter producter; public ProductorMq(Jtt809MsgProducter producter){ this.producter = producter; } @Override public void run() { while(true){ try { String wang=Thread.currentThread().getName()+"Hello eguid! This is topic."; producter.sendTopic("eguid-topic",wang.getBytes()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } private class QueueProductor implements Runnable{ Jtt809MsgProducter producter; public QueueProductor(Jtt809MsgProducter producter){ this.producter = producter; } @Override public void run() { while(true){ try { String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue."; producter.sendQueue("eguid-queue",eguid.getBytes()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }-------------------End--------------------