1.下載安裝包:curl -O https://archive.apache.org/dist/activemq/5.14.0/apache-activemq-5.14.0-bin.tar.gzjava
2.解壓:tar -zxvf apache-activemq-5.14.0-bin.tar.gzapache
3.重命名:mv apache-activemq-5.14.0-bin.tar.gz activemqcentos
4.進入bin目錄:cd .../activemq/bin ,運行權限命令:chmod 755 activemq ,啓動activemq: ./activemq服務器
5.查詢端口:61616 和8161 命令:netstat -lntp (centos 7若找不到命令,運行命令安裝:sudo yum install net-tools ,sudo yum provides ifconfig ,都裝一下)session
java模擬場景,代碼以下:curl
導入包:activemq-all-5.8.0.jartcp
Bean:ide
public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個參數是是不是事務型消息,設置爲true,第二個參數無效 //第二個參數是 //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。異常也會確認消息,應該是在執行以前確認的 //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。能夠在失敗的 //時候不確認消息,不確認的話不會移出隊列,一直存在,下次啓動繼續接受。接收消息的鏈接不斷開,其餘的消費者也不會接受(正常狀況下隊列模式不存在其餘消費者) //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。 //待測試 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //優先級不能影響先進先出。。。那這個用處到底是什麼呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ bean.setName("小黃"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (JMSException e) { e.printStackTrace(); } }
注:在上面的代碼中,確認模式有三種,裏面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直沒明白有什麼區別。由於沒法測試。不過大概也明白了一些。其實主要是MQ處理消息的流程決定的:測試
public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 //這個最好仍是有事務 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createTopic("test-topic"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //優先級不能影響先進先出。。。那這個用處到底是什麼呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ Thread.sleep(1000); bean.setName("小黃"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 //這個最好仍是有事務 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createTopic("test-topic"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }
以上的消息發送後,若是沒有接收到,能夠登陸本身的MQ管理頁面: http://192.168.3.159:8161/admin/ ,默認賬號密碼都是admin,查看隊列中的消息this
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 能夠理解爲是消費這消費掉的數量