消息隊列:快速上手ActiveMQ消息隊列的JMS方式使用(兩種模式:Topic和Queue的消息推送和訂閱)

一、實現功能

但願使用一套API,實現兩種模式下的消息發送和接收功能,方便業務程序調用數組

一、發送Topic緩存

二、發送Queuesession

三、接收Topictcp

四、接收Queueide

二、接口設計

根據功能設計公共調用接口測試

  1. /**
  2. * 數據分發接口(用於發送、接收消息隊列數據)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public interface MsgDistributeInterface {
  8.  
  9. /**
  10. * 發送到主題
  11. *
  12. * @param topicName -主題
  13. * @param data -數據
  14. * @return
  15. */
  16. public boolean sendTopic(String topicName, byte[] data);
  17.  
  18. /**
  19. * 發送到主題
  20. * @param topicName -主題
  21. * @param data-數據
  22. * @param offset -偏移量
  23. * @param length -長度
  24. * @return
  25. */
  26. boolean sendTopic(String topicName, byte[] data, int offset, int length);
  27.  
  28. /**
  29. * 發送到隊列
  30. *
  31. * @param queueName -隊列名稱
  32. * @param data -數據
  33. * @return
  34. */
  35. public boolean sendQueue(String queueName, byte[] data);
  36.  
  37. /**
  38. * 發送到隊列
  39. * @param queueName -隊列名稱
  40. * @param data -數據
  41. * @param offset
  42. * @param length
  43. * @return
  44. */
  45. public boolean sendQueue(String queueName, byte[] data,int offset, int length);
  46.  
  47. /**
  48. * 接收隊列消息
  49. * @param queueName 隊列名稱
  50. * @param listener
  51. * @throws JMSException
  52. */
  53. void receiveQueue(String queueName, MessageListener listener) throws JMSException;
  54.  
  55. /**
  56. * 訂閱主題
  57. * @param topicName -主題名稱
  58. * @param listener
  59. * @throws JMSException
  60. */
  61. void receiveTopic(String topicName, MessageListener listener) throws JMSException;
  62. }

三、基於ActiveMQ的接口實現

  1. /**
  2. * 基於activeMQ的消息生產者/消費者實現(初始化該對象時即初始化鏈接消息隊列,若是沒法鏈接到消息隊列,當即拋出異常)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public class ActiveMQImpl implements MsgDistributeInterface {
  8.  
  9. private String userName;
  10. private String password;
  11. private String brokerURL;
  12. private boolean persistentMode;//持久化模式
  13. //鏈接工廠
  14. ConnectionFactory connectionFactory;
  15. //發送消息的線程
  16. Connection connection;
  17. // 事務管理
  18. Session session;
  19.  
  20. //存放各個線程訂閱模式生產者
  21. ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
  22. //存放各個線程隊列模式生產者
  23. ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
  24.  
  25. public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
  26. this(userName, password, brokerURL, true);
  27. }
  28.  
  29. public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
  30. this.userName = userName;
  31. this.password = password;
  32. this.brokerURL = brokerURL;
  33. this.persistentMode=persistentMode;
  34. init();
  35. }
  36.  
  37. public void init() throws JMSException {
  38. try {
  39. // 建立一個連接工廠
  40. connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
  41. // 從工廠中建立一個連接
  42. connection = connectionFactory.createConnection();
  43. // 開啓連接
  44. connection.start();
  45. // 建立一個事務(訂閱模式,事務採用自動確認方式)
  46. session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
  47. } catch (JMSException e) {
  48. throw e;
  49. }
  50. }
  51.  
  52. @Override
  53. public boolean sendTopic(String topicName, byte[] data) {
  54. return sendTopic(topicName, data, 0, data.length);
  55. }
  56.  
  57. @Override
  58. public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
  59. return send(true, topicName, data, offset, length);
  60. }
  61.  
  62. @Override
  63. public boolean sendQueue(String queueName, byte[] data) {
  64. return sendQueue(queueName, data, 0, data.length);
  65. }
  66.  
  67. @Override
  68. public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
  69. return send(false, queueName, data, offset, length);
  70. }
  71.  
  72. /**
  73. * 發送數據
  74. *
  75. * @param name
  76. * @param data
  77. * @param offset
  78. * @param length
  79. * @param type
  80. * -類型
  81. * @return
  82. */
  83. private boolean send(boolean type, String name, byte[] data, int offset, int length) {
  84. try {
  85. MessageProducer messageProducer = getMessageProducer(name, type);
  86.  
  87. BytesMessage msg = createBytesMsg(data, offset, length);
  88. System.err.println(Thread.currentThread().getName()+ "發送消息");
  89. // 發送消息
  90. messageProducer.send(msg);
  91. } catch (JMSException e) {
  92. return false;
  93. }
  94. return false;
  95. }
  96.  
  97. public void receive(String topicName) throws JMSException {
  98. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  99. Topic topic =session.createTopic(topicName);
  100. MessageConsumer consumer=session.createConsumer(topic);
  101. consumer.setMessageListener( new MessageListener() {
  102. @Override
  103. public void onMessage(Message message) {
  104. BytesMessage msg=(BytesMessage) message;
  105. System.err.println(Thread.currentThread().getName()+ "收到消息:"+msg.toString());
  106. }
  107. });
  108.  
  109. }
  110. /**
  111. * 建立字節數組消息
  112. *
  113. * @param data
  114. * @param offset
  115. * @param length
  116. * @return
  117. * @throws JMSException
  118. */
  119. private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
  120. BytesMessage msg = session.createBytesMessage();
  121. msg.writeBytes(data, offset, length);
  122. return msg;
  123. }
  124.  
  125. /**
  126. * 建立對象序列化消息
  127. * @param obj
  128. * @return
  129. * @throws JMSException
  130. */
  131. private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
  132. // MapMessage msg = session.createMapMessage();//key-value形式的消息
  133. ObjectMessage msg = session.createObjectMessage(obj);
  134. return msg;
  135. }
  136.  
  137. /**
  138. * 建立字符串消息
  139. * @param text
  140. * @return
  141. * @throws JMSException
  142. */
  143. private TextMessage createTextMsg(String text) throws JMSException {
  144. TextMessage msg = session.createTextMessage(text);
  145. return msg;
  146. }
  147.  
  148.  
  149. /**
  150. * 獲取建立者
  151. *
  152. * @param name -名稱(主題名稱和隊列名稱)
  153. * @param type -類型(true:topic,false:queue)
  154. * @return
  155. * @throws JMSException
  156. */
  157. private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
  158. return type?getTopicProducer(name):getQueueProducer(name);
  159. }
  160.  
  161. /**
  162. * 建立或獲取隊列
  163. * @param queueName
  164. * @return
  165. * @throws JMSException
  166. */
  167. private MessageProducer getQueueProducer(String queueName) throws JMSException {
  168. MessageProducer messageProducer = null;
  169. if ((messageProducer = queueThreadLocal.get()) == null) {
  170. Queue queue = session.createQueue(queueName);
  171. messageProducer = session.createProducer(queue);
  172. //是否持久化(1-不持久化(若是沒有消費者,消息就也會自動失效),2-持久化(若是沒有消費者進行消費,消息隊列也會緩存消息等待消費者進行消費))
  173. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  174. queueThreadLocal.set(messageProducer);
  175. }
  176. return messageProducer;
  177. }
  178.  
  179. /**
  180. * 建立或獲取主題
  181. * @param topicName
  182. * @return
  183. * @throws JMSException
  184. */
  185. private MessageProducer getTopicProducer(String topicName) throws JMSException {
  186. MessageProducer messageProducer = null;
  187. if ((messageProducer = topicThreadLocal.get()) == null) {
  188. Topic topic = session.createTopic(topicName);
  189. messageProducer = session.createProducer(topic);
  190. //是否持久化(1-不持久化(若是沒有消費者,消息就也會自動失效),2-持久化(若是沒有消費者進行消費,消息隊列也會緩存消息等待消費者進行消費))
  191. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  192. topicThreadLocal.set(messageProducer);
  193. }
  194. return messageProducer;
  195. }
  196.  
  197. public String getPassword() {
  198. return password;
  199. }
  200.  
  201. public void setPassword(String password) {
  202. this.password = password;
  203. }
  204.  
  205. @Override
  206. public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
  207. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  208. Queue topic =session.createQueue(queueName);
  209. MessageConsumer consumer=session.createConsumer(topic);
  210. consumer.setMessageListener(listener);
  211.  
  212. }
  213.  
  214. @Override
  215. public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
  216. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  217. Topic topic =session.createTopic(topicName);
  218. MessageConsumer consumer=session.createConsumer(topic);
  219. consumer.setMessageListener(listener);
  220. }

四、測試一下Topic和Queue

  1. public static void main(String[] args) throws JMSException{
  2. //若是建立失敗會當即拋出異常
  3. MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
  4. Test testMq = new Test();
  5. try {
  6. Thread.sleep( 1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. //Thread 1
  11. new Thread(testMq.new ProductorMq(producter)).start();
  12. //Thread 2
  13. new Thread(testMq.new ProductorMq(producter)).start();
  14. //Thread 3
  15. new Thread(testMq.new ProductorMq(producter)).start();
  16. //Thread 4
  17. new Thread(testMq.new ProductorMq(producter)).start();
  18. //Thread 5
  19. new Thread(testMq.new ProductorMq(producter)).start();
  20. //Thread 6
  21. new Thread(testMq.new ProductorMq(producter)).start();
  22.  
  23. //訂閱接收線程Thread 1
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. try {
  28. producter.receiveTopic( "eguid-topic",new MessageListener() {
  29. @Override
  30. public void onMessage(Message message) {
  31. BytesMessage msg=(BytesMessage) message;
  32. System.err.println(Thread.currentThread().getName()+ "訂閱主題消息:"+msg.toString());
  33. }
  34. });
  35. } catch (JMSException e) {
  36. // TODO Auto-generated catch block
  37. e.printStackTrace();
  38. }
  39. }
  40. }).start();
  41. //訂閱接收線程Thread 2
  42. new Thread(new Runnable() {
  43. @Override
  44. public void run() {
  45. try {
  46. producter.receiveTopic( "eguid-topic",new MessageListener() {
  47. @Override
  48. public void onMessage(Message message) {
  49. BytesMessage msg=(BytesMessage) message;
  50. System.err.println(Thread.currentThread().getName()+ "訂閱主題消息:"+msg.toString());
  51. }
  52. });
  53. } catch (JMSException e) {
  54. // TODO Auto-generated catch block
  55. e.printStackTrace();
  56. }
  57. }
  58. }).start();
  59. //隊列消息生產線程Thread-1
  60. new Thread(testMq.new QueueProductor(producter)).start();
  61. //隊列消息生產線程Thread-2
  62. new Thread(testMq.new QueueProductor(producter)).start();
  63. //隊列接收線程Thread 1
  64. new Thread(new Runnable() {
  65. @Override
  66. public void run() {
  67. try {
  68. producter.receiveQueue( "eguid-queue",new MessageListener() {
  69. @Override
  70. public void onMessage(Message message) {
  71. BytesMessage msg=(BytesMessage) message;
  72. System.err.println(Thread.currentThread().getName()+ "收到隊列消息:"+msg.toString());
  73. }
  74. });
  75. } catch (JMSException e) {
  76. // TODO Auto-generated catch block
  77. e.printStackTrace();
  78. }
  79. }
  80. }).start();
  81. //隊列接收線程Thread2
  82. new Thread(new Runnable() {
  83. @Override
  84. public void run() {
  85. try {
  86. producter.receiveQueue( "eguid-queue",new MessageListener() {
  87. @Override
  88. public void onMessage(Message message) {
  89. BytesMessage msg=(BytesMessage) message;
  90. System.err.println(Thread.currentThread().getName()+ "收到隊列消息:"+msg.toString());
  91. }
  92. });
  93. } catch (JMSException e) {
  94. // TODO Auto-generated catch block
  95. e.printStackTrace();
  96. }
  97. }
  98. }).start();
  99. }
  100.  
  101. private class ProductorMq implements Runnable{
  102. Jtt809MsgProducter producter;
  103. public ProductorMq(Jtt809MsgProducter producter){
  104. this.producter = producter;
  105. }
  106.  
  107. @Override
  108. public void run() {
  109. while(true){
  110. try {
  111. String wang=Thread.currentThread().getName()+ "Hello eguid! This is topic.";
  112. producter.sendTopic( "eguid-topic",wang.getBytes());
  113.  
  114. Thread.sleep( 2000);
  115. } catch (InterruptedException e) {
  116. e.printStackTrace();
  117. }
  118. }
  119. }
  120. }
  121.  
  122. private class QueueProductor implements Runnable{
  123. Jtt809MsgProducter producter;
  124. public QueueProductor(Jtt809MsgProducter producter){
  125. this.producter = producter;
  126. }
  127.  
  128. @Override
  129. public void run() {
  130. while(true){
  131. try {
  132. String eguid=Thread.currentThread().getName()+ "Hello eguid! This is queue.";
  133. producter.sendQueue( "eguid-queue",eguid.getBytes());
  134. Thread.sleep( 2000);
  135. } catch (InterruptedException e) {
  136. e.printStackTrace();
  137. }
  138. }
  139. }
  140. }

-------------------End--------------------ui

相關文章
相關標籤/搜索