ActiveMQ(1)---初識ActiveMQ

消息中間件的初步認識

什麼是消息中間件?

消息中間件是值利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,能夠在分佈式架構下擴展進程之間的通訊。html

消息中間件能作什麼?

消息中間件主要解決的就是分佈式系統之間消息傳遞的問題,它可以屏蔽各類平臺以及協議之間的特性,實現應用程序之間的協同。舉個很是簡單的例子,就拿一個電商平臺的註冊功能來簡單分析下,用戶註冊這一個服務,不單
單只是 insert 一條數據到數據庫裏面就完事了,還須要發送激活郵件、發送新人紅包或者積分、發送營銷短信等一系列操做。假如說這裏面的每個操做,都須要消耗 1s,那麼整個註冊過程就須要耗時 4s 才能響應給用戶。

java

可是咱們從註冊這個服務能夠看到,每個子操做都是相對獨立的,同時,基於領域劃分之後,發送激活郵件、發送營銷短信、贈送積分及紅包都屬於不一樣的子域。因此咱們能夠對這些子操做進行來實現異步化執行,相似於多線程並行處理的概念。
如何實現異步化呢?用多線程能實現嗎?多線程固然能夠實現,只是,消息的持久化、消息的重發這些條件,多線程並不能知足。因此須要藉助一些開源中間件來解決。而分佈式消息隊列就是一個很是好的解決辦法,引入分佈式消
息隊列之後,架構圖就變成這樣了(下圖是異步消息隊列的場景)。經過引入分佈式隊列,就可以大大提高程序的處理效率,而且還解決了各個模塊之間的耦合問題
➢ 這個是分佈式消息隊列的第一個解決場景【異步處理】

web

 

經過分佈式消息隊列來實現流量整形,好比在電商平臺的秒殺場景下,流量會很是大。經過消息隊列的方式能夠很好的緩解高流量的問題

spring

 

用戶提交過來的請求,先寫入到消息隊列。消息隊列是有長度的,若是消息隊列長度超過指定長度,直接拋棄
➢ 秒殺的具體核心處理業務,接收消息隊列中消息進行處理,這裏的消息處理能力取決於消費端自己的吞吐量固然,消息中間件還有更多應用場景,好比在弱一致性事務模型中,能夠採用分佈式消息隊列的實現最大能力通知方式來實現數據的最終一致性等等。

數據庫

ActiveMQ 簡介

ActiveMQ 是徹底基於 JMS 規範實現的一個消息中間件產品。是 Apache 開源基金會研發的消息中間件。ActiveMQ主要應用在分佈式系統架構中,幫助構建高可用、高性能、可伸縮的企業級面向消息服務的系統ActiveMQ 特性apache

1. 多語言和協議編寫客戶端api

  語言:java/C/C++/C#/Ruby/Perl/Python/PHP安全

  應用協議 :服務器

  openwire/stomp/REST/ws/notification/XMPP/AMQPsession

2. 徹底支持 jms1.1 和 J2ee1.4 規範

3. 對 spring 的支持,ActiveMQ 能夠很容易內嵌到 spring模塊中

ActiveMQ 安裝

1. 登陸到 http://activemq.apache.org/activemq-5150-release.html,找到 ActiveMQ 的下載地址

2. 直 接 copy 到 服 務 器 上 通 過 tar -zxvf apache-activeMQ.tar.gz
3. 啓動運行
  a) 普通啓動:到 bin 目錄下, sh activemq start
  b) 啓 動 並 指 定 日 志 文 件 sh activemq start > /tmp/activemqlog
4. 檢查是否已啓動
  ActiveMQ默認採用 61616 端口提供 JMS服務,使用 8161端口提供管理控制檯服務,執行如下命令能夠檢查是否成功啓動 ActiveMQ 服務
  netstat -an|grep 61616
5. 經過 http://192.168.11.156:8161 訪問 activeMQ 管理頁面 ,默認賬號密碼 admin/admin
6. 關閉 ActiveMQ; sh activemq stop

  從 JMS 規範來了解 ActiveMQ
JMS 定義

  Java 消息服務(Java Message Service)是 java 平臺中關於面向消息中間件的 API,用於在兩個應用程序之間,或者分佈式系統中發送消息,進行異步通訊。
JMS 是一個與具體平臺無關的 API,絕大多數( MOMMessage Oriented Middleware)(面向消息中間件)提供商都對 JMS 提供了支持。今天給你們講的 ActiveMQ 就是其中一個實現

什麼是 MOM  

MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操做。MOM 須要提供 API 和管理工具。客戶端使用 api 調用,把消息發送到由提供者管理的目的地。在發送消息以後,客戶端會繼續執行其餘工做,而且在接收
方收到這個消息確認以前,提供者一直保留該消息。

MOM 的特色
1. 消息異步接收,發送者不須要等待消息接受者響應
2. 消息可靠接收,確保消息在中間件可靠保存。只有接收方收到後才刪除消息
Java 消息傳送服務規範最初的開發目的是爲了使 Java 應用程序可以訪問現有 MOM 系統。引入該規範以後,它已被許多現有的 MOM 供應商採用而且已經憑藉自身的功能實現爲異步消息傳送系統。
其餘開源的 JMS 提供商JbossMQ(jboss4) 、 jboss messaging(jboss5)、 joram 、ubermq、mantamq、openjms…大部分基於的 JMS provider 開源的消息中間件都已經中止維護了,剩下的幾個都抱到了大腿,好比 Jboss mq 和 jboss、joram 與 jonas(objectweb 組 織 ) 、 ActiveMQ 與Geronimo(apache 基金組織)。

JMS 的體系結構 

 

經過 JMS 規範結合 ActiveMQ 實現消息發送案例

建立生產者
package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSQueueProducer {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createQueue("myQueue");
			//建立消息生產者
			MessageProducer producer = session.createProducer(destination);
			//建立消息
			TextMessage message = session.createTextMessage("Hello lf!");
			//發送消息
			producer.send(message);
			
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 

建立消息消費者
package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSQueueReceiveder {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createQueue("myQueue");
			//建立消息接收者
			MessageConsumer consumer = session.createConsumer(destination);
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println(message.getText());
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

  

結果:Hello lf!
阻塞式

2.消息消費者採用監聽器

  

package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSQueueListenerReceiveder {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createQueue("myQueue");
			//建立消息接收者
			MessageConsumer consumer = session.createConsumer(destination);
			//建立消息監聽器
			MessageListener messageListener = new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					try {
						System.out.println(((TextMessage)message).getText());
						session.commit();
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			};
			while(true){
				consumer.setMessageListener(messageListener);
			}
//			TextMessage message = (TextMessage) consumer.receive();
//			System.out.println(message.getText());
			//session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 這個案例的架構圖以下:

 

 

細化 JMS 的基本功能
  經過前面的內容講解以及案例演示,咱們已經知道了 JMS規範以及他的基本功能是用於和麪向消息中間件相互通訊的應用程序的接口,那麼 JMS 提供的具體標準有哪些呢?
咱們來仔細去研究下消息傳遞域JMS 規範中定義了兩種消息傳遞域:點對點(point-topoint )消息傳遞域 和發佈 / 訂 閱 消息傳遞域(publish/subscribe)
簡單理解就是:有點相似於咱們經過 qq 聊天的時候,在羣裏面發消息和給其中一個同窗私聊消息。在羣裏發消息,全部羣成員都能收到消息。私聊消息只能被私聊的學員能收到消息,點對點消息傳遞域
1. 每一個消息只能有一個消費者
2. 消息的生產者和消費者之間沒有時間上的相關性。不管消費者在生產者發送消息的時候是否處於運行狀態,均可以提取消息

 

發佈訂閱消息傳遞域
1. 每一個消息能夠有多個消費者
2. 生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱以後發佈的消息。JMS 規範容許客戶建立持久訂閱,這在必定程度上下降了時間上的相關性要求。持久訂閱容許消費者消費它在未處於激活狀態時發送的消息

 

消息結構組成
  JMS 消息由及部分組成:消息頭、屬性、消息體
消息頭
  消息頭(Header) - 消息頭包含消息的識別信息和路由信息,消息頭包含一些標準的屬性如:
JMSDestination 消息發送的目的地,queue 或者 topic)
JMSDeliveryMode 傳送模式。持久模式和非持久模式
JMSPriority 消息優先級(優先級分爲 10 個級別,從 0(最低)到 9(最高). 若是不設定優先級,默認級別是 4。須要注意的是,JMS provider 並不必定保證按照優先級的順序提交消息)
JMSMessageID 惟一識別每一個消息的標識
屬性
按類型能夠分爲應用設置的屬性,標準屬性和消息中間件
定義的屬性
1. 應用程序設置和添加的屬性,好比Message.setStringProperty(「key」,」value」);經過下面的代碼能夠得到自定義屬性的,在接收端的代碼中編寫
在發送端,定義消息屬性
message.setStringProperty("lf","Hello World");

在接收端接收數據

Enumeration 
enumeration=message.getPropertyNames();
while(enumeration.hasMoreElements()){
 String 
name=enumeration.nextElement().toString();
 
System.out.println("name:"+name+":"+messag
e.getStringProperty(name));
 System.out.println();
} 

2. JMS 定義的屬性
使用「JMSX」做爲屬性名的前綴,經過下面這段代碼能夠返回全部鏈接支持的 JMSX 屬性的名字

3. JMS provider 特定的屬性
消息體
就是咱們須要傳遞的消息內容,JMS API 定義了 5 中消息體格式,可使用不一樣形式發送接收數據,並能夠兼容現有的消息格式,其中包括

TextMessage java.lang.String 對象,如 xml 文件內容
MapMessage 名/值對的集合,名是 String 對象,值類型能夠是 Java 任何基本類型
BytesMessage 字節流
StreamMessage Java 中的輸入輸出流
ObjectMessage Java 中的可序列化對象
Message 沒有消息體,只有消息頭和屬性

絕大部分的時候,咱們只須要基於消息體進行構造。
持久訂閱
持久訂閱的概念,也很容易理解,好比仍是以 QQ 爲例,咱們把 QQ 退出了,可是下次登陸的時候,仍然能收到離線的消息。持久訂閱就是這樣一個道理,持久訂閱有兩個特色:
1. 持久訂閱者和非持久訂閱者針對的 Domain 是 Pub/Sub,而不是 P2P
2. 當 Broker 發送消息給訂閱者時,若是訂閱者處於 未激活狀態狀態:持久訂閱者能夠收到消息,而非持久訂閱者則收不到消息。

固然這種方式也有必定的影響:當持久訂閱者處於 未激活狀態時,Broker 須要爲持久訂閱者保存消息;若是持久訂閱者訂閱的消息太多則會溢出

修改三處地方,而後先啓動消費端去註冊一個持久訂閱。持久訂閱時,客戶端向 JMS 服務器註冊一個本身身份的 ID,當這個客戶端處於離線時,JMS Provider 會爲這個 ID 保存全部發送到主題的消息,當客戶再次鏈接到 JMS
Provider 時,會根據本身的 ID 獲得全部當本身處於離線時發送到主題的消息。這個身份ID,在代碼中的體現就是 connection的 ClientID,這個其實很好理解,你要想收到朋友發送的 qq 消息,前提就是你得先註冊個 QQ 號,並且還要有臺能上網的設備,
電腦或手機。設備就至關因而 clientId 是惟一的;qq 號至關因而訂閱者的名稱,在同一臺設備上,不能用同一個 qq號掛 2 個客戶端。鏈接的 clientId 必須是惟一的,訂閱者的名稱在同一個鏈接內必須惟一。這樣才能惟一的肯定鏈接和訂閱者。

topic模式

package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSTopicProducer2 {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createTopic("myTopic");
			//建立消息生產者
			MessageProducer producer = session.createProducer(destination);
			//建立消息
			TextMessage message = session.createTextMessage("Hello lf!--topic");
			//Text Map Bytes Stream Object
			//發送消息
			producer.send(message);
			
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

  topic模式接受者

package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSTopicReceiveder {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createTopic("myTopic");
			//建立消息接收者
			MessageConsumer consumer = session.createConsumer(destination);
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println(message.getText());
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

  

持久化消息:

package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSPersistentTopicConsumer {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			connection.setClientID("lf001");
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Topic destination = session.createTopic("myTopic");
			//建立消息接收者
			MessageConsumer consumer = session.createDurableSubscriber(destination, "lf001");
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println(message.getText());
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
先運行將持久化消息註冊上去,當消息發送者發送持久化消息就能夠找到

  持久化消息發送

package com.lf.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSPersistentTopicProducer {
	
	public static void main(String[] args) {
		
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
		Connection connection = null;
		try {
			connection = activeMQConnectionFactory.createConnection();
			
			connection.start();
			//建立session
			Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//建立目的地
			Destination destination = session.createTopic("myTopic");
			//建立消息生產者
			MessageProducer producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			//建立消息
			TextMessage message = session.createTextMessage("Hello lf!--");
			//Text Map Bytes Stream Object
			//發送消息
			producer.send(message);
			
			session.commit();
			session.close();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

  

JMS 消息的可靠性機制
理論上來講,咱們須要保證消息中間件上的消息,只有被消費者確認過之後纔會被簽收,至關於咱們寄一個快遞出去,收件人沒有收到快遞,就認爲這個包裹仍是屬於待簽收狀態,這樣才能保證包裹可以安全達到收件人手裏。消息中間件也是同樣。
消息的消費一般包含 3 個階段:客戶接收消息、客戶處理消息、消息被確認首先,來簡單瞭解 JMS 的事務性會話和非事務性會話的概念JMS Session 接口提供了 commit 和 rollback 方法。事務提交意味着生產的全部消息被髮送,消費的全部消息被確認;
事務回滾意味着生產的全部消息被銷燬,消費的全部消息被恢復並從新提交,除非它們已通過期。 事務性的會話老是牽涉到事務處理中,commit 或 rollback 方法一旦被調用,一個事務就結束了,而另外一個事務被開始。關閉事務性會話將回滾其中的事務


在事務型會話中
在事務狀態下進行發送操做,消息並未真正投遞到中間件,而只有進行 session.commit 操做以後,消息纔會發送到中間件,再轉發到適當的消費者進行處理。若是是調用rollback 操做,則代表,當前事務期間內所發送的消息都取
消掉。經過在建立 session 的時候使用 true or false 來決定當前的會話是事務性仍是非事務性connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);在事務性會話中,消息的確認是自動進行,也就是經過
session.commit()之後,消息會自動確認。➢ 必須保證發送端和接收端都是事務性會話


在非事務型會話中
消息什麼時候被確認取決於建立會話時的應答模式(acknowledgement mode). 有三個可選項
Session.AUTO_ACKNOWLEDGE
當客戶成功的從 receive 方法返回的時候,或者從MessageListenner.onMessage 方法成功返回的時候,會話自動確認客戶收到消息。
Session.CLIENT_ACKNOWLEDGE
客戶經過調用消息的 acknowledge 方法確認消息。
CLIENT_ACKNOWLEDGE 特性
在這種模式中,確認是在會話層上進行,確認一個被消費的消息將自動確認全部已被會話消費的消息。列如,若是一個消息消費者消費了 10 個消息,而後確認了第 5 個消息,那麼 0~5 的消息都會被確認 ->
演示以下:發送端發送 10 個消息,接收端接收 10 個消息,可是在 i==5 的時候,調用 message.acknowledge()進行確認,會發現 0~4 的消息都會被確認Session.DUPS_ACKNOWLEDGE消息延遲確認。指定消息提供者在消息接收者沒有確認發送時從新發送消息,這種模式不在意接受者收到重複的消息。

消息的持久化存儲
消息的持久化存儲也是保證可靠性最重要的機制之一,也就是消息發送到 Broker 上之後,若是 broker 出現故障宕機了,那麼存儲在 broker 上的消息不該該丟失。能夠經過下面的代碼來設置消息發送端的持久化和非持久化特性

MessageProducer producer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

  

對於非持久的消息,JMS provider 不會將它存到文件/數據庫等穩定的存儲介質中。也就是說非持久消息駐留在內存中,若是 jms provider 宕機,那麼內存中的非持久消息會丟失。➢ 對於持久消息,消息提供者會使用存儲-轉發機制,先將消息存儲到穩定介質中,等消息發送成功後再刪除。若是 jms provider 掛掉了,那麼這些未送達的消息不會丟失;jms provider 恢復正常後,會從新讀取這些消息,並傳送給對應的消費者。

相關文章
相關標籤/搜索