linux:安裝並使用activeMQ

1.下載安裝包:curl -O https://archive.apache.org/dist/activemq/5.14.0/apache-activemq-5.14.0-bin.tar.gzjava

2.解壓:tar -zxvf apache-activemq-5.14.0-bin.tar.gzapache

3.重命名:mv apache-activemq-5.14.0-bin.tar.gz activemqcentos

4.進入bin目錄:cd .../activemq/bin ,運行權限命令:chmod 755 activemq ,啓動activemq: ./activemq服務器

5.查詢端口:61616 和8161  命令:netstat -lntp (centos 7若找不到命令,運行命令安裝:sudo yum install net-tools ,sudo yum provides ifconfig ,都裝一下)session

 

java模擬場景,代碼以下:curl

導入包:activemq-all-5.8.0.jartcp

Bean:ide

public class MqBean implements Serializable{
    private Integer age;
    private String name;
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

隊列消息的發送:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        //第一個參數是是不是事務型消息,設置爲true,第二個參數無效
        //第二個參數是
        //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。異常也會確認消息,應該是在執行以前確認的
        //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。能夠在失敗的
        //時候不確認消息,不確認的話不會移出隊列,一直存在,下次啓動繼續接受。接收消息的鏈接不斷開,其餘的消費者也不會接受(正常狀況下隊列模式不存在其餘消費者)
        //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。
        //待測試
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue("test-queue");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //優先級不能影響先進先出。。。那這個用處到底是什麼呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            bean.setName("小黃"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

注:在上面的代碼中,確認模式有三種,裏面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直沒明白有什麼區別。由於沒法測試。不過大概也明白了一些。其實主要是MQ處理消息的流程決定的:測試

  1. 消息從生成方客戶端傳送到消息服務器。
  2. 消息服務器讀取消息。
  3. 消息被放置到持久性存儲器當中(出於可靠性的考慮)。
  4. 消息服務器確認收到消息(出於可靠性的考慮)。
  5. 消息服務器肯定消息的路由。
  6. 消息服務器寫出消息。
  7. 消息從消息服務器傳送到使用方客戶端。
  8. 使用方客戶端確認收到消息(出於可靠性的考慮)。
  9. 消息服務器處理客戶端確認(出於可靠性的考慮)。
  10. 消息服務器肯定已經處理客戶端確認。

 

隊列消息的接收:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客戶端到JMS Provider 的鏈接  
    Connection connection = null;
    // Session: 一個發送或接收消息的線程  
    Session session;
    // Destination :消息的目的地;消息發送給誰.  
    Destination destination;
    // 消費者,消息接收者  
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 構造從工廠獲得鏈接對象  
        connection = connectionFactory.createConnection();
        // 啓動  
        connection.start();
        // 獲取操做鏈接  
        //這個最好仍是有事務
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
        destination = session.createQueue("test-queue");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

訂閱消息的發送:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createTopic("test-topic");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //優先級不能影響先進先出。。。那這個用處到底是什麼呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            Thread.sleep(1000);
            bean.setName("小黃"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

訂閱消息的接收:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客戶端到JMS Provider 的鏈接  
    Connection connection = null;
    // Session: 一個發送或接收消息的線程  
    Session session;
    // Destination :消息的目的地;消息發送給誰.  
    Destination destination;
    // 消費者,消息接收者  
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 構造從工廠獲得鏈接對象  
        connection = connectionFactory.createConnection();
        // 啓動  
        connection.start();
        // 獲取操做鏈接  
        //這個最好仍是有事務
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
        destination = session.createTopic("test-topic");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

以上的消息發送後,若是沒有接收到,能夠登陸本身的MQ管理頁面: http://192.168.3.159:8161/admin/ ,默認賬號密碼都是admin,查看隊列中的消息this

Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數

Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減

Messages Dequeued 出了隊列的消息 能夠理解爲是消費這消費掉的數量

相關文章
相關標籤/搜索