html
下載java
壓縮包上傳到Linux系統linux
apache-activemq-5.15.9-bin.tar.gzspring
解壓縮sql
tar -zxvf apache.activemq-5.15.0-bin.tar.gz數據庫
啓動apache
bin目錄下: ./activemq startvim
####查看是否啓動命令:./activemq statuscentos
進入管理後臺(默認後臺管理端口8161;默認openwire端口61616)
http://127.0.0.1:8161/admin admin admin
錯誤處理
查看日誌:./activemq console
hostname不合法
ERROR | Failed to start Apache ActiveMQ (localhost, ID:VM_0_5_centos-46296-1554189350972-0:1)
java.net.URISyntaxException: Illegal character in hostname at index 7: ws://VM_0_5_centos:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
at java.net.URI$Parser.fail(URI.java:2848)[:1.8.0_201]
at java.net.URI$Parser.parseHostname(URI.java:3387)[:1.8.0_201]
at java.net.URI$Parser.parseServer(URI.java:3236)[:1.8.0_201]
at java.net.URI$Parser.parseAuthority(URI.java:3155)[:1.8.0_201]
at java.net.URI$Parser.parseHierarchical(URI.java:3097)[:1.8.0_201]
at java.net.URI$Parser.parse(URI.java:3053)[:1.8.0_201]
at java.net.URI.<init>(URI.java:673)[:1.8.0_201]
at
解決辦法:
編輯文件:vim /etc/hostname,文件內直接填寫新的主機名(能夠嘗試只用這一步)
若是還不行,執行下面的:
編輯文件:vim /etc/sysconfig/network
改HOSTNAME=xxxxxxx爲 HOSTNAME=wly 保存
改映射: Vim /etc/hosts
127.0.0.1 new-hostname.domainname ******
星號爲原來的主機名,現只將*****改爲爲wly 保存
重啓linux機器:reboot
java消息服務(java message service)
MOM(message oriented middleware)面向消息中間件
轉載:http://www.javashuo.com/article/p-rpbcvyhr-ho.html
JMS是什麼 JMS 全稱:Java Message Service,Java消息服務,是Java EE中的一個技術。
JMS規範
JMS定義了Java 中訪問消息中間件的接口,並無給予實現,實現JMS接口的消息中間件成爲JMS Provider,例如:Active MQ
JMS Provider
實現JMS接口和規範的消息中間件
JMS message
JMS的消息,JMS消息由三部分組成:消息頭、消息屬性、消息體
消息頭包含消息的識別消息和路由消息,消息頭包含一些標準的屬性以下:
(1)JMSDestination: 消息發送的目的地,主要是指Queue和Topic,由send方法設置.
(2)JMSDeliveryMode:傳送模式。有兩種:持久模式和非持久模式。一條持久性的消息應該被傳輸"一次僅僅一次",這就意味着若是JMS提供者出現故障,該消息並不會丟失,它會在服務器恢復以後再次傳遞。一條非持久的消息最多會傳遞一次,這意味着服務器出現故障,該消息將永遠丟失。由send方法設置
(3)JMSExpiration:消息過時時間,等於Destination的send方法中的timeToLive值加上發送時刻的GMT的時間值。若是timeToLive值等於零,則JMSExpiration被設置爲零,表示該消息永不過時。若是發送後,在消息過時時間以後消息尚未被髮送到目的地,則該消息被清除。由send方法設置
(4)JMSPriority:消息優先級,從0-9十個級別,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達,默認是4級。由send方法設置
(5)JMSMessageID:惟一識別每一個消息的標識,由JMS Provider產生。由send方法設置
(6)JMSTimestamp:一個JMS Provider在調用send()方法時自動設置,它是消息被髮送和消費者實際接收的時間差。由客戶端設置
(7)JMSCorrelationID:用來鏈接到另一個消息,典型的應用是在回覆消息中鏈接到原消息。在大多數狀況下,JMSCorrelationID用於將一條消息標記爲對JMSMessageID標示的上一條消息的應答,不過,JMSCorrelationID能夠是任何值,不只僅是JMSMessageID。由客戶端設置
(8)JMSType: 消息類型的標識符,由客戶端設置
(9)JMSReplyTo: 提供本消息回覆消息的目的地址,由客戶端設置
(10)JMSRedelivered:若是一個客戶端收到一個設置了JMSRedelivered屬性的消息,則表示可能客戶端曾經在早些時候收到過該消息,但並無簽收(acknowledged)。若是該消息被從新傳送,JMSRedelivered=true 不然 JMSRedelivered=flase 。由JMS Provider設置
消息體,JMS API定義了5種消息體格式,也叫消息類型,可使用不一樣形式發送接收數據,並能夠兼容現有的消息格式。
包括:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage
消息屬性,包含如下三種類型的屬性:
1.應用程序設置和添加的數據,好比:message.setStringProperty("userName",userName);
2.JMS定義的屬性,使用"JMSX"做爲屬性名的前綴, connection.getMetaData().getJMSXPropertyNames() 方法返回全部鏈接支持的JMSX屬性的名字。
3.JMS供應商特定的屬性
JMS producer
消息生產者,建立和發送JMS消息的客戶端應用
JMS consumer
消息消費者,建立和處理JMS消息的客戶端應用
JMS domains: 消息傳遞域
JMS規範中定義了兩種消息傳遞域: 點對點(point-to-point,簡寫成PTP);消息傳遞域和發佈/訂閱消息傳遞域(publish/subscribe,簡寫成pub/sub)
1.點對點消息傳遞域的特色以下:
a.每一個消息只能有一個消費者
b.消息的生產者和消費者之間沒有時間上的相關性。不管消費者在生產者發送消息的時候是否處於運行狀態,它均可以提取消息。
2.發佈/訂閱消息傳遞域的特色以下:
a.每一個消息能夠有多個消費者
b.生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱以後發佈的消息。JMS規範容許客戶建立持久訂閱(setClientID),這在必定程度上放鬆了時間上的相關性要求。持久訂閱容許消費者消費它在未處於激活狀態時發送的消息。
3.在點對點消息傳遞域中,目的地被稱爲隊列(queue);在發佈/訂閱消息傳遞域中,目的地被稱爲主題(topic)
Connection factory: 鏈接工廠,用來建立鏈接對象,以鏈接到JMS的provider
JMS Connection: 封裝了客戶與JMS提供者之間的一個虛擬的鏈接 JMS Session: 是生產和消費消息的一個單線程上下文 會話用於建立消息生產者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發送和接收被組合到了一個原子操做中。 Destination:消息發送到的目的地 Acknowledge:簽收 Transaction:事務 JMS client: 用來收發消息的Java應用
JMS的可靠性機制:JMS消息以後被確認後,纔會認爲是被成功消費。
事務性會話,即設置爲ture,消息會在commit後自動確認;
非事務性會話,即設置爲false,在該模式下,消息是否被確認取決於建立會話時的應答模式。:
AUTO_ACKNOWLEDGE:當客戶端成功receive後,消息自動確認
CLIENT_ACKNOWLEDGE:客戶端手動確認(textMessage.acknowledge)
DUPS_OK_ACKNOWLEDGE:延遲確認(能夠設置時間)。
編寫一個測試類對ActiveMQ進行測試,首先得向pom文件中添加ActiveMQ相關的jar包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
queue的發送代碼以下(生產者):
public void testMQProducerQueue() throws Exception{
//一、建立工廠鏈接對象,須要制定ip和端口號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//二、使用鏈接工廠建立一個鏈接對象
Connection connection = connectionFactory.createConnection();
//三、開啓鏈接
connection.start();
//四、使用鏈接對象建立會話(session)對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//五、使用會話對象建立目標對象,包含queue和topic(一對一和一對多)
Queue queue = session.createQueue("test-queue");
//六、使用會話對象建立生產者對象
MessageProducer producer = session.createProducer(queue);
//七、使用會話對象建立一個消息對象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//八、發送消息
producer.send(textMessage);
//九、關閉資源
producer.close();
session.close();
connection.close();
}
接收代碼(消費者):
public void TestMQConsumerQueue() throws Exception{
//一、建立工廠鏈接對象,須要制定ip和端口號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//二、使用鏈接工廠建立一個鏈接對象
Connection connection = connectionFactory.createConnection();
//三、開啓鏈接
connection.start();
//四、使用鏈接對象建立會話(session)對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//五、使用會話對象建立目標對象,包含queue和topic(一對一和一對多)
Queue queue = session.createQueue("test-queue");
//六、使用會話對象建立生產者對象
MessageConsumer consumer = session.createConsumer(queue);
//七、向consumer對象中設置一個messageListener對象,用來接收消息
consumer.setMessageListener(new MessageListener() {
而後當咱們運行queue發送的時候能夠看到隊列裏已經有一條消息了,但沒有發送出去: 而後在運行queue 的接收端,能夠看到消息已經發出了:
能夠本地建立並運行。
BrokerService brokerservice = new BrokerService();
brokrService.setUseJmx(true);
brokrService.addConnector("tcp://localhost:61616");
brokrService.start();
Q:消息的發送策略
A:持久化消息(默認)/非持久化消息
PS:設置消息發送端發送持久化消息`異步方式`: connectionFactory.setUseAsyncSend(true);
設置消息發送端發送非持久化消息(默認爲異步方式):textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENCE)
回執窗口大小設置:connectionFactory.setProducerWindowSize();
若是須要對非持久化消息的每次發送的消息都得到broker的回執:connectionFactory.setAllwaysSyncSend();
Q:comsumer獲取消息時pull仍是push
A:默認狀況下,mq服務器(broker)採用異步方式向客戶端主動推送消息(push)
prefetchsize:預取消息數量(broker每次主動推送的消息數量)。Queue默認值是1000;topic持久化消息默認值是100,非持久化消息默認值是32766。若是設置爲0,此時對於consumer來講,就是pull模式。
Q:acknowledge爲何可以在第5次把前面執行的消息確認掉? A:源碼deliveryMessage:存在以前執行完但沒有確認的消息隊列 #消息確認 ACK_TYPE:消息端和broker交換ack指令的時候,還須要告知broker ACK_TYPE REDELIVERED_ACK_TYPE:重發策略 DELIVERED_ACK_TYPE:消息已經接受,可是還沒有處理結束 STANDARD_ACK_TYPE:消息處理成功
service-jms.xml
實現MessageListener
client與broker的通訊協議
支持的協議TCP(默認)、UDP、NIO、SSL、Http(s)、vm
寫入速度快,容易恢復
文件默認大小是32M
<jdbcPersistenceAdapter dataSource="#mysqlDataSource" createTablesOnStartup="true"/>
鏈接數據庫成功後,會建立三張表ACTIVEMQ_ACKS、ACTIVEMQ_LOCK、ACTIVEMQ_MSGS
5.8版本之後引入的持久化策略,一般用於集羣配置
networkConnector:用來配置broker與broker之間的通訊鏈接
<!--靜態網絡鏈接--> <networkConnectors> <networkConnectot uri="static://(tcp://192.168.1.1:61616,tcp://192.168.1.2:61616)"/> </networkConnectors> #雙向鏈接:duplex
<!--丟失的消息:配置消息迴流,解決該問題--> <policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetwordBridgeFilterFactory replayWhenNoConsumers="true" /> </networkBridgeFilterFactory> </policyEntry>
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.156.44:61616,tcp://192.168.156.45:61616")";