Kafka是一個分佈式消息中間件系統,裏面存儲着實際場景中的數據。Kafka原生是不支持點查詢的,若是咱們想對存儲在Topic中的數據進行查詢,可能須要對Topic中的數據進行消費落地,而後構建索引(或者數據落地到自帶因此的存儲系統中,例如HBase、Hive等)。今天,筆者就爲你們來介紹如何實現Kafka分佈式查詢引擎。html
對於點查詢,咱們能夠總結爲兩個要點。其一,有數據供咱們查詢;其二,對待查詢的數據構建索引。在Kafka中,Topic存儲數據,知足了第一點,雖然Kafka有索引的概念,可是它的索引是基於Offset的稀疏索引,並非對每條Message都會構建一個索引。而且,這個Offset索引對於實際狀況查詢場景來講,也幫助不大。好比,你查詢Topic01下的Partition_0,可是,也僅僅只是查下到某個Topic中分區下的Offset對應的一條記錄,可是這條記錄是啥,你並不知道。真實查詢的狀況,多是你須要查詢某個ID,或者模糊查詢某個Name是否存在。服務器
其實有一種方式,是可行的。就是對Kafka源代碼進行改造,在Broker落地每條數據的時候,構建一條索引(其實,這種方式與在原始的Kafka外面加一層Proxy相似,由Proxy充當與Client交互的角色,接收Client的數據存儲並構建索引)。這樣的實現方式以下圖:多線程
若是對Kafka源代碼熟悉,有能力改造其源代碼,能夠在Kafka中添加對每條數據構建索引的邏輯。若是,以爲怕對Kafka的性能有影響,或者改造有難度。上述流程圖的方式,也能夠實現這種點查詢。分佈式
改造Kafka源代碼添加索引,或者是Proxy的方式存儲數據並構建索引,這種兩種方式來講,數據上都會要冗餘一倍左右的的存儲容量。oop
基於上述的問題,咱們對這種方案進行升級改造一下。由於不少狀況下,生產環境的數據已是運行了很長時間了,加Proxy或者改造Kafka源代碼的方式適合構建一個Kafka的新集羣的時候使用。對於已有的Kafka集羣,若是咱們要查詢Topic中的數據,如何實現呢。性能
在Kafka-Eagle中,我對Topic數據查詢實現了基於SQL查詢的實現方案。邏輯是這樣的,編寫SQL查詢語句,對SQL進行解析,映射出一個Topic的Schema以及過濾條件,而後根據過濾條件消費Topic對應的數據,最後拿到數據集,經過SQL呈現出最後的結果。流程圖以下:學習
可是,這樣是由侷限性的。因爲,單節點的計算能力有限,因此對每一個Partition默認查詢5000條數據,這個記錄是能夠增長或者減小的。若是在配置文件中對這個屬性增大,好比設置爲了50000條,那麼對應的ke.sh腳本中的內存也須要增長,由於每次查詢須要的內存增長了。否則,頻繁若干用戶同時查詢,容易形成OOM的狀況。大數據
可是,一般一個Topic中存儲的數據通常達到上億條數據以上,這種方式要從上億條或者更多的數據中查詢咱們想要的數據,可能就知足不了了。優化
基於這種狀況,咱們能夠對這中單節點查詢的方式進行升級改造,將它變爲分佈式查詢。其實,仔細來看,單節點查詢的方式,就是一個分佈式查詢的縮版。那咱們須要實現這樣一個分佈式查詢的Kafka SQL引擎呢?spa
首先,咱們能夠藉助Hadoop的MapReduce思想,「化繁爲簡,分而治之」。咱們將一個Topic當作一個比較大的數據集,每次咱們須要對這個數據集進行查詢,能夠將待查詢的數據進行拆分若干份Segment,而後,充分利用服務器的CPU,進行多線程消費(這樣就能夠打破Kafka中一個線程只能消費一個分區的侷限性)。實現流程圖以下:
上圖可知,由客戶端發起請求,提交請求到Master節點,而後Master節點解析客戶端的請求,並生成待執行策略。好比上述有三個工做節點,按照客戶端的狀況,Master會將生成的執行策略下發給三個工做節點,讓其進行計算。
這裏以其中一個工做節點爲例子,好比WorkNode1接收到了Master下發的計算任務,接收到執行指令後,結合工做節點自身的資源狀況(好比CPU和內存,這裏CPU較爲重要),將任務進行拆解爲若干個子任務(子任務的個數取決於每一個批次的BatchSize,能夠在屬性中進行配置),而後讓生成好的若干個子任務並行計算,獲得若干個子結果,而後將若干個子結果彙總爲一個最終結果做爲當前工做節點的最終計算結果,最後將不一樣的工做節點的結果進行最後的Merge做爲本次查詢的結果返回給Master節點(這裏須要注意的是,多個工做節點彙總在同一個JobID下)。而後,Master節點收到工做節點返回的結果後,返回給客戶端。
查詢10條Topic中的數據,工做節點執行以下:
select * from ke1115 where `partition` in (0) limit 10
上圖顯示了,同一WorkNode節點下,同一JobID中,不一樣線程子任務的計算進度日誌。
KSqlStrategy顯示了Master節點下發的待執行策略,msg表示各個工做節點返回的最終結果。
目前Kafka分佈式查詢引擎基礎功能已實現能夠用,任務託管、子任務查詢內存優化等還有優化的空間,計劃正在考慮集成到KafkaEagle系統中。
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。關注下面公衆號,根據提示,可免費獲取書籍的教學視頻。