本文記錄了月前筆者參與阿里雲中間件比賽中,實現的簡要具備持久化功能的消息隊列的設計與實現過程。須要聲明的是,LocalMQ 借鑑了 RocketMQ 在 Broker 部分的核心設計思想,最先的源碼也是基於 RocketMQ 源碼改造而來。本文涉及引用以及其餘消息隊列相關資料參考這裏,源代碼放於 LocalMQ 倉庫;另外筆者水平有限,後來由於畢業旅行也未繼續優化,本文不少內容可能存在謬誤與不足,請批評指正。java
所謂消息隊列,直觀來看有點像蓄水池,可以在生產者與消費者之間完成解耦,而且平衡生產者與消費者之間的計算量與可計算時間之間的差別;目前主流的消息隊列有著名的 Kafka、RabbitMQ、RocketMQ 等等。在筆者實現的 LocalMQ 中,從簡到復依次實現了 MemoryMessageMQ、EmbeddedMessageQueue 與 LocalMessageQueue 這三個版本;須要說明的是,在三個版本的消息隊列中,都是採起所謂的拉模式,即消費者主動向消息隊列請求拉取消息的模式。在 wx.demo.* 包下提供了不少的內部功能與性能測試用例,數組
// 首先在這裏:https://parg.co/beX 下載代碼 // 而後修改 DefaultProducer 對應的繼承類 // 測試 MemoryMessageQueue,則繼承 MemoryProducer; // 測試 EmbeddedMessageQueue,則繼承 EmbeddedProducer; // 默認測試 LocalMessageQueue,注意,須要對 DefaultPullConsumer 進行一樣修改 public class DefaultProducer extends LocalProducer // 使用 mvn 運行測試用例,也能夠在 Eclipse 或者 Intellij 中打開 mvn clean package -U assembly:assembly -Dmaven.test.skip=true java -Xmx2048m -Xms2048m -cp open-messaging-wx.demo-1.0.jar wx.demo.benchmark.ProducerBenchmark
最簡單的 MemoryMessageQueue 便是將消息數據按照選定主題存放在內存中,其主要結構以下圖所示:緩存
MemoryMessageQueue 提供了同步的消息提交與拉取操做,其利用 HashMap 堆上存儲來緩存全部的消息;而且在內存中維護了另外一個所謂的 QueueOffsets 來記錄每一個主題對應隊列的消費偏移量。相較於 MemoryMessageQueue 實現的簡單的不能進行持久化存儲的消息隊列,EmbeddedMessageQueue 則提供了稍微複雜點的支持磁盤持久化的消息隊列。EmbeddedMessageQueue 構建了基於 Java NIO 提供的 MappedByteBuffer 的 MappedPartitionQueue。每一個 MappedPartitionQueue 對應磁盤上的多個物理文件,而且爲上層應用抽象提供了邏輯上的單一文件。EmbeddedMessageQueue 結構以下圖所示:性能優化
EmbeddedMessageQueue 的主要流程爲生產者同步地像 Bucket Queue 中提交消息,每一個 Bucket 能夠視做某個主題(Topic)或者隊列(Queue)。而 EmbeddedMessageQueue 還包含着負責按期將 MappedPartitionQueue 中數據持久化寫入到磁盤的異步線程,該線程會按期地完成 Flush 操做。EmbeddedMessageQueue 假設某個 BucketQueue 被分配給某個 Consumer 以後就被其佔用,該 Consumer 會消費其中所有的緩存消息;每一個 Consumer 會包含獨立地 Consumer Offset Table 來記錄當前某個隊列地消費狀況。EmbeddedMessageQueue 的缺陷在於:服務器
混合處理與標記位:EmbeddedMessageQueue 僅提供了最簡單的消息序列化模型,沒法記錄額外的消息屬性;網絡
持久化存儲到磁盤的時機:EmbeddedMessageQueue 僅使用了一級緩存,而且僅在某個 Partition 寫滿時才進行文件的持久化操做;數據結構
添加消息的後處理:EmbeddedMessageQueue 是將消息直接寫入到 BucketQueue 包含的 MappedPartitionQueue 中,沒法動態地進行索引、篩選等消息後處理,其可擴展性較差。併發
未考慮斷續拉取的狀況:EmbeddedMessageQueue 中是假設 Consumer 可以單次處理完某個 BucketQueue 中的單個 Partition 的所有消息,所以記錄其處理值時也僅是記錄了文件級別的位移,若是存在某次是僅拉取了單個 Partition 中部份內容,則下次的起始拉取點仍是下個文件首。app
EmbeddedMessageQueue 中咱們能夠在各 Producer 線程中單獨將消息持久化入文件中,而在 LocalMessageQueue 中,咱們是將消息統一寫入 MessageStore 中,而後又 PostPutMessageService 進行二次處理。 LocalMessageQueue 的結構以下所示:dom
LocalMessageQueue 最大的變化在於將消息統一存儲在獨立地 MessageStore 中(相似於 RocketMQ 中的 CommitLog),而後針對 Topic-queueId 將消息劃分到不一樣的 ConsumeQueue 中;這裏的 queueId 是由對應的 Producer 專屬編號決定的,每一個 Consumer 即會被分配佔用某個 ConsumeQueue(相似於 RocketMQ 中的 consumequeue),從而保證某個 Producer 生產的某個主題下的消息被專注的 Consumer 消費。LocalMessageQueue 一樣使用 MappedPartitionQueue 提供底層文件系統抽象,而且構建了獨立的 ConsumerOffsetManager 對消費者的消費進度進行管理,從而方便異常恢復。
本部分圖來源於分佈式開放消息系統(RocketMQ)的原理與實踐
消息產品的一個重要特性是順序保證,也就是消息消費的順序要與發送的時間順序保持一致;在多發送端的狀況下,保證全局順序代價比較大,只要求各個發送端的順序有保障便可; 舉個例子 P1 發送 M11, M12, M13,P2 發送 M21, M22, M23,在消費的時候,只要求保證 M11, M12, M13(M21,M22,M23)的順序,也就是說,實際消費順序爲: M11, M21, M12, M13, M22, M23 正確; M11, M21, M22, M12, M13, M23 正確 M11, M13, M21, M22, M23, M12 錯誤,M12 與 M13 的順序顛倒了;假如生產者產生了 2 條消息:M一、M2,要保證這兩條消息的順序,最直觀的方式就是採起相似於 TCP 中的確認消息:
不過該模型中若是 M1 與 M2 分別被髮送到了兩臺不一樣的消息服務器上,咱們沒法控制消息服務器發送 M1 與 M2 的前後時機;有可能 M2 已經被髮送到了消費者,M1 才被髮送到了消息服務器上。針對這個問題改進版的思路便是將 M1 與 M2 發送到單一消息服務器中,而後根據先到達先消費的原則發送給對應的消費者:
不過在實際狀況下每每由於網絡延遲或其餘問題致使在 M1 發送耗時大於 M2 的狀況下,M2 會先於 M1 被消費。所以若是咱們要保證嚴格的順序消息,那麼必需要保證生產者、消息服務器與消費者之間的一對一對應關係。在 LocalMQ 的實現中,咱們首先會將消息按照生產者劃分到惟一的 Topic-queueId 隊列中;而且保證同一時刻該消費隊列只會被某個消費者獨佔。若是某個消費者在消費完該隊列以前意外中斷,那麼在保留窗口期內不會將該隊列從新分配;在窗口期以外則將該隊列分配給新的消費者,而且即便原有消費者恢復工做也沒法繼續拉取該隊列中包含的消息。
LocalMQ 中目前是實現了基於文件系統的持久化存儲,主要功能實如今 MappedPartition 與 MappedPartitionQueue 這兩個類中,筆者也會在下文中詳細介紹這兩個類的實現。本部分咱們討論下數據存儲的文件格式,對於 LocalMessageQueue 而言,其文件存儲以下:
* messageStore * -- MapFile1 * -- MapFile2 * consumeQueue * -- Topic1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2 * -- Queue1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2
LocalMessageQueue 中採用了消息統一存儲的方案,所以全部的消息實際內容會被存放在 messageStore 目錄下。而 consumeQueue 中則存放了消息的索引,即在 messageStore 中的偏移地址。LocalMQ 中使用 MappedPartitionQueue 來管理某個邏輯上單一的文件,而根據不一樣的單文件大小限制會自動將其切割爲多個物理上獨立的 Mapped File。每一個 MappedPartition 使用 offset,即該文件首地址的全局偏移量命名;而使用 pos / position 統一表示單文件中局部偏移量,使用 index 表示某個文件在其文件夾中的下標。
在編寫的過程當中,筆者發現對於執行流的優化、避免重複計算與額外變量、選擇使用合適的併發策略都會對結果形成極大的影響,譬如筆者從 SpinLock 切換到重入鎖以後,本地測試 TPS 增長了約 5%。另外筆者也統計了消費者工做中不一樣階段的時間佔比,其中構建(包括消息屬性的序列化)與發送操做(寫入到 MappedFileQueue 中,未使用二級緩存)都是同步進行,兩者的時間佔比也是最多。
[2017-06-01 12:13:21,802] INFO: 構建耗時佔比:0.471270,發送耗時佔比:0.428567,持久化耗時佔比:0.100163 [2017-06-01 12:25:31,275] INFO: 構建耗時佔比:0.275170,發送耗時佔比:0.573520,持久化耗時佔比:0.151309
筆者在實現 LocalMQ 的過程當中感觸最深的就是實現相同功能的不一樣代碼在性能上的差別可能會很大。在實現過程當中應該避免冗餘變量聲明與建立、避免額外空間申請與垃圾回收、避免冗餘的執行過程;另外儘量選用合適的數據結構,譬如筆者在部分實現中從 ArrayList 遷移到了 LinkedList,從 ConcurrentHashMap 遷移到了 HashMap,都帶來了必定的評測指標提高。
異步 IO,順序 Flush;筆者發現,若是多個線程進行併發 Flush 操做,反而不如單線程進行順序 Flush。
儘可能減小鎖控制的範圍。
併發計算優化,將全部的耗時計算放到能夠併發的 Producer 中。
使用合理的鎖,重入鎖相較於自旋鎖有近 5 倍的 TPS 提高。
源代碼參考這裏
MemoryMessageQueue 是最簡易的實現,不過其代碼可以反映出某個消息隊列的基本流程,首先在生產者咱們須要建立消息而且發送給消息隊列:
// 建立消息 BytesMessage message = messageFactory.createBytesMessageToTopic(topic, body); // 發送消息 messageQueue.putMessage(topic, message);
在 putMessage
函數中則將消息存入內存存儲中:
// 存放全部消息 private Map<String, ArrayList<Message>> messageBuckets = new HashMap<>(); // 添加消息 public synchronized PutMessageResult putMessage(String bucket, Message message) { if (!messageBuckets.containsKey(bucket)) { messageBuckets.put(bucket, new ArrayList<>(1024)); } ArrayList<Message> bucketList = messageBuckets.get(bucket); bucketList.add(message); return new PutMessageResult(PutMessageStatus.PUT_OK, null); }
而 Consumer 則根據指定的 Bucket 與 queueId 來拉取消息,若是存在多個 Bucket 須要拉取則進行輪詢:
//use Round Robin int checkNum = 0; while (++checkNum <= bucketList.size()) { String bucket = bucketList.get((++lastIndex) % (bucketList.size())); Message message = messageQueue.pullMessage(queue, bucket); if (message != null) { return message; } }
而 MemoryMessageQueue 的 pullMessage
函數則首先判斷目標 Bucket 是否存在,而且根據內置的 queueOffset 中記錄的拉取偏移量來判斷是否拉取完畢。若沒有拉取完畢則返回消息而且更新本地偏移量;
private Map<String, HashMap<String, Integer>> queueOffsets = new HashMap<>(); ... public synchronized Message pullMessage(String queue, String bucket) { ... ArrayList<Message> bucketList = messageBuckets.get(bucket); if (bucketList == null) { return null; } HashMap<String, Integer> offsetMap = queueOffsets.get(queue); if (offsetMap == null) { offsetMap = new HashMap<>(); queueOffsets.put(queue, offsetMap); } int offset = offsetMap.getOrDefault(bucket, 0); if (offset >= bucketList.size()) { return null; } Message message = bucketList.get(offset); offsetMap.put(bucket, ++offset); ... }
源代碼參考這裏
EmbeddedMessageQueue 中引入了消息持久化支持,本部分咱們也主要討論消息序列化與底層的 MappedPartitionQueue 實現。
EmbeddedMessageQueue 中定義的消息格式以下:
序號 | 消息存儲結構 | 備註 | 長度(字節數) |
---|---|---|---|
1 | TOTALSIZE | 消息大小 | 4 |
2 | MAGICCODE | 消息的 MAGIC CODE | 4 |
3 | BODY | 前 4 個字節存放消息體大小值,後 bodyLength 大小的空間存儲消息體內容 | 4 + bodyLength |
4 | headers* | 前 2 個字節(short)存放頭部大小,後存放 headersLength 大小的頭部數據 | 2 + headersLength |
5 | properties* | 前 2 個字節(short)存放屬性值大小,後存放 propertiesLength 大小的屬性數據 | 2 + propertiesLength |
EmbeddedMessageSerializer 是繼承自 MessageSerializer 的主要負責消息持久化的類,其提供了消息長度的計算函數:
/** * Description 計算某個消息的長度,注意,headersByteArray 與 propertiesByteArray 在發送消息時完成轉換 * @param message * @param headersByteArray * @param propertiesByteArray * @return */ public static int calMsgLength(DefaultBytesMessage message, byte[] headersByteArray, byte[] propertiesByteArray) { // 消息體 byte[] body = message.getBody(); int bodyLength = body == null ? 0 : body.length; // 計算頭部長度 short headersLength = (short) headersByteArray.length; // 計算屬性長度 short propertiesLength = (short) propertiesByteArray.length; // 計算消息體總長度 return calMsgLength(bodyLength, headersLength, propertiesLength); }
而 EmbeddedMessageEncoder 的 encode 函數負責具體的消息序列化操做:
/** * Description 執行消息的編碼操做 * @param message 消息對象 * @param msgStoreItemMemory 內部緩存句柄 * @param msgLen 計算的消息長度 * @param headersByteArray 消息頭字節序列 * @param propertiesByteArray 消息屬性字節序列 */ public static final void encode( DefaultBytesMessage message, final ByteBuffer msgStoreItemMemory, int msgLen, byte[] headersByteArray, byte[] propertiesByteArray ) { // 消息體 byte[] body = message.getBody(); int bodyLength = body == null ? 0 : body.length; // 計算頭部長度 short headersLength = (short) headersByteArray.length; // 計算屬性長度 short propertiesLength = (short) propertiesByteArray.length; // 初始化存儲空間 resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE msgStoreItemMemory.putInt(MESSAGE_MAGIC_CODE); // 3 BODY msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) msgStoreItemMemory.put(message.getBody()); // 4 HEADERS msgStoreItemMemory.putShort((short) headersLength); if (headersLength > 0) msgStoreItemMemory.put(headersByteArray); // 5 PROPERTIES msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) msgStoreItemMemory.put(propertiesByteArray); }
對應的反序列化操做則是由 EmbeddedMessageDecoder 完成,其主要從某個 ByteBuffer 中讀取數據:
/** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public static DefaultBytesMessage readMessageFromByteBuffer(ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default: // log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; // 3 BODY int bodyLen = byteBuffer.getInt(); byte[] body = new byte[bodyLen]; if (bodyLen > 0) { // 讀取而且校驗消息體內容 byteBuffer.get(body, 0, bodyLen); } // 4 HEADERS short headersLength = byteBuffer.getShort(); KeyValue headers = null; if (headersLength > 0) { byteBuffer.get(bytesContent, 0, headersLength); String headersStr = new String(bytesContent, 0, headersLength, EmbeddedMessageDecoder.CHARSET_UTF8); headers = string2KeyValue(headersStr); } // 5 PROPERTIES // 獲取 properties 尺寸 short propertiesLength = byteBuffer.getShort(); KeyValue properties = null; if (propertiesLength > 0) { byteBuffer.get(bytesContent, 0, propertiesLength); String propertiesStr = new String(bytesContent, 0, propertiesLength, EmbeddedMessageDecoder.CHARSET_UTF8); properties = string2KeyValue(propertiesStr); } // 返回讀取到的消息 return new DefaultBytesMessage( totalSize, headers, properties, body ); }
EmbeddedMessageQueue 中消息的寫入其實是由 BucketQueue 的 putMessage/putMessages 函數完成的,這裏的某個 BucketQueue 就對應着 Topic-queueId 這個惟一的標識。這裏以批量寫入消息爲例,首先咱們從 BucketQueue 包含的 MappedPartitionQueue 中獲取到最新可用的某個 MappedPartition:
mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0);
而後調用 MappedPartition 的 appendMessages 方法,該方法會在下文介紹;這裏則是要討論添加消息的幾種結果對應的處理。若是添加成功,則直接返回成功;若是該 MappedPartition 剩餘空間不足以寫入消息隊列中的某條消息,則須要調用 MappedPartitionQueue 建立新的 MappedPartition,而且從新計算待寫入的消息序列:
... // 調用對應的 MappedPartition 追加消息 // 注意,這裏通過填充以後,會逆向地將消息在 MessageStore 中的偏移與 QueueOffset 中偏移添加進去 result = mappedPartition.appendMessages(messages, this.appendMessageCallback); // 根據追加結果進行不一樣的操做 switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: this.messageQueue.getFlushAndUnmapPartitionService().putPartition(mappedPartition); // 若是已經到了文件最後,則建立新文件 mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0); if (null == mappedPartition) { // XXX: warn and notify me log.warning("建立 MappedPartition 錯誤, topic: " + messages.get(0).getTopicOrQueueName()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } // 不然從新進行添加操做 // 從結果中獲取處理完畢的消息數 int appendedMessageNum = result.getAppendedMessageNum(); // 建立臨時的 LeftMessages ArrayList<DefaultBytesMessage> leftMessages = new ArrayList<>(); // 添加全部未消費的消息 for (int i = appendedMessageNum; i < messages.size(); i++) { leftMessages.add(messages.get(i)); } result = mappedPartition.appendMessages(leftMessages, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } ...
某個 MappedPartition 映射物理上的單個文件,其初始化時以下傳入文件名與文件尺寸屬性:
/** * Description 初始化某個內存映射文件 * * @param fileName 文件名 * @param fileSize 文件尺寸 * @throws IOException 打開文件出現異常 */ private void init(final String fileName, final int fileSize) throws IOException { ... // 從文件名中獲取到當前文件的全局偏移量 this.fileFromOffset = Long.parseLong(this.file.getName()); ... // 嘗試打開文件 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 將文件映射到內存中 this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize); }
初始化階段即打開文件映射,然後在寫入消息或者其餘內容時,其會調用傳入的消息編碼回調(便是咱們上文中介紹的消息序列化的包裹對象)將對象編碼爲字節流而且寫入:
public AppendMessageResult appendMessage(final DefaultBytesMessage message, final AppendMessageCallback cb) { ... // 獲取當前的寫入位置 int currentPos = this.wrotePosition.get(); // 若是當前仍是可寫的 if (currentPos < this.fileSize) { // 獲取到實際的寫入句柄 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // 調整當前寫入位置 byteBuffer.position(currentPos); // 記錄信息 AppendMessageResult result = null; // 調用回調函數中的實際寫入操做 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, message); this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } ... }
MappedPartitionQueue 用來管理多個物理上的映射文件,其構造函數以下:
// 存放全部的映射文件 private final CopyOnWriteArrayList<MappedPartition> mappedPartitions = new CopyOnWriteArrayList<MappedPartition>(); ... /** * Description 默認構造函數 * * @param storePath 傳入的存儲文件目錄,有可能傳入 MessageStore 目錄或者 ConsumeQueue 目錄 * @param mappedFileSize * @param allocateMappedPartitionService */ public MappedPartitionQueue(final String storePath, int mappedFileSize, AllocateMappedPartitionService allocateMappedPartitionService) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.allocateMappedPartitionService = allocateMappedPartitionService; }{}
這裏以 load 函數爲例說明其加載過程:
/** * Description 加載內存映射文件序列 * * @return */ public boolean load() { // 讀取存儲路徑 File dir = new File(this.storePath); // 列舉目錄下全部文件 File[] files = dir.listFiles(); // 若是文件不爲空,則表示有必要加載 if (files != null) { // 重排序 Arrays.sort(files); // 遍歷全部的文件 for (File file : files) { // 若是碰到某個文件還沒有填滿,則返回加載完畢 if (file.length() != this.mappedFileSize) { log.warning(file + "\t" + file.length() + " length not matched message store config value, ignore it"); return true; } // 不然加載文件 try { // 實際讀取文件 MappedPartition mappedPartition = new MappedPartition(file.getPath(), mappedFileSize); // 設置當前文件指針到文件尾 mappedPartition.setWrotePosition(this.mappedFileSize); mappedPartition.setFlushedPosition(this.mappedFileSize); // 將文件放置到 MappedFiles 數組中 this.mappedPartitions.add(mappedPartition); // log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.warning("load file " + file + " error"); return false; } } } return true; }
處於性能的考慮,MappedPartitionQueue 還會提早建立文件,在 getLastMappedFileOrCreate 函數中,當 allocateMappedPartitionService 存在的狀況下則會調用該異步服務預建立文件:
/** * Description 根據起始偏移量查找最後一個文件 * * @param startOffset * @return */ public MappedPartition getLastMappedFileOrCreate(final long startOffset) { ... // 若是有必要建立文件 if (createOffset != -1) { // 獲取到下一個文件的路徑與文件名 String nextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset); // 以及下下個文件的路徑與文件名 String nextNextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset + this.mappedFileSize); // 指向待建立的映射文件句柄 MappedPartition mappedPartition = null; // 判斷是否存在建立映射文件的服務 if (this.allocateMappedPartitionService != null) { // 使用服務建立 mappedPartition = this.allocateMappedPartitionService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); // 進行預熱處理 } else { // 不然直接建立 try { mappedPartition = new MappedPartition(nextFilePath, this.mappedFileSize); } catch (IOException e) { log.warning("create mappedPartition exception"); } } ... return mappedPartition; } return mappedPartitionLast; }
這裏的 AllocateMappedPartitionService 則會不間斷地執行建立文件的請求:
@Override public void run() { ... // 循環執行文件分配請求 while (!this.isStopped() && this.mmapOperation()) {} ... } /** * Description 循環執行映射文件預分配 * * @Exception Only interrupted by the external thread, will return false */ private boolean mmapOperation() { ... // 執行操做 try { // 取出最新的執行對象 req = this.requestQueue.take(); // 取得待執行對象在請求表中的實例 AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); ... // 判斷是否已經存在建立好的對象 if (req.getMappedPartition() == null) { // 記錄起始建立時間 long beginTime = System.currentTimeMillis(); // 構建內存映射文件對象 MappedPartition mappedPartition = new MappedPartition(req.getFilePath(), req.getFileSize()); ... // 進行文件預熱,僅預熱 MessageStore if (mappedPartition.getFileSize() >= mapedFileSizeCommitLog && isWarmMappedFileEnable) { mappedPartition.warmMappedFile(); } // 將建立好的對象回寫到請求中 req.setMappedPartition(mappedPartition); // 異常設置爲 false this.hasException = false; // 成功設置爲 true isSuccess = true; } ... }
EmbeddedMessageQueue 中還包含了某個 flushAndUnmapPartitionServices 用於異步 Flush 文件而且完成不用映射文件的關閉操做。該服務的核心代碼以下:
private final ConcurrentLinkedQueue<MappedPartition> mappedPartitions = new ConcurrentLinkedQueue<>(); ... @Override public void run() { while (!this.isStopped()) { int interval = 100; try { if (this.mappedPartitions.size() > 0) { long startTime = now(); // 取出待處理的 MappedPartition MappedPartition mappedPartition = this.mappedPartitions.poll(); // 將當前內容寫入到磁盤 mappedPartition.flush(0); // 釋放當前不須要使用的空間 mappedPartition.cleanup(); long past = now() - startTime; // EmbeddedProducer.flushEclipseTime.addAndGet(past); if (past > 500) { log.info("Flush data to disk and unmap MappedPartition costs " + past + " ms:" + mappedPartition.getFileName()); } } else { // 定時進行 Flush 操做 this.waitForRunning(interval); } } catch (Throwable e) { log.warning(this.getServiceName() + " service has exception. "); } } }
這裏的 mappedPartitions 便是在上文介紹的當添加消息且返回爲 END_OF_FILE 時候添加進來的。
源代碼參考這裏
LocalMessageQueue 中採用了中心化的消息存儲方案,其提供的 putMessage / putMessages 函數實際上會調用內置 MessageStore 對象的消息寫入函數:
// 使用 MessageStore 進行提交 PutMessageResult result = this.messageStore.putMessage(message);
而 MessageStore 便是存放全部真實消息的中心存儲,LocalMessageQueue 中支持更爲複雜的消息屬性:
序號 | 消息存儲結構 | 備註 | 長度(字節數) |
---|---|---|---|
1 | TOTALSIZE | 消息大小 | 4 |
2 | MAGICCODE | 消息的 MAGIC CODE | 4 |
3 | BODYCRC | 消息體 BODY CRC,用於重啓時校驗 | 4 |
4 | QUEUEID | 隊列編號,queueID | 4 |
5 | QUEUEOFFSET | 自增值,不是真正的 consume queue 的偏移量,能夠表明這個隊列中消息的個數,要經過這個值查找到 consume queue 中數據,QUEUEOFFSET * 12 纔是偏移地址 | 8 |
6 | PHYSICALOFFSET | 消息在 commitLog 中的物理起始地址偏移量 | 8 |
7 | STORETIMESTAMP | 存儲時間戳 | 8 |
8 | BODY | 前 4 個字節存放消息體大小值,後 bodyLength 大小的空間存儲消息體內容 | 4 + bodyLength |
9 | TOPICORQUEUENAME | 前 1 個字節存放 Topic 大小,後存放 topicOrQueueNameLength 大小的主題名 | 1 + topicOrQueueNameLength |
10 | headers* | 前 2 個字節(short)存放頭部大小,後存放 headersLength 大小的頭部數據 | 2 + headersLength |
11 | properties* | 前 2 個字節(short)存放屬性值大小,後存放 propertiesLength 大小的屬性數據 | 2 + propertiesLength |
其構造函數中初始化建立的 MappedPartitionQueue 是按照固定大小(默認單文件 1G)的映射文件組:
// 構造映射文件類 this.mappedPartitionQueue = new MappedPartitionQueue( ((LocalMessageQueueConfig) this.messageStore.getMessageQueueConfig()).getStorePathCommitLog(), mapedFileSizeCommitLog, messageStore.getAllocateMappedPartitionService(), this.flushMessageStoreService );
不一樣於 EmbeddedMessageQueue,LocalMessageQueue 並無在初次提交消息時就直接寫入按照 Topic-queueId 劃分的存儲內;而是依賴於內置的 PostPutMessageService :
/** * Description 執行消息後操做 */ private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { ... // 讀取當前的消息 SelectMappedBufferResult result = this.messageStore.getMessageStore().getData(reputFromOffset); // 若是消息不存在,則中止當前操做 if (result == null) { doNext = false; continue; } try { // 獲取當前消息的起始位置 this.reputFromOffset = result.getStartOffset(); // 順序讀取全部消息 for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 讀取當前位置的消息 PostPutMessageRequest postPutMessageRequest = checkMessageAndReturnSize(result.getByteBuffer()); int size = postPutMessageRequest.getMsgSize(); readSpendTime.addAndGet(now() - startTime); startTime = now(); // 若是處理成功 if (postPutMessageRequest.isSuccess()) { if (size > 0) { // 執行消息寫入到 ConsumeQueue 的操做 this.messageStore.putMessagePositionInfo(postPutMessageRequest); // 修正當前讀取的位置 this.reputFromOffset += size; readSize += size; } else if (size == 0) { this.reputFromOffset = this.messageStore.getMessageStore().rollNextFile(this.reputFromOffset); readSize = result.getSize(); } putSpendTime.addAndGet(now() - startTime); } else if (!postPutMessageRequest.isSuccess()) { ... } } } finally { result.release(); } } }
而在 putMessagePositionInfo 函數中即進行實際的 ConsumeQueue 建立:
/** * Description 將消息的位置放置到 ConsumeQueue 中 * * @param postPutMessageRequest */ public void putMessagePositionInfo(PostPutMessageRequest postPutMessageRequest) { // 尋找或者建立 ConsumeQueue ConsumeQueue cq = this.findConsumeQueue(postPutMessageRequest.getTopic(), postPutMessageRequest.getQueueId()); // 將消息放置到 ConsumeQueue 中合適的位置 cq.putMessagePositionInfoWrapper(postPutMessageRequest.getCommitLogOffset(), postPutMessageRequest.getMsgSize(), postPutMessageRequest.getConsumeQueueOffset()); } /** * Description 根據主題與 QueueId 查找 ConsumeQueue,若是不存在則建立 * * @param topic * @param queueId * @return */ public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); ... // 判斷該主題下是否存在 queueId,不存在則建立 ConsumeQueue logic = map.get(queueId); // 若是獲取爲空,則建立新的 ConsumeQueue if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue(// topic, // 主題 queueId, // queueId LocalMessageQueueConfig.mapedFileSizeConsumeQueue, // 映射文件尺寸 this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); ... } return logic; }
而在 ConsumeQueue 的構造函數中完成實際的文件映射與讀取:
/** * Description 主要構造函數 * * @param topic * @param queueId * @param mappedFileSize * @param localMessageStore */ public ConsumeQueue( final String topic, final int queueId, final int mappedFileSize, final LocalMessageQueue localMessageStore) { ... // 當前隊列的路徑 String queueDir = this.storePath + File.separator + topic + File.separator + queueId; // 初始化內存映射隊列 this.mappedPartitionQueue = new MappedPartitionQueue(queueDir, mappedFileSize, null); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); }
ConsumeQueue 的文件格式則相對簡單:
// ConsumeQueue 文件內存放的單條 Message 尺寸 // 1 | MessageStore Offset | int 8 Byte // 2 | Size | short 8 Byte
在 LocalPullConsumer 拉取消息時,設置的批量拉取機制;即一次性從 LocalMessageQueue 拉取多條消息到本地,而後再批次返回給本地進行處理(假設處理也有必定耗時)。在批次拉取的函數中,咱們首先須要獲取當前 Consumer 處理的主題與隊列編號對應的 ConsumeQueue 是否包含數據,而後再申請具體的讀取句柄而且佔用該隊列:
/** * Description 批量抓取消息,注意,這裏只進行預抓取,僅當消費者真正獲取後纔會修正讀取偏移量 */ private void batchPoll() { // 若是是 LocalMessageQueue // 執行預抓取 LocalMessageQueue localMessageStore = (LocalMessageQueue) this.messageQueue; // 獲取當前待抓取的桶名 String bucket = bucketList.get((lastIndex) % (bucketList.size())); // 首先獲取待抓取的隊列和偏移 long offsetInQueue = localMessageStore.getConsumerScheduler().queryOffsetAndLock("127.0.0.1:" + this.refId, bucket, this.getQueueId()); // 若是當前待抓取的 queueId 已經被佔用,則直接切換到下一個主題 if (offsetInQueue == -2) { // 將當前主題設置爲 true this.isFinishedTable.put(bucket, true); // 重置當前的 LastIndex 或者 RefOffset,即 queueId this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 獲取到了有效的隊列偏移量以後,開始嘗試獲取消息 consumerOffsetTable.put(bucket, new AtomicLong(offsetInQueue)); // 設置每次最多抓一個文件內包含的消息數,等價於變相的一次性讀完,注意,這裏的數目還受到單個文件尺寸的限制 GetMessageResult getMessageResult = localMessageStore.getMessage(bucket, this.getQueueId(), this.consumerOffsetTable.get(bucket).get() + 1, mapedFileSizeConsumeQueue / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 若是沒有找到數據,則切換到下一個 if (getMessageResult.getStatus() != GetMessageStatus.FOUND) { // 將當前主題設置爲 true this.isFinishedTable.put(bucket, true); this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 這裏不考慮 Consumer 被惡意幹掉的狀況,所以直接更新遠端的 Offset 值 localMessageStore.getConsumerScheduler().updateOffset("127.0.0.1:" + this.refId, bucket, this.getQueueId(), consumerOffsetTable.get(bucket).addAndGet(getMessageResult.getMessageCount())); // 首先從文件系統中一次性讀出全部的消息 ArrayList<DefaultBytesMessage> messages = readMessagesFromGetMessageResult(getMessageResult); // 將消息添加到隊列中 this.messages.addAll(messages); // 本次抓取成功後纔開始抓取下一個 lastIndex++; } } }
ConsumerScheduler 爲咱們提供了核心的消費者調度功能,其內置的 ConsumerOffsetManager 包含了兩個核心存儲:
// 存放映射到內存中 private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/*queueId*/, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); // 存放某個 Topic 下面的某個 Queue 被某個 Consumer 佔用的信息 private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/*queueId*/, String/*refId*/>> queueIdOccupiedByConsumerTable = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>>(512);
分別對應了某個 ConsumeQueue 被消費的進度和被消費者的佔用信息。同時 ConsumerOffsetManager 還提供了基於 JSON 格式的持久化功能,而且經過 ConsumerScheduler 中的按期服務 scheduledExecutorService 進行自動按期持久化。在消息提交階段,LocalMessageQueue 會自動調用 updateOffset 函數更初始化某個 ConsumeQueue 的偏移狀況(在恢復時也會使用):
public void updateOffset(final String topic, final int queueId, final long offset) { this.consumerOffsetManager.commitOffset("Broker Inner", topic, queueId, offset); }
而某個 Consumer 在初次拉取時,會調用 queryOffsetAndLock 函數來查詢某個 ConsumeQueue 的可拉取狀況:
/** * Description 修正某個 ConsumerOffset 隊列中的值 * * @param topic * @param queueId * @return */ public long queryOffsetAndLock(final String clientHostAndPort, final String topic, final int queueId) { String key = topic; // 首先判斷該 Topic-queueId 是否被佔用 if (this.queueIdOccupiedByConsumerTable.containsKey(topic)) { ... } // 若是沒有被佔用,則此時宣告佔用 ConcurrentHashMap<Integer, String> consumerQueueIdMap = this.queueIdOccupiedByConsumerTable.get(key); ... // 真實進行查找操做 ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } // 默認返回值爲 -1 return -1; }
而且在拉取完畢後調用 updateOffset 函數來更新拉取進度。
在某個 Consumer 經過 ConsumerManager 獲取可用的拉取偏移量以後,即從 LocalMessageQueue 中進行真實地消息讀取操做:
/** * Description Consumer 從存儲中讀取數據的接口 * * @param topic * @param queueId * @param offset 下一個開始抓取的起始下標 * @param maxMsgNums * @return */ public GetMessageResult getMessage(final String topic, final int queueId, final long offset, final int maxMsgNums) { ... // 根據 Topic 與 queueId 構建消費者隊列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 保證當前 ConsumeQueue 存在 if (consumeQueue != null) { // 獲取當前 ConsumeQueue 中包含的最小的消息在 MessageStore 中的位移 minOffset = consumeQueue.getMinOffsetInQueue(); // 注意,最大的位移地址便是不可達地址,是當前全部消息的下一個消息的下標 maxOffset = consumeQueue.getMaxOffsetInQueue(); // 若是 maxOffset 爲零,則表示沒有可用消息 if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = 0; } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = minOffset; } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = offset; } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = minOffset; } else { nextBeginOffset = maxOffset; } } else { // 根據偏移量獲取當前 ConsumeQueue 的緩存 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; // 設置每次獲取的最大消息數 final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 遍歷全部的 Consume Queue 中的消息指針 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } // 從 MessageStore 中獲取消息 SelectMappedBufferResult selectResult = this.messageStore.getMessage(offsetPy, sizePy); // 若是沒有獲取到數據,則切換到下一個文件繼續 if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.messageStore.rollNextFile(offsetPy); continue; } // 若是獲取到了,則返回結果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; // 獲取當前內存狀況 long memory = (long) (getTotalPhysicalMemorySize() * (LocalMessageQueueConfig.accessMessageInMemoryMaxRatio / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = consumeQueue.rollNextFile(offset); log.warning("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } else { ... } ... }
注意,這裏返回的其實只是消息在 MessageStore 中的存放地址,真實地消息讀取還須要經過 readMessagesFromGetMessageResult 函數:
/** * Description 從 GetMessageResult 中抓取所有的消息 * * @param getMessageResult * @return */ public static ArrayList<DefaultBytesMessage> readMessagesFromGetMessageResult(final GetMessageResult getMessageResult) { ArrayList<DefaultBytesMessage> messages = new ArrayList<>(); try { List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { messages.add(readMessageFromByteBuffer(bb)); } } finally { getMessageResult.release(); } // 獲取字節數組 return messages; } /** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public static DefaultBytesMessage readMessageFromByteBuffer(java.nio.ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default: log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; ... }
端午先後即已中止代碼編寫,原覺得周把時間能夠完成文檔編寫;惋惜畢業旅行和畢業聚會一直拖到了七月,最後也是匆匆寫完,也是我我的拖延癌晚期,不禁感慨啊。