原本只是想看下metaq的文檔,結果發現好亂,如今metaq其實有兩個大分支了,一個是莊曉丹維護的已開源的,另一個是淘寶內部的,本質結構原理沒太大區別,只不過開源的已經去掉了對淘系相關的依賴。而後淘系的metaq已經到3.*版本了,可是文檔比較亂,深刻到細節時,發現好亂,一個點有好幾種說法,火大,乾脆本身看metaq的源碼,有點意思,作個筆記記錄下,怕我之後忘記了。有少許的章節和圖片從內網拿來的,大部分是本身寫的,記錄下幾個主要的點。java
一:metaq是什麼數據庫
metaq是一個分佈式消息中間件,消息中間件是典型的生產者-消費者模型,核心做用是解耦,生產者和消費者彼此沒有直接依賴,同步化解成了異步。metaq並無遵循jms規範,jms規範體如今系統層面和api層面。api
消費模型緩存
例如jms定義了兩種消息傳遞方式:安全
1 基於隊列的點對點消費模型服務器
2 基於發佈/訂閱的消費模型網絡
Metaq只有發佈訂閱的消費方式。session
消息類型數據結構
JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage。Metaq只有一種類型:Message。併發
消息持久性
JMS定義兩種持久性類型:
PERSISTENT 指示JMS provider持久保存消息,以保證消息不會由於JMS provider的失敗而丟失。
NON_PERSISTENT 不要求JMS provider持久保存消息。
Metaq的消息都是持久性的
API
JMS定義了消息中間件的生產端api和消費端api,這些api都是約定的接口,都都被metaq無視了。
二:一些概念
消息生產者
負責產生消息併發送消息到meta服務器
消息消費者
負責消息的消費,meta採用pull模型,由消費者主動從meta服務器拉取數據並解析成消息並消費
Topic
消息的主題,由用戶定義並在服務端配置。producer發送消息到某個topic下,consumer從某個topic下消費消息
分區
同一個topic下面還分爲多個分區,如meta-test這個topic咱們能夠分爲10個分區,分別有兩臺服務器提供,那麼可能每臺服務器提供5個分 區,假設服務器分別爲0和1,則全部分區爲0-0、0-一、0-二、0-三、0-四、1-0、1-一、1-二、1-三、1-4
Message
消息,負載用戶數據並在生產者、服務端和消費者之間傳輸
Broker
就是meta的服務端或者說服務器,在消息中間件中也一般稱爲broker。
消費者分組(Group)
消費者能夠是多個消費者共同消費一個topic下的消息,每一個消費者消費部分消息。這些消費者就組成一個分組,擁有同一個分組名稱,一般也稱爲消費者集羣
Offset
消息在broker上的每一個分區都是組織成一個文件列表,消費者拉取數據須要知道數據在文件中的偏移量,這個偏移量就是所謂offset。Offset是絕對偏移量,服務器會將offset轉化爲具體文件的相對偏移量
三:整體結構圖
四:消息存儲
消息中間件中消息堆積是很常見,這要求broker具備消息存儲的能力,消息存儲結構決定了消息的讀寫性能,對總體性能有很大影響,metaq是分佈式的,多個borker能夠爲一個topic提供服務,一個topic下的消息分散存儲在多個broker,它們是多對多關係。
以下圖
消息定義
id
消息的惟一id,系統自動產生,用戶沒法設置,在發送成功後由服務器返回,發送失敗則爲0。
topic
消息的主題,訂閱者訂閱該主題便可接收發送到該主題下的消息,必須
data
消息的有效載荷,也就是消息內容,meta永遠不會修改消息內容,你發送出去是什麼樣子,接收到就是什麼樣子。
attribute
消息屬性,一個字符串,可選。發送者可設置消息屬性來讓消費者過濾。
物理文件
metaq將消息存儲在本地文件中,每一個文件最大大小爲1G,若是寫入新的消息時,超過當前文件大小,則會自動新建一個文件。文件名稱爲起始字節大小,例如,假設文件最大尺寸爲1k,有三個文件,則文件名如
下(長度爲20位,不足補0):
00000000000000000000
00000000000000001024
00000000000000002048
即便一個broker爲多個topic服務,這些topic的消息都存儲同一個文件組中,消息順序寫入,永遠都是當前文件在寫,其餘文件只讀。
索引文件
弄清消息的物理存儲後,也許咱們會有一個疑問:如何讀取指定topic的當前消息?的確,僅僅只存儲消息是沒法作到這個的,因此metaq還有索引文件,相似數據庫的索引,可是有很大區別。broker將消息存儲到文件後,會將該消息在文件的物理位置,消息大小,消息類型封裝成一個固定大小的數據結構,暫且稱這個數據結構爲索引單元吧,大小固定爲16k,消息在物理文件的位置稱爲offset。
索引單元結構
offset |
size |
messateType |
8字節 |
4字節 |
4字節 |
多個索引單元組成了一個索引文件,索引文件默認固定大小爲20M,和消息文件同樣,文件名是
起始字節位置,寫滿後,產生一個新的文件。
邏輯分區
一個邏輯分區其實是一組索引文件。一個topic在一個broker上能夠有多個邏輯分區,默認爲1,但可自由配置。爲何會有多個分區的狀況?邏輯分區的做用不只僅是經過索引提供快速定位消息的功能,它還跟整個metaq的集羣有很大的關係。
邏輯結構圖
五:集羣與負載均衡
Topic分佈
一個topic能夠分佈在多臺broker上,具體體現就是多個broker配置了這個topic,而且最少有一個分區。假若有一個topic名爲」t1」,兩個broker:b1,b2;每一個borker都爲t1配置了兩個分區。那麼t1一共有4個分區:b1-1,b1-2,b2-1,b2-2。生產者和消費者對topic發佈消息或消費消息時,目的地都是以分區爲單位。當一個topic消息量逐漸變大時,能夠將topic分佈在更多的borker上。某個broker上的分區數越多,意味着該borker承擔更繁重的任務,分區數能夠認爲是權重的表現形式。
生產者
生產者在經過zk獲取分區列表以後,會按照brokerId和分區號的順序排列組織成一個有序的分區列表,發送的時候按照從頭至尾循環往復的方式選擇一個分區來發送消息。這是默認的分區策略,考慮到咱們的broker服務器軟硬件配置基本一致,默認的輪詢策略已然足夠。若是你想實現本身的負載均衡策略,能夠實現上文提到過的PartitionSelector接口,並在建立producer的時候傳入便可。在broker由於重啓或者故障等因素沒法服務的時候,producer經過zookeeper會感知到這個變化,將失效的分區從列表中移除作到fail over。由於從故障到感知變化有一個延遲,可能在那一瞬間會有部分的消息發送失敗。
消費者
消費者的負載均衡會相對複雜一些。咱們這裏討論的是單個分組內的消費者集羣的負載均衡,不一樣分組的負載均衡互不干擾,沒有討論的必要。 消費者的負載均衡跟topic的分區數目緊密相關,要考察幾個場景。 首先是,單個分組內的消費者數目若是比總的分區數目多的話,則多出來的消費者不參與消費
其次,若是分組內的消費者數目比分區數目小,則有部分消費者要額外承擔消息的消費任務,具體見示例圖以下
六:文件讀寫
消息存儲在文件中,如何保證性能?Metaq使用了文件內存映射特性,對應的是MappedByteBuffer對象。 MappedByteBuffer 只是一種特殊的 ByteBuffer ,便是ByteBuffer的子類。 MappedByteBuffer 將文件直接映射到內存(這裏的內存指的是虛擬內存,並非物理內存)。一般,能夠映射整個文件,若是文件比較大的話能夠分段進行映射, 只要指定文件的那個部分就能夠。並且,與ByteBuffer十分相似,沒有構造函數(你不可new MappedByteBuffer()來構造一個MappedByteBuffer),咱們能夠經過 java.nio.channels.FileChannel 的 map() 方法來獲取 MappedByteBuffer 。其實說的通俗一點就是Map把文件的內容被映像到計算機虛擬內存的一塊區域,這樣就能夠直接操做內存當中的數據而無需操做的時候每次都經過I/O去物理 硬盤讀取文件,因此效率上有很大的提高。
映射方式
MappedByteBuffer map(int mode,long position,long size); 能夠把文件的從position開始的size大小的區域映射爲內存映像文件,mode指出了可訪問該內存映像文件的方式:
READ_ONLY,(只讀)
試圖修改將致使拋出異常
READ_WRITE(讀/寫)
對獲得的緩衝區的更改最終將傳播到文件;該更改對映射到同一文件的其餘程序不必定是可見的。
PRIVATE(專用)
對獲得的緩衝區的更改不會傳播到文件,而且該更改對映射到同一文件的其餘程序也不是可見的;相反,會建立緩衝區已修改部分的專用副本。
三個關鍵方法
fore()
緩衝區是READ_WRITE模式下,此方法對緩衝區內容的修改強行寫入文件
load()
將緩衝區的內容載入內存,並返回該緩衝區的引用
isLoaded()
若是緩衝區的內容在物理內存中,則返回真,不然返回假
調用信道的map()方法後,便可將文件的某一部分或所有映射到內存中,映射內存緩衝區是個直接緩衝區,繼承自ByteBuffer,但相對於ByteBuffer,它有更多的優勢: a. 讀取快 b. 寫入快 c. 隨時隨地寫入
釋放內存句柄
經過FileChannel.map方法能夠獲得一個MappedByteBuffer,但FileChannel沒有提供unmap方法,FileChannel關閉後,不會釋放映射的MappedByteBuffer。致使的問題是一個map過的文件關閉後,卻沒法將其刪除。根據JAVADOC的說明,是在垃圾收集的時候.而衆所周知垃圾收集是程序根本沒法控制的,有個土方:
Java代碼
1. AccessController.doPrivileged(new PrivilegedAction() {
2. public Object run() {
3. try {
4. Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
5. getCleanerMethod.setAccessible(true);
6. sun.misc.Cleaner cleaner = (sun.misc.Cleaner)
7. getCleanerMethod.invoke(byteBuffer, new Object[0]);
8. cleaner.clean();
9. } catch (Exception e) {
10. e.printStackTrace();
11. }
12.
13. return null;
14. }
15. });
若是但願更加高效地處理映射到內存中的文件,把文件的內容加載到物理內存中是一個好辦法。經過MappedByteBuffer類的load方法能夠把該緩衝區所對應的文件內容加載到物理內存中,以提升文件操做時的性能。因爲物理內存的容量受限,不太可能直接把一個大文件的所有內容一次性地加載到物理內存中。能夠每次只映射文件的部份內容,把這部份內容徹底加載到物理內存中進行處理。完成處理以後,再映射其餘部分的內容。因爲I/O操做通常比較耗時,出於性能考慮,不少操做在操做系統內部都是使用緩存的。在程序中對MappedByteBuffer作的修改不必定會當即同步到文件 系統中。若是在沒有同步以前發生了程序錯誤,可能致使所作的修改丟失。所以,在執行完某些重要文件內容的更新操做以後,應該調用MappedByteBuffer類 的force方法來強制要求把這些更新同步到底層文件中。能夠強制同步的更新有兩類,一類是文件的數據自己的更新,另外一類是文件的元數據的更新。在使用 force方法時,能夠經過參數來聲明是否在同步數據的更新時也同步元數據的更新。
七:消息消費
metaq的消費模型不是生產端推送,而是消費端不停拉取。可是注意,不停拉取不是指消費端定時拉取,而是拉取完一批消息,消費完畢,再去拉取下一批。這裏有實時性和吞吐量之間的矛盾,若是每次批量拉取的消息數量過少,會增長實時性,可是減小吞吐量;反之,若是每次批量拉取的消息數量過大,則實時性會打折扣,但吞吐量上升。因爲metaq的消息存儲結構,消費端拉取消息時,至少須要如下幾個參數:
· 消息主題
· 邏輯隊列序號
· 索引發始位置
· 消息最大長度
· 當前請求序列號
· 消費者分組名稱
Metaq恰好也定義了這樣的一個請求對象,恰好6個屬性,分別對應前面所說的參數。
Java代碼
1. public class GetCommand{
2. private final long offset;
3. private final intmaxSize;
4. private final int partition;
5. private final String group;
6. private Integer opaque;
7. private String topic;
8. ……
9. }
· 根據topic和partition找到邏輯隊列:A
· 根據offset從A定位指定的索引文件:B
· 從B中讀取全部的索引數據:C
· 遍歷C,根據索引單元的消息物理地址和消息長度,找到物理消息D,將D放入集合,並計算消息的累加長度,若大於請求裏消息最大長度maxSize,則終止遍歷,返回結果。
消息結果裏有當前批次消息的索引讀取結束位置(offset),消費端會將當前offset存儲在本地,下次拉取消息時,要將結束位置做爲參數放入消息拉取請求裏。因爲metaq是分佈式結構,消費端和生產端的對應關係可能會常常變更,offset不能僅僅只是保存到本地,必須保存在一個共享的存儲裏,好比zookeeper,數據庫,或共享的文件系統。默認狀況下,metaq將offset及時保存在本地,並定時寫入zookeeper。在某些狀況下,會發生消息重複消費,好比某個consumer掛掉了,新的consumer將會接替它繼續消費,可是offset是異步存儲的,可能新的consumer起來後,從zookeeper上拿到的仍是舊的offset,致使當前批次重複,產生重複消費。
八:可靠性保證
生產者可靠性保證
消息生產者發送消息後返回SendResult,若是isSuccess返回爲true,則表示消息已經確認發送到服務器並被服務器接收存儲。整個發送過程是一個同步的過程。保證消息送達服務器並返回結果。
服務器可靠性保證
消息生產者發送的消息,meta服務器收到後在作必要的校驗和檢查以後的第一件事就是寫入磁盤,寫入成功以後返回應答給生產者。所以,能夠確認每條發送結果爲成功的消息服務器都是寫入磁盤的。 寫入磁盤,不意味着數據落到磁盤設備上,畢竟咱們還隔着一層os,os對寫有緩衝。Meta有如下刷盤策略:
異步刷盤
每1000條(可配置),即強制調用一次force來寫入磁盤設備。
每隔10秒(可配置),強制調用一次force來寫入磁盤設備。
同步刷盤
此外,若是存儲配置上的groupCommitEnable選項爲true,則會在寫入消息後,當即強制刷盤。
消費者可靠性保證
消費者是一條接着一條地消費消息,只有在成功消費一條消息後纔會接着消費下一條。若是在消費某條消息失敗(如異常),則會嘗試重試消費這條消 息(默認最大5次),超過最大次數後仍然沒法消費,則將消息存儲在消費者的本地磁盤,由後臺線程繼續作重試。而主線程繼續日後走,消費後續的消息。所以, 只有在MessageListener確認成功消費一條消息後,meta的消費者纔會繼續消費另外一條消息。由此來保證消息的可靠消費。消費者的另外一個可靠性的關鍵點是offset的存儲,也就是拉取數據的偏移量。默認存儲在zoopkeeper上,zookeeper經過集羣來保證數據的安全性。Offset會按期保存,而且在每次從新負載均衡前都會強制保存一次,所以可能會存在極端狀況下的消息的重複消費。
九:zookeeper存儲結構
/meta/brokers/ids
描述broker的註冊信息
假若有3個broker,id分別爲m1,s1,s2,s1和s2是m1的slave(實際上這些id都是數字,不能有字母)。則結構爲
/meta/brokers/ids/m1/master
/meta/brokers/ids/m1/slaves1
/meta/brokers/ids/m1/slaves2
m1是master brokerid,若是根據m1找master brokerid,只需判斷m1/master是否存在。若是尋找m1的slave,只需找到m1下的3個節點,比對節點名稱是否以"slave"字符串開頭,如果,則截取slave id加入到slave節點集合。
/meta/brokers/topics
這個結構稍微有些複雜,仍是舉例說明吧。假若有如下broker信息:master m1,slave s1;master m2,slave s2;有一個topic名爲」hello」,兩組broker都配置了」hello」這個topic。則目錄以下:
/meta/brokers/topics/hello/m1-m
/meta/brokers/topics/hello/m2-m
/meta/brokers/topics/hello/s1-s
/meta/brokers/topics/hello/s2-s
-m表示master,-s表示slave,爲何要有這個結構呢?由於producer給某個topic推送消息時,須要知道哪些broker配置了該topic。
根據topic獲取master或者slave,很簡單,找到/meta/brokers/topics/hello的子目錄名稱,而後判斷是否以-m或者-s結尾,分別歸類爲master和slave。不過拿到master或者slave的brokeid後,還須要按照brokeid檢查broker是否存在。詳情能夠看MetaZookeeper的getMasterBrokersByTopic方法。
關於topic在broker上的分區信息,接着上面繼續思考,僅僅知道哪些borker配置了某個topic還不夠,
由於topic在一個broker上還有分區信息。假如hello這個topic在m1上有2個分區,能夠認爲
/meta/brokers/topics/hello是一個目錄,/meta/brokers/topics/hello/m1-m是一個文件,那麼hello這個
topic在m1上的分區信息就是文件裏的數據了。
/meta/brokers/topics/hello/m1-m的數據是一個整數,某個topic在某個broker上的分區號是遞增的,所以若是/meta/brokers/topics/hello/m1-m的數據爲2,則代表hello在m1上的分區有2個。詳情請看MetaZookeeper的getPartitionsForTopicsFromMaster方法。基於/meta/brokers/topics的結構,還能夠查找某個broker發佈了哪些topic。假如存在如下目錄
/meta/brokers/topics/hello1/m1-m
/meta/brokers/topics/hello1/m2-m
/meta/brokers/topics/hello1/s1-s
/meta/brokers/topics/hello1/s2-s
/meta/brokers/topics/hello2/m1-m
/meta/brokers/topics/hello2/m2-m
/meta/brokers/topics/hello2/s1-s
/meta/brokers/topics/hello2/s2-s
查找過程以下
· 找到/meta/brokers/topics的全部子目錄,獲得hello1和hello2,其實就是整個集羣裏有哪些topic。
· 遍歷每一個topic的子目錄,例如hello1的子目錄爲m1-m,m2-m,s1-s,s2-s
· 遍歷這些子目錄,找到角色爲master的brokerid是否和當前查找的brokerid一致,若是是,則將當前topic加入到指定brokerid發佈的topic集合裏。例如對於m1這個brokerid,輸出是hello1,hello2。詳情見getTopicsByBrokerIdFromMaster方法。
/meta/consumers/group/ids
存儲某個分組的消費者註冊信息,還有他們分別訂閱了哪些topic。group是個變量,以消費者的實際分組爲
準。假設有一個消費者分組名爲「hellogroup」,該分組有兩個消費者,id分別爲"c1"和"c2",c1訂閱了
topic "t1"和"t2",c3訂閱了"t3"和"t4"。則存在如下兩個節點:
/meta/consumers/hellogroup/ids/hellogroup_c1 節點數據爲「hello1,hello2」
/meta/consumers/hellogroup/ids/hellogroup_c2 節點數據爲"hello2,hello3"
消費者id的計算規則
consumerId=所屬分組名稱+「_」+consumerUUID
若是構建一個消費端時,配置裏指定了consumerUUID,則以該consumerUUID爲準,不然按照規則計算。見
ConsumerZookeeper的getConsumerUUID方法:
Java代碼
1. protected String getConsumerUUID(final ConsumerConfig consumerConfig) throws Exception {
2. String consumerUUID = null;
3. if (consumerConfig.getConsumerId() != null) {
4. consumerUUID = consumerConfig.getConsumerId();
5. }else {
6. consumerUUID =
7. RemotingUtils.getLocalAddress() + "-" + System.currentTimeMillis() + "-"
8. + this.counter.incrementAndGet();
9. }
10. return consumerUUID;
11. }
/meta/consumers/group/standby
group是一個變量,以實際消費者分組名稱爲準,這個比較簡單,存儲的是一個數字,假設爲n,那麼意思就是該分組的全部消費者都從第n個slave獲取信息,禁止寫入。默認狀況下,該值爲空,除非master掛掉,或者人工修改。有個問題待定:一個topic分佈在多個broker上,每一個broker的slave數量可能不同,例如某個broker的slave數量1,可是n卻爲2。以此推測,這個配置多是基於一個約定,就是每一個broker的slave數量都是相同的。
/meta/consumers/group/offsets/topic
存儲一個分組對某個topic不一樣分區的消費位置,group和topic是變量,以實際值爲準,假如一個topic名稱
爲t1,部署在兩臺broker:b1,b2;每一個broker有兩個分區。則一共有4個分區:b1-1,b1-2,b2-1,b2-2。一個
消費者分組「hellogroup」消費了這個topic,b1-1,b1-2,b2-1,b2-2的消費位置分別是1,2,3,4;則有如下節點:
/meta/consumers/hellogroup/offsets/t1/b1-1 數據爲1
/meta/consumers/hellogroup/offsets/t1/b1-2 數據爲2
/meta/consumers/hellogroup/offsets/t1/b2-1 數據爲3
/meta/consumers/hellogroup/offsets/t1/b2-2 數據爲4
/meta/consumers/group/owners/topic
存儲一個分組內,某個topic不一樣分區被哪一個消費者消費了,group和topic是變量,以實際值爲準。假如一個topic名稱爲t1,部署在1臺broker:b1;b1有兩個分區。則分區id爲:b1-1,b1-2。一個分組「hellogroup
消費了這個topic,消費者id分別爲c1,c2;c1消費了b1-1,c2消費了b1-2,則有如下節點:
/meta/consumers/hellogroup/owners/t1/b1-1 數據爲c1
/meta/consumers/hellogroup/owners/t1/b1-2 數據爲c2
十:通訊框架
使用淘寶內部一個基於nio的通訊框架gecko,相似tbremoting。實現方式和api使用都是相似的。不一樣的是tbremoting默認基於mina實現,而gecko全都是本身實現的。與tbremoting同樣,gecko也是基於Handler機制,向上提供request/processor方式進行業務處理。有關mina的資料介紹很是多,有興趣可本身學習下,這裏不作深刻介紹。Gecko的hander定義和mina很像。
Java代碼
1. public interface Handler {
2. void onSessionCreated(Session session);
3. void onSessionStarted(Session session);
4. void onSessionClosed(Session session);
5. void onMessageReceived(Session session, Object msg);
6. void onMessageSent(Session session, Object msg);
7. void onExceptionCaught(Session session, Throwablethrowable);
8. void onSessionExpired(Session session);
9. void onSessionIdle(Session session);
10. void onSessionConnected(Session session, Object... args);
11. }
關注void onMessageReceived(Session session, Object msg);當服務端或客戶端收到消息後,就會觸發這個方法。Session爲當前網絡鏈接,msg爲收到的信息,網絡中傳輸二進制數據,相似mina,在過濾器鏈中,二進制數據與java對象之間會互相編碼解碼,不須要應用層關心。gecko包裝了handler,對外只提供request/processor處理方式,意思是對於不一樣類型請求用相應的處理器處理。事實上onMessageReceived方法收到的msg只有兩種對象:RequestCommand和ResponseCommand。分別表明了請求和響應。
Java代碼
1. void onMessageReceived(Session session, Object msg){
2. ……
3. if (message instanceofRequestCommand) {
4. this.processRequest(session, message, defaultConnection);
5. } else if (message instanceofResponseCommand) {
6. this.processResponse(message, defaultConnection);
7. }
8. ……
9. }
看看MetaMorphosisBroker的registerProcessors()就知道了。摘錄片斷以下:
Java代碼
1. this.remotingServer.registerProcessor(GetCommand.class, new GetProcessor(this.brokerProcessor,
2. this.executorsManager.getGetExecutor()));
3. this.remotingServer.registerProcessor(PutCommand.class, new PutProcessor(this.brokerProcessor,
4. this.executorsManager.getUnOrderedPutExecutor()));
5. this.remotingServer.registerProcessor(OffsetCommand.class, new OffsetProcessor(this.brokerProcessor,
6. this.executorsManager.getGetExecutor()));
如下是對應關係(不是所有的),實際上,不一樣的request都有對應的通信協議
GetCommand.class/GetProcessor;
PutCommand.class/PutProcessor;
OffsetCommand.class/OffsetProcessor
十一:通訊協議
Meta的協議是基於文本行的協議,相似memcached的文本協議。通用的協議格式以下
command params opaque\r\n body
其中command爲協議命令,params爲參數列表,而opaque爲協議的序列號,用於請求和應答的映射。客戶端發送協議的時候須要自增此序列號, 而服務端將拷貝來自客戶端的序列號並做爲應答的序列號返回,客戶端可根據應答的序列號將應答和請求對應起來。body爲協議體,可選,在協議頭裏須要有字 段指名body長度
Put命令
參數
topic partition value-length flag [transactionKey]
說明
發送消息協議,topic爲發送的消息主題,partition爲發送的目的分區,value-length爲發送的消息體長度,flag爲消息標識位,transactionKey爲事務標識符,可選。
示例
put meta-test 0 5 0 1\r\nhello
get命令
參數
topic group partition offset maxSize
說明
消費者拉取消息協議,topic爲拉取的消息主題,group爲消費者分組名稱,partition爲拉取的目的分區,offset爲拉取的起始偏移量,maxSize爲本次拉取的最大數據量大小
示例
get meta-test example 0 1024 512 1\r\n
data命令
參數
total-length
說明
get請求返回的應答,total-length返回的數據長度
示例
data 5 1\r\nhello
result命令
參數
code length
說明
通用應答協議,如返回請求結果。code爲應答狀態碼,採用與HTTP應答狀態碼同樣的語義。length爲協議體長度
示例
result 200 0 1\r\n
offset命令
參數
topic group partition offset
說明
查詢離某個offset的最近有效的offset,topic爲查詢的消息主題,group爲消費者分組名稱,partition爲查詢的分區,offset爲查詢的offset
示例
offset meta-test example 0 1024 1\r\n
stats命令
參數
item(可選)
說明
查詢服務器的統計狀況,item爲查詢的項目名稱,如realtime(實時統計),具體的某個topic等,能夠爲空
示例
stats 1\r\n
十二:異步複製
Meta的HA(High Availability)提供了在某些Broker出現故障時繼續工做而不影響消息服務的可用性;跟HA關係緊密的就是Failover,當故障 Server恢復時能從新加入Cluster處理請求,這個過程對消息服務的使用者是透明的。Meta基於Master/Slave實現HA,Slave 以做爲Master的訂閱者(consumer)來跟蹤消息記錄,當消息發送到Master時候,Slave會定時的獲取此消息記錄,並存儲在本身的 Store實現上;當Master出現故障沒法繼續使用了,消息還會在Slave上Backup的記錄。這種方式不影響原有的消息的記錄,一旦 master記錄成功,就返回成功,不用等待在slave上是否記錄;正因如此,slave和master還有稍微一點的時間差別,在Master出故障 那一瞬間,或許有最新產生的消息,就沒法同步到slave;另外Slave能夠做爲Consumer的服務提供者,意思就是若是要寫入必須經過 Master,消費時候能夠從Slave上直接獲取。
Failover機制採用client端方式,Master和Slave都須要註冊到ZK上,一旦Master沒法使用,客戶端可以使用與之對應的Slave;當Master的故障恢復時候,這時候有兩種方式處理:
1. 原來的master變成Slave,Slave變成Master;恢復故障的broker做爲slave去以前的Slave同步消息。優勢簡單,可是須要slave和Master有同樣的配置和處理能力,這樣就能取代Master的位置。(目前Meta採用此方式)
2. 須要自動把請求從新轉移回恢復的Master。實現複雜,須要再次把最新的消息從Slave複製會Master,在複製期間還要考慮處理最新的消息服務(Producer能夠暫存消息在本地,等複製成功後再和Broker交互)。
十三:分佈式事務
metaq提供分了布式事務的功能,提及分佈式事務,就不能不說起XA。X/Open 組織定義了分佈式事務處理模型。
1. X/Open DTP 模型包括
2. 應用程序( AP )
3. 事務管理器( TM )
4. 資源管理器( RM )
5. 通訊資源管理器( CRM )
通常,常見的資源管理器( RM )是數據庫,常見的通訊資源管理器( CRM )是消息中間件。
X/Open DTP 模型
二階段提交示意圖
XA與JTA的關係
XA是一個規範,JTA也是一個規範,其實這兩個規範是同樣的,只不過XA跟語言無關,而JTA是java版的規範,進一步細化了XA規範,定義了明確清晰的接口。
JTA的主要接口
UserTransaction 面向應用程序的接口,控制事務的開始、掛起、提交、回滾等
begin()
開始一個分佈式事務,(在後臺 TransactionManager 會建立一個 Transaction 事務對象並把此對象經過 ThreadLocale關聯到當前線程上 )
commit()
提交事務(在後臺 TransactionManager 會從當前線程下取出事務對象並把此對象所表明的事務提交)
rollback()
回滾事務(在後臺 TransactionManager 會從當前線程下取出事務對象並把此對象所表明的事務回滾)
ugetStatus()
返回關聯到當前線程的分佈式事務的狀態
usetRollbackOnly()
標識關聯到當前線程的分佈式事務將被回滾
Transaction
表明一個物理意義上的事務,UserTransaction 接口中的 commit()、rollback(),getStatus() 等方法都將最終委託給 Transaction 類的對應方法執行。
commit() 提交事務
rollback() 回滾事務
setRollbackOnly() 標識關聯到當前線程的分佈式事務將被回滾
getStatus() 返回關聯到當前線程的分佈式事務的狀態
enListResource(XAResource xaRes, int flag) 將事務資源加入到當前的事務中
udelistResourc(XAResource xaRes, int flag) 將事務資源從當前事務中刪除
uregisterSynchronization(Synchronization sync) 回調接口,在事務完成時獲得通知從而觸發一些處理工做。當事務成功提交後,回調程序將被激活。
TransactionManager
不承擔實際事務處理功能,是用戶接口和實現接口的橋樑。調用 UserTransaction.begin() 方法時 TransactionManager 會建立一個 Transaction 對象,並把此對象關聯到當前線程上;一樣 UserTransaction.commit() 會調用 TransactionManager.commit(),方法將從當前線程下取出事務對象 Transaction 並提交, 即調用 Transaction.commit()。
begin() 開始事務
commit() 提交事務
rollback() 回滾事務
getStatus() 返回當前事務狀態
setRollbackOnly()
getTransaction() 返回關聯到當前線程的事務
setTransactionTimeout(int seconds) 設置事務超時時間
resume(Transaction tobj) 繼續當前線程關聯的事務
suspend() 掛起當前線程關聯的事務
XAResource
這是一個很是重要的接口,是對底層事務資源的抽象,定義了分佈式事務處理過程當中事務管理器和資源管理器之間的協議。
commit() 提交事務
isSameRM(XAResource xares) 檢查當前的 XAResource 與參數是否同一事務資源
prepare() 通知資源管理器準備事務的提交工做
rollback() 通知資源管理器回滾事務
消息提交和回滾
咱們熟悉了前面的一些概念,分佈式事務模型中有幾個角色。metaq和數據庫同樣實際上是一個RM,不過它沒有遵照JMS的分佈式事務標準,它對外呈現的就是一個XAResource。能夠粗略的講,只有數據可能會發生修改,才須要事務來保證數據的完整性,若是隻是讀取數據,則不須要事務,由於事務須要成本(數據庫讀取數據也會有事務的,這個緣由有不少方面,好比事務的隔離和MVCC )。因此,metaq的事務主要發生在生產者,一個典型的場景示例以下:
· 應用程序向數據庫寫入一條記錄
· 而後向metaq寫入一條消息
· 而後再向數據庫寫入一條日誌
· 若是日誌寫入失敗,則前面步驟所有回滾
· 若是日誌寫入成功,則前面步驟所有提交
若是metaq調用處於分佈式事務,則調用方式以下
Java代碼
1. XAMessageSessionFactory xaSF= new XAMetaMessageSessionFactory(new MetaClientConfig());
2. XAMessageProducer xaProducer=xaSF.createXAProducer();
3. XAResource metaXares = producer.getXAResource();
4. /**
5. *加入JTA事務 該接口最終會調用XAResource.start方法,即metaXares.start(Xid,int)方法,
6. *把該資源加入當前事務當中,發送一個帶XID的事務命定,通知Metaq啓動一個全局事務
7. *分支,用XID標示該全局事務。
8. */
9. tx.enlistResource(metaXares);
10. //事務中的業務操做 向meta server發送一條消息
11. String message="hello world!";
12. String topic="meta-test";
13. producer.sendMessage(new Message(topic, messate.getBytes());
看看兩階段提交和XAResouce,XAMessageProducer的getXAResource()方法可獲得一個TransactionContext對象,實現了XAResource接口。經過UserTransaction. enListResource(XAResource xaRes, intflag)方法將當前XAResource加入到分佈式事務裏時,XAResource的start方法將被調用。Start方法向metaq的broker發送一個事務開始的命令,表示後續的操做都在分佈式服務裏,這些操做要暫存是事務文件裏,不能直接寫到消息隊列裏。ransactionContext有prepare()和commit()方法,這兩個方法對應着分佈式事務提交的兩個階段。prepare階段,metaq只是將生產者發送的消息暫存在本地的事務日誌裏,其實就是一個文件,commit階段纔會從事務暫存文件裏提取消息,寫入到消息隊列。