消息隊列之activeMQ

1.activeMQ的主要功能

  1. 實現高可用、高伸縮、高性能、易用和安全的企業級面向消息服務的系統
  2. 異步消息的消費和處理
  3. 控制消息的消費順序
  4. 能夠和Spring/springBoot整合簡化編碼
  5. 配置集羣容錯的MQ集羣

2.activeMQ安裝

下載地址:http://activemq.apache.org/components/classic/download/html

這裏筆者是下載的linux版的:java

由於activeMQ底層是使用java編寫的,因此須要安裝jdk,這個請移步我以前的博客:linux

http://www.javashuo.com/article/p-qttsebbc-bm.htmlweb

安裝activeMq:redis

# 安裝apache
[root@localhost ~]# yum install ttpd
# 下載的apache-activemq並上傳到linux的home下,解壓
[root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz 
# 進入到bin目錄下
[root@localhost home]# cd /apache-activemq-5.16.0/bin
# 啓動
[root@localhost bin]# ./activemq start
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')

# activemq的默認端口是61616,查看是否啓動的三種方式
# 第一種
[root@localhost bin]# ps -ef |grep activemq
# 第二種
[root@localhost bin]# netstat -ano|grep 61616
tcp6       0      0 :::61616                :::*                    LISTEN      off (0.00/0/0)
# 第三種
[root@localhost bin]# lsof -i:61616
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7517 root  132u  IPv6  39926      0t0  TCP *:61616 (LISTEN)

# 帶日誌的啓動方式
[root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
[root@localhost bin]# cd ..
# 能夠看到,啓動日誌都已經記錄到日誌裏了
[root@localhost apache-activemq-5.16.0]# cat myrunmq.log 
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
# 關閉activemq
[root@localhost bin]# ./activemq stop

前臺訪問的端口是8161,在查看前臺時,要關閉linux和windows的防火牆:spring

# 關閉linux防火牆
[root@localhost apache-activemq-5.16.0]# systemctl stop firewalld

在訪問以前,須要修改conf目錄下的jetty.xml,將下面的host修改爲本身的ip,以及修改用戶名和密碼。shell

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="127.0.0.1"/>
    <property name="port" value="8161"/>
</bean>

# 用戶名和密碼可修改可不修改,默認爲admin/admin
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="user,admin" />
    <!-- set authenticate=false to disable login -->
    <property name="authenticate" value="true" />
</bean>

修改完成以後重啓activemq數據庫

[root@localhost bin]# ./activemq restart

查看,地址爲192.168.189.150:8161apache

到這裏就說明activemq安裝成功了。windows

3.JMS

JMS(java message service)是一個用於提供消息服務的技術規範,他制定了在整個消息服務提供過程當中的全部數據結構和交互流程。當兩個程序使用jms進行通訊時,他們並非直接相連的,而是經過一個共同的消息收發服務鏈接起來的,達到解耦的效果。jms爲標準消息協議和消息服務提供了一組通用的接口,包括建立、發送、讀取消息等。

1 JMS的優點:

異步:客戶端不用發送請求,JMS自動將消息發送給客戶端

可靠:JMS保證消息只傳遞一次

2.JMS的四大組件:

  • JMS provider:實現了jms接口和規範的消息中間件

  • JMS producer:消息生產者,建立和發送JMS消息的客戶端應用

  • JMS consumer:消息消費者,接受和處理JMS消息的客戶端應用

  • JMS message:由消息頭、消息屬性、消息體組成

    消息頭(在send方法以前,經過setXXX()設置):

    JMSDestination:消息發送的目的地,主要是指Queue(點對點傳送模型)和Topic(發佈訂閱模型)

    JMSDeliverMode:消息是否持久

    JMSExpiration:設置消息過時時間

    JMSPriority:消息優先級,0-4被稱爲普通消息,5-9是加急消息,默認爲4

    JMSMessageID:惟一識別每一個消息的標識,由MQ產者或者本身設定

    消息屬性:除消息頭之外的值,如識別,去重,重點標註等方法,如textMessage.setStringProperty("c1","VIP");

    消息體:

    TextMessage:普通字符串

    MapMessage:map類型,其中key爲String類型,而值爲java的基本類型

    BytesMessage:二進制數組消息

    StreamMessage:java數據流消息,用個標準流來順序填充和讀取

    ObjectMessage:對象消息,包含一個可序列化的java對象

3.JMS的傳送模型:

  • 點對點消息傳送模型:應用程序由消息隊列、發送者、接收者組成,每一個消息發送給一個特殊的消息隊列,該隊列保存了全部發送給它的消息,處理消費掉的和已過時的消息

    點對點消息傳送的特性:

    1.每一個消息只有一個接收者

    2.消息發送者和接收者沒有時間依賴性

    3.當消息發送者發送消息時,不管接收者程序在不在運行,都能發送消息

    4.當接收者收到消息時,會發送確認收到通知

  • 發佈訂閱消息傳遞模型:發佈者發佈一個消息,該消息經過topic傳遞給全部訂閱的客戶端,發佈者和訂閱者彼此不知道對方,是匿名的且能夠動態發佈和消息訂閱。

    發佈訂閱消息傳遞的特性:

    1.一個消息能夠傳遞給多個訂閱者

    2.發佈者和訂閱者有時間依賴性

    3.爲了緩和嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱

4.生產者代碼實現

1.引入jar包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.0</version>
</dependency>

2.生產者代碼

package activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @className: Jmsproducer
 * @description: activemq生產者
 * @author: charon
 * @create: 2020-12-27 22:36
 */
public class JmsProducer {
    
    /** 聲明activemq的地址 */
    private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

    /** 隊列名 */
    private static final String QUEUE_NAME = "queue01";

    /**
     * @param args 參數
     */
    public static void main(String[] args) throws JMSException {
        // 建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 得到鏈接
        Connection conn = activeMQConnectionFactory.createConnection();
        conn.start();
        // 建立會話
        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 建立隊列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 建立消息的生產者
        MessageProducer messageProducer = session.createProducer(queue);
        // 建立消息
        for (int i = 0; i < 5; i++) {
            // 消息體
            TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】條消息");
            // 消息頭
            // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
            // 消息屬性
            // textMessage.setStringProperty("c1","VIP");
            messageProducer.send(textMessage);
        }
        // 關閉資源
        messageProducer.close();
        session.close();
        conn.close();
    }
}

運行代碼在瀏覽器上查看,能夠看到queue01裏面有5條消息:

  • Number Of Pending Messages:等待消費的消息 這個是當前未出隊列的數量。能夠理解爲總接收數-總出隊列數
  • Number Of Consumers:消費者的數量
  • Messages Enqueued:進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
  • Messages Dequeued:出了隊列的消息 能夠理解爲是消費這消費掉的數量

5.消費者代碼實現

package activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;

/**
 * @className: JmsConsumer
 * @description: activeMq的消費者
 * @author: charon
 * @create: 2020-12-28 08:10
 */
public class JmsConsumer {
    /** 聲明activemq的地址 */
    private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

    /** 隊列名 */
    private static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        // 建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 得到鏈接
        Connection conn = activeMQConnectionFactory.createConnection();
        conn.start();
        // 建立會話
        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 建立隊列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 建立消息的生產者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // 同步方式,生產環境並不適用,這種方式將阻塞知道得到並返回第一條消息
//        while (true){
//            TextMessage textMessage  =(TextMessage) messageConsumer.receive();
//            if(null!=textMessage){
//                System.out.println("---消費者收到消息:"+textMessage.getText());
//            }else{
//                break;
//            }
//        }

        // 異步方式,建立監聽,在又消息到達時,調用listener的onMessage方法,
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message != null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("--消費者接受到消息:"+textMessage);
                }
            }
        });
        
        System.in.read();
        // 關閉資源
        messageConsumer.close();
        session.close();
        conn.close();
    }
}

運行消費者的代碼,應該我上面生產者的代碼運行了兩次,因此消息有10條。

6.activeMQ集羣搭建

在這裏,筆者使用的基於Zookeeper+levelDb搭建的activeMq集羣,爲了不單點故障,使用一主兩從的架構。使用Zookeeper集羣註冊全部的ActiveMQ Broker但只有其中一個Broker能夠提供服務,它被視爲master,也就是說若是master由於故障而不能提供服務,Zookeeper會從SLave中選舉出一個Broker充當master。

我這邊的zookeeper集羣已經搭建好了,150和151是follower,152是leader。

# 每臺服務器上安裝activeMq,同時在集羣環境下,activemq的jetty.xml文件重的host要改爲0.0.0.0
# 修改activeMq.xml,註釋掉kahadb這個配置,actviemq默認的是kahadb,而且添加leveldb
[root@localhost conf]# vi activemq.xml
<!--        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter> -->
<persistenceAdapter>
   <replicatedLevelDB
      directory="${activemq.data}/leveldb"
      replicas="3"
      <!--實例間的通訊地址-->
      bind="tcp://0.0.0.0:62222"
      <!--zookeeper的地址-->
      zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
      <!--修改成每一個服務器的節點的ip-->
      hostname="192.168.189.152"
      sync="local_disk"
      zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
# 啓動三個節點的activemq
[root@localhost bin]# ./activemq restart

# 查看 鏈接zookeeper客戶端
[root@localhost bin]# zkCli.sh
[zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
[00000000022, 00000000020, 00000000021]
# 訪問
[zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
{"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
[zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
[zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}

從上面能夠看到,只有00000000020這個幾點的elected裏面有值,代表它被選舉爲master節點了。

在瀏覽器上依次訪問:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

只有192.168.189.150:8161能夠訪問成功,由於只有master節點能夠對外提供訪問,因此只有一個節點能訪問到,那麼它就是master節點。

第二種查看的方式:

查看activemq的日誌,最後一行,能夠看到,MasterLevelDBStore即爲master節點,SlaveLevelDBStore即爲slave節點。

第三種查看的方式爲使用zookeeper的可視化工具。

因爲activeMq集羣是基於zookeeper集羣實現的,因此要注意一下三點:

  1. activeMQ的客戶端只能訪問master的Broker,其它處於Slave的Broker不能訪問,因此客戶端鏈接的Broker應該使用failover協議
  2. 當一個activeMQ節點掛掉或者一個Zookeeper節點掛掉,activeMQ服務正常運轉,可是若是僅剩一個activeMQ節點,因爲不能選舉Master,因此activeMQ不能正常運行;(一個就不成集羣了)
  3. 同理,若是Zookeeper僅剩一個節點是活動的,無論activeMQ是都存活或者說無論activeMQ個節點是否存活,activeMQ不能正常提供服務,必須依賴於Zookeeper集羣服務。

7.集羣代碼實現

集羣的代碼和上面單機的代碼大體是一直的,就只須要修改一個activemq的地址。

/** 聲明集羣中activemq的地址,使用failover協議,隨機 */
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";

8.activemq的高級特性

1.消息發送方式

默認狀況下,非持久化的消息是異步發送的,持久化的消息是同步發送的。可是在開啓事務的狀況下,消息都是異步發送的,效率會有2個數量級的提高,因此在發送持久化消息時,請開啓事務模式。

2.儲存機制

在一般狀況下,非持久化的消息時存儲在內存中的,持久化消息時存儲在文件中的,他們的最大限制在配置文件中的 節點配置的,可是在非持久化消息堆積到必定程度(內存告急)時,actviemq會將內存中的非持久化消息寫入臨時文件中,以騰出內存。可是它和持久化消息的區別在於,重啓後持久化消息會從文件中恢復,非持久化消息的臨時文件會刪除。

因此儘可能不要用非持久化文件,若是非要用的化,能夠將臨時文件的限制調大。同時,非持久化的消息要及時處理,不要堆積,或者啓動事務。啓動事務後,commit()會等待服務器的消息返回,也不會致使消息丟失了。

3.死信隊列

一條消息在被重發屢次後(默認是6次),將會被ActiveMQ移入死信隊列;說白了就是異常消息的歸併處理的集合,主要是處理失敗的消息。能夠在activeMQ.DLQ這個隊列中查看。

4.重複消息,冪等性調用

在網絡延遲的狀況洗啊,可能會形成MQ重試,可能會形成重複消費。若是消息是作數據庫的插入操做,給這個消息作一個惟一主鍵,那麼就算出現重複消費的狀況,由於惟一主鍵,會形成主鍵衝突,避免數據庫出現髒數據。若是是第三方消費,能夠在每條數據裏面加一個全局惟一的id,若是消息消費了,就將消息存在redis中,在消費消息以前將id到redis中查詢一下,判斷是否消費過,若是沒有消費過,就處理,若是消費過了,就不處理了。

參考網址:

https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242

相關文章
相關標籤/搜索