下載地址:http://activemq.apache.org/components/classic/download/html
這裏筆者是下載的linux版的:java
由於activeMQ底層是使用java編寫的,因此須要安裝jdk,這個請移步我以前的博客:linux
http://www.javashuo.com/article/p-qttsebbc-bm.htmlweb
安裝activeMq:redis
# 安裝apache [root@localhost ~]# yum install ttpd # 下載的apache-activemq並上傳到linux的home下,解壓 [root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz # 進入到bin目錄下 [root@localhost home]# cd /apache-activemq-5.16.0/bin # 啓動 [root@localhost bin]# ./activemq start INFO: Loading '/home/apache-activemq-5.16.0//bin/env' INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517') # activemq的默認端口是61616,查看是否啓動的三種方式 # 第一種 [root@localhost bin]# ps -ef |grep activemq # 第二種 [root@localhost bin]# netstat -ano|grep 61616 tcp6 0 0 :::61616 :::* LISTEN off (0.00/0/0) # 第三種 [root@localhost bin]# lsof -i:61616 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 7517 root 132u IPv6 39926 0t0 TCP *:61616 (LISTEN) # 帶日誌的啓動方式 [root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log [root@localhost bin]# cd .. # 能夠看到,啓動日誌都已經記錄到日誌裏了 [root@localhost apache-activemq-5.16.0]# cat myrunmq.log INFO: Loading '/home/apache-activemq-5.16.0//bin/env' INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787') # 關閉activemq [root@localhost bin]# ./activemq stop
前臺訪問的端口是8161,在查看前臺時,要關閉linux和windows的防火牆:spring
# 關閉linux防火牆 [root@localhost apache-activemq-5.16.0]# systemctl stop firewalld
在訪問以前,須要修改conf目錄下的jetty.xml,將下面的host修改爲本身的ip,以及修改用戶名和密碼。shell
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="127.0.0.1"/> <property name="port" value="8161"/> </bean> # 用戶名和密碼可修改可不修改,默認爲admin/admin <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="user,admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean>
修改完成以後重啓activemq數據庫
[root@localhost bin]# ./activemq restart
查看,地址爲192.168.189.150:8161apache
到這裏就說明activemq安裝成功了。windows
JMS(java message service)是一個用於提供消息服務的技術規範,他制定了在整個消息服務提供過程當中的全部數據結構和交互流程。當兩個程序使用jms進行通訊時,他們並非直接相連的,而是經過一個共同的消息收發服務鏈接起來的,達到解耦的效果。jms爲標準消息協議和消息服務提供了一組通用的接口,包括建立、發送、讀取消息等。
異步:客戶端不用發送請求,JMS自動將消息發送給客戶端
可靠:JMS保證消息只傳遞一次
JMS provider:實現了jms接口和規範的消息中間件
JMS producer:消息生產者,建立和發送JMS消息的客戶端應用
JMS consumer:消息消費者,接受和處理JMS消息的客戶端應用
JMS message:由消息頭、消息屬性、消息體組成
消息頭(在send方法以前,經過setXXX()設置):
JMSDestination:消息發送的目的地,主要是指Queue(點對點傳送模型)和Topic(發佈訂閱模型)
JMSDeliverMode:消息是否持久
JMSExpiration:設置消息過時時間
JMSPriority:消息優先級,0-4被稱爲普通消息,5-9是加急消息,默認爲4
JMSMessageID:惟一識別每一個消息的標識,由MQ產者或者本身設定
消息屬性:除消息頭之外的值,如識別,去重,重點標註等方法,如textMessage.setStringProperty("c1","VIP");
消息體:
TextMessage:普通字符串
MapMessage:map類型,其中key爲String類型,而值爲java的基本類型
BytesMessage:二進制數組消息
StreamMessage:java數據流消息,用個標準流來順序填充和讀取
ObjectMessage:對象消息,包含一個可序列化的java對象
點對點消息傳送模型:應用程序由消息隊列、發送者、接收者組成,每一個消息發送給一個特殊的消息隊列,該隊列保存了全部發送給它的消息,處理消費掉的和已過時的消息
點對點消息傳送的特性:
1.每一個消息只有一個接收者
2.消息發送者和接收者沒有時間依賴性
3.當消息發送者發送消息時,不管接收者程序在不在運行,都能發送消息
4.當接收者收到消息時,會發送確認收到通知
發佈訂閱消息傳遞模型:發佈者發佈一個消息,該消息經過topic傳遞給全部訂閱的客戶端,發佈者和訂閱者彼此不知道對方,是匿名的且能夠動態發佈和消息訂閱。
發佈訂閱消息傳遞的特性:
1.一個消息能夠傳遞給多個訂閱者
2.發佈者和訂閱者有時間依賴性
3.爲了緩和嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱
1.引入jar包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version> </dependency>
2.生產者代碼
package activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; /** * @className: Jmsproducer * @description: activemq生產者 * @author: charon * @create: 2020-12-27 22:36 */ public class JmsProducer { /** 聲明activemq的地址 */ private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616"; /** 隊列名 */ private static final String QUEUE_NAME = "queue01"; /** * @param args 參數 */ public static void main(String[] args) throws JMSException { // 建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 得到鏈接 Connection conn = activeMQConnectionFactory.createConnection(); conn.start(); // 建立會話 Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE); // 建立隊列 Queue queue = session.createQueue(QUEUE_NAME); // 建立消息的生產者 MessageProducer messageProducer = session.createProducer(queue); // 建立消息 for (int i = 0; i < 5; i++) { // 消息體 TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】條消息"); // 消息頭 // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT)); // 消息屬性 // textMessage.setStringProperty("c1","VIP"); messageProducer.send(textMessage); } // 關閉資源 messageProducer.close(); session.close(); conn.close(); } }
運行代碼在瀏覽器上查看,能夠看到queue01裏面有5條消息:
package activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import java.io.IOException; /** * @className: JmsConsumer * @description: activeMq的消費者 * @author: charon * @create: 2020-12-28 08:10 */ public class JmsConsumer { /** 聲明activemq的地址 */ private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616"; /** 隊列名 */ private static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { // 建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 得到鏈接 Connection conn = activeMQConnectionFactory.createConnection(); conn.start(); // 建立會話 Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE); // 建立隊列 Queue queue = session.createQueue(QUEUE_NAME); // 建立消息的生產者 MessageConsumer messageConsumer = session.createConsumer(queue); // 同步方式,生產環境並不適用,這種方式將阻塞知道得到並返回第一條消息 // while (true){ // TextMessage textMessage =(TextMessage) messageConsumer.receive(); // if(null!=textMessage){ // System.out.println("---消費者收到消息:"+textMessage.getText()); // }else{ // break; // } // } // 異步方式,建立監聽,在又消息到達時,調用listener的onMessage方法, messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message != null && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println("--消費者接受到消息:"+textMessage); } } }); System.in.read(); // 關閉資源 messageConsumer.close(); session.close(); conn.close(); } }
運行消費者的代碼,應該我上面生產者的代碼運行了兩次,因此消息有10條。
在這裏,筆者使用的基於Zookeeper+levelDb搭建的activeMq集羣,爲了不單點故障,使用一主兩從的架構。使用Zookeeper集羣註冊全部的ActiveMQ Broker但只有其中一個Broker能夠提供服務,它被視爲master,也就是說若是master由於故障而不能提供服務,Zookeeper會從SLave中選舉出一個Broker充當master。
我這邊的zookeeper集羣已經搭建好了,150和151是follower,152是leader。
# 每臺服務器上安裝activeMq,同時在集羣環境下,activemq的jetty.xml文件重的host要改爲0.0.0.0 # 修改activeMq.xml,註釋掉kahadb這個配置,actviemq默認的是kahadb,而且添加leveldb [root@localhost conf]# vi activemq.xml <!-- <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> --> <persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" <!--實例間的通訊地址--> bind="tcp://0.0.0.0:62222" <!--zookeeper的地址--> zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181" <!--修改成每一個服務器的節點的ip--> hostname="192.168.189.152" sync="local_disk" zkPath="/activemq/leveldb-stores"/> </persistenceAdapter> # 啓動三個節點的activemq [root@localhost bin]# ./activemq restart # 查看 鏈接zookeeper客戶端 [root@localhost bin]# zkCli.sh [zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores [00000000022, 00000000020, 00000000021] # 訪問 [zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020 {"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"} [zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021 {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null} [zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022 {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
從上面能夠看到,只有00000000020這個幾點的elected裏面有值,代表它被選舉爲master節點了。
在瀏覽器上依次訪問:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161
只有192.168.189.150:8161能夠訪問成功,由於只有master節點能夠對外提供訪問,因此只有一個節點能訪問到,那麼它就是master節點。
第二種查看的方式:
查看activemq的日誌,最後一行,能夠看到,MasterLevelDBStore即爲master節點,SlaveLevelDBStore即爲slave節點。
第三種查看的方式爲使用zookeeper的可視化工具。
因爲activeMq集羣是基於zookeeper集羣實現的,因此要注意一下三點:
集羣的代碼和上面單機的代碼大體是一直的,就只須要修改一個activemq的地址。
/** 聲明集羣中activemq的地址,使用failover協議,隨機 */ private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";
1.消息發送方式
默認狀況下,非持久化的消息是異步發送的,持久化的消息是同步發送的。可是在開啓事務的狀況下,消息都是異步發送的,效率會有2個數量級的提高,因此在發送持久化消息時,請開啓事務模式。
2.儲存機制
在一般狀況下,非持久化的消息時存儲在內存中的,持久化消息時存儲在文件中的,他們的最大限制在配置文件中的
因此儘可能不要用非持久化文件,若是非要用的化,能夠將臨時文件的限制調大。同時,非持久化的消息要及時處理,不要堆積,或者啓動事務。啓動事務後,commit()會等待服務器的消息返回,也不會致使消息丟失了。
3.死信隊列
一條消息在被重發屢次後(默認是6次),將會被ActiveMQ移入死信隊列;說白了就是異常消息的歸併處理的集合,主要是處理失敗的消息。能夠在activeMQ.DLQ這個隊列中查看。
4.重複消息,冪等性調用
在網絡延遲的狀況洗啊,可能會形成MQ重試,可能會形成重複消費。若是消息是作數據庫的插入操做,給這個消息作一個惟一主鍵,那麼就算出現重複消費的狀況,由於惟一主鍵,會形成主鍵衝突,避免數據庫出現髒數據。若是是第三方消費,能夠在每條數據裏面加一個全局惟一的id,若是消息消費了,就將消息存在redis中,在消費消息以前將id到redis中查詢一下,判斷是否消費過,若是沒有消費過,就處理,若是消費過了,就不處理了。
參考網址: