消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。apache
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。centos
對於消息的傳遞有兩種類型:瀏覽器
一種是點對點的,即一個生產者和一個消費者一一對應;服務器
另外一種是發佈/ 訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。 session
1.官方網站下載:http://activemq.apache.org/tcp
2.centos上安裝ActiveMQ
分佈式
1)將apache-activemq-5.12.0-bin.tar.gz 上傳至服務器ide
2)解壓測試
tar zxvf apache-activemq-5.12.0-bin.tar.gz網站
3)爲apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0
4)進入apache-activemq-5.12.0\bin目錄,賦與執行權限
chmod 755 activemq
3.啓動
./activemq start
啓動成功出現以下信息
4.打開瀏覽器輸入服務器地址
http://192.168.56.102:8161
便可訪問管理界面
點擊Manage ActiveMQ broker登陸,登陸名和密碼均爲admin
5.點對點消息列表Queues
Number Of Pending Messages :等待消費的消息 這個是當前未出隊列的數量。
Number Of Consumers :消費者 這個是消費者端的消費者數量
Messages Enqueued :進入隊列的消息 進入隊列的總數量,包括出隊列的。
Messages Dequeued :出了隊列的消息 能夠理解爲是消費這消費掉的數量。
1.1消息生產者
1)建立Maven工程,引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2)建立類QueueProducer main方法代碼以下:
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立隊列對象 Queue queue = session.createQueue("test-queue"); //6.建立消息生產者 MessageProducer producer = session.createProducer(queue); //7.建立消息 TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
上述代碼中第4步建立session 的兩個參數:
第2個參數 消息的確認模式
運行後經過ActiveMQ管理界面查詢
1.2 消息消費者
建立類QueueConsumer ,main方法代碼以下:
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立隊列對象 Queue queue = session.createQueue("test-queue"); //6.建立消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
執行後看到控制檯輸出:歡迎來到神奇的ActiveMq世界
1.3 運行測試
同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現只有一個消費者會接收到消息。
2.1消息生產者
建立類TopicProducer ,main方法代碼以下:
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題對象 Topic topic = session.createTopic("test-topic"); //6.建立消息生產者 MessageProducer producer = session.createProducer(topic); //7.建立消息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇的ActiveMq世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close();
2.2消息消費者
建立類TopicConsumer ,main方法代碼以下:
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題對象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.建立消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
2.3 運行測試
同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現每一個消費者會接收到消息。