今天繼續給你們分享的是ActiveMQ,若有不足,敬請指教。java
上次咱們說到,咱們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,咱們須要屢次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?web
這就須要使用ActiveMQ監聽器來監聽隊列,持續消費消息。redis
package com.xkt.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author lzx * */ public class MyListener implements MessageListener { @Override public void onMessage(Message message) { if (null != message) { if (message instanceof TextMessage) { try { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("監聽到的消息是 " + content); } catch (JMSException e) { e.printStackTrace(); } } } } }
package com.xkt.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.xkt.listener.MyListener; /** * @author lzx * */ public class Myconsumer { private ConnectionFactory factory; private Connection connection; private Session session; private Destination destination; private MessageConsumer consumer; public void receiveFromMq() { try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列 destination = session.createQueue("queue"); // 5.建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地 consumer = session.createConsumer(destination); // 7.加載監聽器 consumer.setMessageListener(new MyListener()); // 監聽器須要持續加載,這裏咱們使用輸入流阻塞當前線程結束。監聽指定隊列,只要有消息進來,就消費這條消息 System.in.read(); // 在java項目中,能夠經過IO阻塞程序,持續加載監聽器 // 在web項目中,能夠經過配置文件,直接加載監聽器。 } catch (Exception e) { e.printStackTrace(); System.out.println("讀取失敗"); } finally { if (null != consumer) { try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
圖示 |
---|
圖示 |
---|
圖示 |
---|
在以上示例中,只能向一個消費者發送消息。可是有一些場景,需求有多個消費者都能接收到消息,好比:美團APP天天的消息推送。該如何實現呢?apache
package com.xkt.subscriber; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MySubscriber implements Runnable { /** * 多線程的線程安全問題 解決方案: * * (1)加鎖 --極不推薦 (2)不使用全局變量 ---> SpringMVC是線程安全的嗎? 答:默認不是 解決辦法:(1)使用原型模式--不推薦 * (2)不使用全局變量 (3)ThreadLocal (3)其它框架來代替,好比redis */ private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicSubscriber subscriber; private Message message; @Override public void run() { try { // 一、建立鏈接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 二、建立鏈接 connection = factory.createTopicConnection(); connection.start(); // 三、建立會話 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 四、建立topic主題 topic = session.createTopic("topic-gzsxt"); // 五、建立訂閱者 subscriber = session.createSubscriber(topic); // 六、訂閱 while (true) { message = subscriber.receive(); if (null != message) { if (message instanceof TextMessage) { TextMessage tMsg = (TextMessage) message; String content = tMsg.getText(); System.out.println("訂閱者: " + Thread.currentThread().getName() + " 接收的消息是:" + content); } } } } catch (JMSException e) { e.printStackTrace(); } } }
package com.xkt.test; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } }
package com.xkt.publish; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author lzx * */ public class MyPublisher { private TopicConnectionFactory factory; private TopicConnection connection; private TopicSession session; private Topic topic; private TopicPublisher publisher; private Message message; public void publish(String msg) { try { // 一、建立鏈接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616"); // 二、建立鏈接 connection = factory.createTopicConnection(); connection.start(); // 三、建立會話 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // 四、建立topic主題 topic = session.createTopic("topic-gzsxt"); // 五、建立發佈者 publisher = session.createPublisher(topic); // 六、建立消息對象 message = session.createTextMessage(msg); // 七、發佈消息 publisher.publish(message); } catch (Exception e) { e.printStackTrace(); } finally { if (null != publisher) { try { publisher.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != connection) { try { connection.stop(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (null != session) { try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
package com.xkt.test; import org.junit.Test; import com.xkt.publish.MyPublisher; import com.xkt.subscriber.MySubscriber; /** * @author lzx * */ public class TestMQ { public static void main(String[] args) { MySubscriber sub = new MySubscriber(); Thread t1 = new Thread(sub); Thread t2 = new Thread(sub); t1.start(); t2.start(); } @Test public void publish() { MyPublisher publisher = new MyPublisher(); publisher.publish("hello,歡迎收聽FM 89.9頻道-交通頻道"); } }
2.2.6 查看測試結果安全
版權說明:歡迎以任何方式進行轉載,但請在轉載後註明出處!服務器