各種消息中間件對順序消息實現的作法是將具備順序性的一類消息發往相同的主題分區中,只須要將這類消息設置相同的 Key 便可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監聽消費,所以可在消費時按分區進行順序消費,保證每一個分區的消息具有局部順序性。因爲須要確保分區消息的順序性,並不能併發地消費消費,對消費的吞吐量會形成必定的影響。那麼,如何在保證消息順序性的前提下,最大限度的提升消費者的消費能力?java
本文將會對 Kafka 消費者拉取消息流程進行深度分析以後,對 Kafka 消費者順序消費線程模型進行一次實踐與優化。apache
Kafka 消費者拉取消息流程分析
在講實現 Kafka 順序消費線程模型以前,咱們須要先深刻分析 Kafka 消費者的消息拉取機制,只有當你對 Kafka 消費者拉取消息的整個流程有深刻的瞭解以後,你纔可以很好地理解本次線程模型改造的方案。api
我先給你們模擬一下消息拉取的實際現象,這裏 max.poll.records = 500。緩存
一、消息沒有堆積時:安全
能夠發現,在消息沒有堆積時,消費者拉取時,若是某個分區沒有的消息不足 500 條,會從其餘分區湊夠 500 條後再返回。多線程
二、多個分區都有堆積時:併發
在消息有堆積時,能夠發現每次返回的都是同一個分區的消息,但通過不斷 debug,消費者在拉取過程當中並非等某個分區消費完沒有堆積了,再拉取下一個分區的消息,而是不斷循環的拉取各個分區的消息,可是這個循環並非說分區 p0 拉取完 500 條,後面必定會拉取分區 p1 的消息,頗有可能後面還會拉取 p0 分區的消息,爲了弄明白這種現象,我仔細閱讀了相關源碼。異步
org.apache.kafka.clients.consumer.KafkaConsumer#pollide
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { try { // poll for new data until the timeout expires do { // 客戶端拉取消息核心邏輯 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // 在返回數據以前, 發送下次的 fetch 請求, 避免用戶在下次獲取數據時線程阻塞 if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // 調用 ConsumerNetworkClient#poll 方法將 FetchRequest 發送出去。 client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); } }
咱們使用 Kafka consumer 進行消費的時候一般會給一個時間,好比:性能
consumer.poll(Duration.ofMillis(3000));
從以上代碼邏輯能夠看出來,用戶給定的這個時間,目的是爲了等待消息湊夠 max.poll.records 條消息後再返回,即便消息條數不夠 max.poll.records 消息,時間到了用戶給定的等待時間後,也會返回。
pollForFetches 方法是客戶端拉取消息核心邏輯,但並非真正去 broker 中拉取,而是從緩存中去獲取消息。在 pollForFetches 拉取消息後,若是消息不爲零,還會調用 fetcher.sendFetches() 與 client.pollNoWakeup(),調用這兩個方法究竟有什麼用呢?
fetcher.sendFetches() 通過源碼閱讀後,得知該方法目的是爲了構建拉取請求 FetchRequest 並進行發送,可是這裏的發送並非真正的發送,而是將 FetchRequest 請求對象存放在 unsend 緩存當中,而後會在 ConsumerNetworkClient#poll 方法調用時纔會被真正地執行發送。
fetcher.sendFetches() 在構建 FetchRequest 前,會對當前可拉取分區進行篩選,而這個也是決定多分區拉取消息規律的核心,後面我會講到。
從 KafkaConsumer#poll 方法源碼能夠看出來,其實 Kafka 消費者在拉取消息過程當中,有兩條線程在工做,其中用戶主線程調用 pollForFetches 方法從緩存中獲取消息消費,在獲取消息後,會再調用 ConsumerNetworkClient#poll 方法從 Broker 發送拉取請求,而後將拉取到的消息緩存到本地,這裏爲何在拉取完消息後,會主動調用 ConsumerNetworkClient#poll 方法呢?我想這裏的目的是爲了下次 poll 的時候能夠當即從緩存中拉取消息。
pollForFetches 方法會調用 Fetcher#fetchedRecords 方法從緩存中獲取並解析消息:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0) { // 若是當前獲取消息的 PartitionRecords 爲空,或者已經拉取完畢 // 則須要從 completedFetches 從新獲取 completedFetch 並解析成 PartitionRecords if (nextInLineRecords == null || nextInLineRecords.isFetched) { // 若是上一個分區緩存中的數據已經拉取完了,直接中斷本次循環拉取,並返回空的消息列表 // 直至有緩存數據爲止 CompletedFetch completedFetch = completedFetches.peek(); if (completedFetch == null) break; try { // CompletedFetch 即拉取消息的本地緩存數據 // 緩存數據中 CompletedFetch 解析成 PartitionRecords nextInLineRecords = parseCompletedFetch(completedFetch); } catch (Exception e) { // ... } completedFetches.poll(); } else { // 從分區緩存中獲取指定條數的消息 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // ... fetched.put(partition, records); recordsRemaining -= records.size(); } } } } catch (KafkaException e) { // ... } return fetched; }
completedFetches 是拉取到的消息緩存,以上代碼邏輯就是圍繞着如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯能夠看出:
maxPollRecords 爲本次拉取的最大消息數量,該值可經過 max.poll.records 參數配置,默認爲 500 條,該方法每次從 completedFetches 中取出一個 CompletedFetch 並解析成能夠拉取的 PartitionRecords 對象,即方法中的 nextInLineRecords,請注意,PartitionRecords 中的消息數量可能大與 500 條,所以可能本次可能一次性從 PartitionRecords 獲取 500 條消息後即返回,若是 PartitionRecords 中消息數量不足 500 條,會從 completedFetches 緩存中取出下一個要拉取的分區消息,recordsRemaining 會記錄本次剩餘還有多少消息沒拉取,經過循環不斷地從 completedFetches 緩存中取消息,直至 recordsRemaining 爲 0。
以上代碼便可解釋爲何消息有堆積的狀況下,每次拉取的消息很大機率是同一個分區的消息,由於緩存 CompletedFetch 緩存中的消息很大機率會多餘每次拉取消息數量,Kafka 客戶端每次從 Broker 拉取的消息數據並非經過 max.poll.records 決定的,該參數僅決定用戶每次從本地緩存中獲取多少條數據,真正決定從 Broker 拉取的消息數據量是經過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等參數決定的。
咱們再想一下,假設某個分區的消息一直都處於堆積狀態,Kafka 會每次都拉取這個分區直至將該分區消費完畢嗎?(根據假設,Kafka 消費者每次都會從這個分區拉取消息,並將消息存到分區關聯的 CompletedFetch 緩存中,根據以上代碼邏輯,nextInLineRecords 一直處於還沒拉取完的狀態,致使每次拉取都會從該分區中拉取消息。)
答案顯然不會,不信你打開 Kafka-manager 觀察每一個分區的消費進度狀況,每一個分區都會有消費者在消費中。
那 Kafka 消費者是如何循環地拉取它監聽的分區呢?咱們接着往下分析。
發送拉取請求邏輯:
org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches
public synchronized int sendFetches() { // 解析本次可拉取的分區 Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); // 構建請求對象 final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) .toForget(data.toForget()); // 發送請求,但不是真的發送,而是將請求保存在 unsent 中 client.send(fetchTarget, request) .addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { synchronized (Fetcher.this) { // ... ... // 建立 CompletedFetch, 並緩存到 completedFetches 隊列中 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); } } } // ... ... }); } return fetchRequestMap.size(); }
以上代碼邏輯很好理解,在發送拉取請求前,先檢查哪些分區可拉取,接着爲每一個分區構建一個 FetchRequest 對象,FetchRequest 中的 minBytes 和 maxBytes,分別可經過 fetch.min.bytes 和 fetch.max.bytes 參數設置。這也是每次從 Broker 中拉取的消息不必定等於 max.poll.records 的緣由。
prepareFetchRequests 方法會調用 Fetcher#fetchablePartitions 篩選可拉取的分區,咱們來看下 Kafka 消費者是如何進行篩選的:
org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions
private List<TopicPartition> fetchablePartitions() { Set<TopicPartition> exclude = new HashSet<>(); List<TopicPartition> fetchable = subscriptions.fetchablePartitions(); if (nextInLineRecords != null && !nextInLineRecords.isFetched) { exclude.add(nextInLineRecords.partition); } for (CompletedFetch completedFetch : completedFetches) { exclude.add(completedFetch.partition); } fetchable.removeAll(exclude); return fetchable; }
nextInLineRecords 即咱們上面提到的根據某個分區緩存 CompletedFetch 解析獲得的,若是 nextInLineRecords 中的緩存還沒拉取完,則不從 broker 中拉取消息了,以及若是此時 completedFetches 緩存中存在該分區的緩存,也不進行拉取消息。
咱們能夠很清楚的得出結論:
當緩存中還存在中還存在某個分區的消息數據時,消費者不會繼續對該分區進行拉取請求,直到該分區的本地緩存被消費完,纔會繼續發送拉取請求。
爲了更加清晰的表達這段邏輯,我舉個例子並將整個流程用圖表達出來:
假設某消費者監聽三個分區,每一個分區每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:
從以上流程可看出,Kafka 消費者自身已經實現了拉取限流的機制。
Kafka 順序消費線程模型的實現
kafka 的消費類 KafkaConsumer 是非線程安全的,所以用戶沒法在多線程中共享一個 KafkaConsumer 實例,且 KafkaConsumer 自己並無實現多線程消費邏輯,如需多線程消費,還須要用戶自行實現,在這裏我會講到 Kafka 兩種多線程消費模型:
一、每一個線程維護一個 KafkaConsumer
這種消費模型建立多個 KafkaConsumer 對象,每一個線程維護一個 KafkaConsumer,從而實現線程隔離消費,因爲每一個分區同一時刻只能有一個消費者消費,因此這種消費模型自然支持順序消費。
可是缺點是沒法提高單個分區的消費能力,若是一個主題分區數量不少,只能經過增長 KafkaConsumer 實例提升消費能力,這樣一來線程數量過多,致使項目 Socket 鏈接開銷巨大,項目中通常不用該線程模型去消費。
二、單 KafkaConsumer 實例 + 多 worker 線程
這種消費模型獎 KafkaConsumer 實例與消息消費邏輯解耦,咱們不須要建立多個 KafkaConsumer 實例就可進行多線程消費,還可根據消費的負載狀況動態調整 worker 線程,具備很強的獨立擴展性,在公司內部使用的多線程消費模型就是用的單 KafkaConsumer 實例 + 多 worker 線程模型。可是一般狀況下,這種消費模型沒法保證消費的順序性。
那麼,若是在使用第二種消費模型的前提下,實現消息順序消費呢?
接下來咱們來看下 ZMS 是怎麼實現順序消費線程模型的,目前 ZMS 的順序消費線程模型爲每一個分區單線程消費模式:
com.zto.consumer.KafkaConsumerProxy#addUserDefinedProperties
首先在初始化的時候,會對消費線程池進行初始化,具體是根據 threadsNumMax 的數量建立若干個單個線程的線程池,單個線程的線程池就是爲了保證每一個分區取模後拿到線程池是串行消費的,但這裏建立 threadsNumMax 個線程池是不合理的,後面我會說到。
com.zto.consumer.KafkaConsumerProxy#submitRecords
ZMS 會對消息分區進行取模,根據取模後的序號從線程池列表緩存中獲取一個線程池,從而使得相同分區的消息會被分配到相同線程池中執行,對於順序消費來講相當重要,前面我也說了,當用戶配置了順序消費時,每一個線程池只會分配一個線程,若是相同分區的消息分配到同一個線程池中執行,也就意味着相同分區的消息會串行執行,實現消息消費的順序性。
爲了保證手動提交位移的正確性,咱們必須保證本次拉取的消息消費完以後纔會進行位移提交,所以 ZMS 在消費前會建立一個 count 爲本次消息數量的 CountDownLatch:
final CountDownLatch countDownLatch = new CountDownLatch(records.count());
消費邏輯中,在 finally 進行 countDown 操做,最後會在本次消費主線程當中阻塞等待本次消息消費完成:
com.zto.consumer.KafkaConsumerProxy#submitRecords
以上就是目前 ZMS 順序消費的線程模型,用圖表示以上代碼邏輯:
以上,因爲某些分區的消息堆積量少於 500 條(Kafka 默認每次從 Broker 拉取 500 條消息),所以會繼續從其它分區湊夠 500 條消息,此時拉取的 500 條消息會包含 3 個分區的消息,ZMS 根據利用分區取模將同一個分區的消息放到指定的線程池中(線程池只有一條線程)進行消費,以上圖來看,總共有 3 條線程在消費本次拉取的 500 條消息。
那若是每一個分區的積壓都超過了 500 條消息呢?這種實際的狀況會更加多,由於消息中間件其中一個重要功能就是用於流量削峯,流量洪峯那段時間積壓幾百上千萬條消息仍是常常可以遇到的,那麼此時每次拉取的消息中,很大機率就只剩下一個分區了,我用以下圖表示:
在消息流量大的時候,順序消息消費時卻退化成單線程消費了。
如何提升 Kafka 順序消費的併發度?
通過對 ZMS 的消費線程模型以及對 Kafka 消費者拉取消息流程的深刻了解以後,我想到了以下幾個方面對 ZMS 的消費線程模型進行優化:
一、細化消息順序粒度
以前的作法是將每一個分區單獨一條線程消費,沒法再繼續在分區之上增長消費能力,咱們知道業務方發送順序消息時,會將同一類型具備順序性的消息給一個相同的 Key,以保證這類消息發送到同一個分區進行消費,從而達到消息順序消費的目的,而同一個分區會接收多種類型(即不一樣 Key)的消息,每次拉取的消息具備很大多是不一樣類型的,那麼咱們就能夠將同一個分區的消息,分配一個獨立的線程池,再利用消息 Key 進行取模放入對應的線程中消費,達到併發消費的目的,且不打亂消息的順序性。
二、細化位移提交粒度
因爲 ZMS 目前是手動提交位移,目前每次拉取消息必須先消費完才能進行位移提交,既然已經對分區消息進行指定的線程池消費了,因爲分區之間的位移前後提交不影響,那麼咱們能夠將位移提交交給每一個分區進行管理,這樣拉取主線程沒必要等到是否消費完才進行下一輪的消息拉取。
三、異步拉取與限流
異步拉取有個問題,就是若是節點消費跟不上,而拉取消息過多地保存在本地,極可能會形成內存溢出,所以咱們須要對消息拉取進行限流,當本地消息緩存量達到必定量時,阻止消息拉取。
上面在分析 Kafka 消費者拉取消息流程時,咱們知道消費者在發送拉取請求時,首先會判斷本地緩存中是否存在該分區的緩存,若是存在,則不發送拉取請求,但因爲 ZMS 須要改形成異步拉取的形式,因爲 Comsumer#poll 再也不等待消息消費完再進行下一輪拉取,所以 Kafka 的本地緩存中幾乎不會存在數據了,致使 Kafka 每次都會發送拉取請求,至關於將 Kafka 的本地緩存放到 ZMS 中,所以咱們須要 ZMS 層面上對消息拉取進行限流,Kafka 消費者有兩個方法能夠設置訂閱的分區是否能夠發送拉取請求:
// 暫停分區消費(即暫停該分區發送拉取消息請求) org.apache.kafka.clients.consumer.KafkaConsumer#pause // 恢復分區消費(即恢復該分區發送拉取消息請求) org.apache.kafka.clients.consumer.KafkaConsumer#resume
以上兩個方法,其實就是改變了消費者的訂閱分區的狀態值 paused,當 paused = true 時,暫停分區消費,當 paused = false 時,恢復分區消費,這個參數是在哪裏使用到呢?上面在分析 Kafka 消費者拉取消息流程時咱們有提到發送拉取請求以前,會對可拉取的分區進行篩選,其中一個條件即分區 paused = false:
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable
private boolean isFetchable() { return !paused && hasValidPosition(); }
因爲 KafkaConsumer 是非線程安全的,若是咱們在異步線程 KafkaConsumer 相關的類,會報以下錯誤:
KafkaConsumer is not safe for multi-threaded access
只須要確保 KafkaConsumer 相關方法在 KafkaConsumer#poll 方法線程中調用便可,具體作法能夠設置一個線程安全上下文容器,異步線程操做 KafkaConsumer 相關方法是,只須要將具體的分區放到上下文容器便可,後續統一由 poll 線程執行。
所以咱們只須要利用好這個特性,就能夠實現拉取限流,消費者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時不會形成兩次 poll 之間的時間過大致使消費者被踢出消費組。
以上優化改造的核心是在不打亂消息順序的前提下利用消息 Key 儘量地併發消費,但若是遇到分區中的消息都是相同 Key,而且在有必定的積壓下每次拉取都是同一個分區的消息時,以上模型可能沒有理想狀況下的那麼好。這時是否能夠將 fetch.max.bytes 與 max.partition.fetch.bytes 參數設置小一點,讓每一個分區的本地緩存都不足 500 條,這樣每次 poll 的消息列表均可以包含多個分區的消息了,但這樣又會致使 RPC 請求增多,這就須要針對業務消息大小,對這些參數進行調優。
以上線程模型,須要增長一個參數 orderlyConsumePartitionParallelism,用於設置分區消費並行度,假設某個消費組被分配 5 個分區進行消費,則每一個分區默認啓動一條線程消費,一共 5 * 1 = 5 條消費線程,當 orderlyConsumePartitionParallelism = 3,則每一個分區啓動 3 條線程消費,一共 5 * 3 = 15 條消費線程。orderlyConsumePartitionParallelism = 1 時,則說明該分區全部消息都處在順序(串行)消費;當 orderlyConsumePartitionParallelism > 1 時,則根據分區消息的 Key 進行取模分配線程消費,保證不了整個分區順序消費,但保證相同 Key 的消息順序消費。
注意,當 orderlyConsumePartitionParallelism > 1 時,分區消費線程的有效使用率取決於該分區消息的 Key:
一、若是該分區全部消息的 Key 都相同,則消費的 Key 取模都分配都同一條線程當中,並行度退化成 orderlyConsumePartitionParallelism = 1;
二、若是該分區相同 Key 的消息過於集中,會致使每次拉取都是相同 key 的一批消息,一樣並行度退化成 orderlyConsumePartitionParallelism = 1。
綜合對比:
優化前,ZMS 可保證整個分區消息的順序性,優化後可根據消息 Key 在分區的基礎上不打亂相同 Key 消息的順序性前提下進行併發消費,有效地提高了單分區的消費吞吐量;優化前,有很大的機率會退化成同一時刻單線程消費,優化後儘量至少保證每一個分區一條線程消費,狀況好的時候每一個分區可多條線程消費。
經過以上場景分析,該優化方案不是提升順序消費吞吐量的銀彈,它有很大的侷限性,用戶在業務的實現上不能重度依賴順序消費去實現,以避免影響業務性能上的需求。
總結
經過本文深度分析,咱們已經認識到順序消息會給消費吞吐量帶來怎麼樣的影響,所以用戶在業務的實現上不能重度依賴順序消費去實現,能避免則避免,若是必定要使用到順序消費,須要知道 Kafka 並不能保證嚴格的順序消費,在消費組重平衡過程當中極可能就會將消息的順序性打亂,並且順序消費會影響消費吞吐量,用戶須要權衡這種需求的利弊。
寫在最後
咱們知道 RocketMQ 自己已經實現了具體的消費線程模型,用戶不須要關心具體實現,只須要實現消息消費邏輯便可,而 Kafka 消息者僅提供 KafkaConsumer#poll 一個方法,消費線程模型的實現則徹底交由用戶去實現。