最新在接觸ActiveMQ,裏面有個持久訂閱者模塊,功能是怎麼樣也演示不出來效果。配置參數比較簡單(配置沒幾個參數),消費者第一次運行時,須要指定ClientID(此時Broker已經記錄離線訂閱者信息),在啓動提供者,此時消息隊列存在一條記錄,而後在啓動消費者,可是怎麼樣也獲取不到消息,阿西吧~~~什麼鬼,百度上一大堆,都是這樣步驟,消費者端,指定如下ClientID就行了,可,想要的效果死活不出來。。。。。。session
廢話很少說,先上代碼,後面再分析異步
public void testTopicConsumer2() throws Exception { //第一步:建立ConnectionFactory String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //第二步:經過工廠,建立Connection Connection connection = connectionFactory.createConnection(); //設置持久訂閱的客戶端ID String clientId = "10086"; connection.setClientID(clientId); //第三步:打開連接 connection.start(); //第四步:經過Connection建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:經過session建立Consumer Topic topic = session.createTopic("cyb-topic"); //建立持久訂閱的消費者客戶端 //第一個參數是指定Topic //第二個參數是自定義的ClientId MessageConsumer consumer = session.createDurableSubscriber(topic, clientId); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //第七步:處理信息 if (message instanceof TextMessage){ TextMessage tm=(TextMessage)message; try{ System.out.println(tm.getText()); } catch (Exception e){ e.printStackTrace(); } } } }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); }
只須要制定ClientID和建立持久客戶端便可tcp
public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於鏈接broker String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設置 //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:經過工廠,建立Connection connection = connectionFactory.createConnection(); //第三步:鏈接啓動 connection.start(); //第四步:經過鏈接獲取session會話 //第一個參數:是否啓用ActiveMQ事務,若是爲true,第二個參數無用 //第二個參數:應答模式,AUTO_ACKNOWLEDGE爲自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:經過session建立destination,兩種目的地:Queue、Topic //參數:消息隊列的名稱,在後臺管理系統中能夠看到 Topic topic = session.createTopic("cyb-topic"); //第六步:經過session建立MessageProducer producer = session.createProducer(topic); //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->博客園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:經過producer發送消息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
首先明確一點,上面的代碼是沒有一點問題的。爲了節省時間,驗證步驟和上面的差很少,不啓動前兩步了,直接啓動第三步,也就是:ide
第一次運行消費者時,此時Broker已經記錄訂閱者ClientID,而後程序一閃而過,進入到藍色框中的,離線訂閱者中,而後在執行提供者,此時,Topic中,已經入隊一次,再次運行消費者時,運行是異步獲取的,運行一閃而過(鄙人猜想,多是ActiveMQ機制問題,內部邏輯大概是,先遍歷非持久訂閱者,而後在查看持久訂閱者,問題出在,程序執行太快,還沒到查看持久訂閱者時,程序就執行完了,因此第二次執行消費者時,加了個死循環,不停監聽隊列消息,具體ActiveMQ底層代碼沒看過,有興趣的能夠研究下,底層代碼找到相應位置後,記得告訴我哦~~~)spa
這個小問題,搗鼓一下午,百度上也說,就這2步驟配置便可,運行結果與初衷相違背,大半夜的都打算洗洗睡了,頭腦風暴想出來這個方法,在下面寫個死循環,不停監聽隊列消息,這纔有了這篇博客,好啦...好啦,時間不早了,立刻都快凌晨1點鐘了,明個還得上班,洗洗睡了zZZZZZZZZZcode