(持續更新中~~~)kafka--消息引擎與分佈式流處理平臺

kafka概述

kafka是一個分佈式的基於發佈/訂閱模式的消息隊列(message queue),通常更願意稱kafka是一款開源的消息引擎系統,只不過消息隊列會耳熟一些。kafka主要應用於大數據實時領域。java

爲何會有消息隊列,主要是爲了異步處理,提升效率。咱們來看一張圖node

使用消息隊列,能夠把耗時任務扔到隊列裏面,異步調用,從而提高效率。也就是咱們所說的解耦。python

然而除了解耦,還有沒有其餘做用呢?答案顯然是有的,用一個專業點的名詞解釋的話,就是削峯填谷。mysql

削峯填谷,真的是很是形象的四個字。所謂的削峯填谷,就是指緩衝上下游瞬時突發流量,使其更平滑。特別是那種發送能力很強的上游系統,若是沒有消息引擎的保護,脆弱的下游系統可能會直接被壓垮致使全鏈路服務雪崩。可是,一旦有了消息引擎,它可以有效的對抗上游的流量衝擊,真正作到將上游的"峯"填到"谷"中,避免了流量的震盪。消息引擎系統的另外一大好處就是咱們剛纔說的,在於發送方和接收方的鬆耦合,這也在必定程度上簡化了應用的開發,減小了系統間沒必要要的交互linux

直接解釋的話,可能沒有直觀的感覺,咱們來舉一個實際的例子。好比在京東購買商品。當點擊購買的時候,會調用訂單系統生成對應的訂單,然而要處理該訂單則會依次調用下游系統的多個子服務,好比調用銀行等支付接口、查詢你的登陸信息、驗證商品信息等等。顯然上游的訂單操做比較簡單,它的TPS要遠高於處理訂單的下游服務。所以若是上游和下游直接對接,勢必會出現下游服務沒法及時處理上游訂單從而形成訂單堆積的狀況。特別是當出現雙11、雙12、相似秒殺這種業務的時候,上游訂單流量會瞬間增長,可能出現的結果就是直接壓垮下游子系統服務。解決此問題的一個常見的作法就是對上游系統進行限速,可是這種作法顯然是不合理的,畢竟問題不是出如今它那裏。何況你要是限速了,別人家網站雙十一成交一千萬筆單子,自家網站才成交一百萬筆單子,這樣錢送到嘴邊都賺不到。因此更常見的辦法就是引入像kafka這樣的消息引擎系統來對抗這種上下游系統的TPS不一致以及瞬時峯值流量。程序員

仍是這個例子,當引入了kafka以後,上游系統再也不直接與下游系統進行交互。當新訂單生成以後它僅僅是向kafka broker發送一條消息便可。相似的,下游的各個子服務訂閱kafka中的對應主題,並實時從該主題的各自分區(partition)中獲取到訂單消息進行處理,從而實現上游訂單服務和下游訂單處理服務的解耦。這樣當出現秒殺業務的時候,kafka可以將瞬時增長的訂單流量所有以消息的形式保存在對應的主題中。既不影響上游服務的TPS,同時也給下游服務流出了足夠的時間去消費它們。這就是kafka這類消息引擎存在的最大意義所在。算法

目前裏面出現了不少的專業詞彙,broker、主題、partition等等,這些咱們後面都會介紹。sql

kafka消費模式

咱們知道消息隊列傳輸的是消息,那麼這個消息如何傳遞也是很重要的一環。通常消息隊列支持兩種傳遞模式。數據庫

  • 點對點模式:編程

    生產者將生產的消息發送到queue中,而後消費者再從queue中取出消息進行消費。消息一旦被消費,那麼queue中再也不有存儲,因此消費者不可能消費到已經被消費的信息。queue支持多個消費者同時消費,可是一個消息只能被一個消費者消費,不存在說多個消費者同時消費一個消息。平常生活中就好電話客服服務,同一個客戶呼入電話,只能被一位客服人員處理,第二個客服人員不能爲該客戶服務

  • 發佈訂閱模式

    和點對點模型不一樣,它有一個主題(Topic)的概念。該模型也有發送方和接收方,只不過叫法不同。發送方也被成爲發佈者(publisher),接收方成爲訂閱者(subscriber)。和點對點模型不同,這個模型能夠存在多個發佈者和多個訂閱者,它們都能接收到相同主題的消息。比如微信公衆號,一個公衆號能夠有多個訂閱者,一個訂閱者也能夠訂閱多個公衆號。

搞定kafka的專業術語

在kafka的世界中有不少概念和術語是須要咱們提早理解而且熟練掌握的,下面來盤點一下。

以前咱們提到過,kafka屬於分佈式的消息引擎系統,主要功能是提供一套完善的消息發佈與訂閱方案。在kafka中,發佈訂閱的對象是主題(topic),能夠爲每一個業務、每一個應用、甚至是每一類數據都建立專屬的主題

向主題發佈消息的客戶端應用程序成爲生產者(producer),生產者一般持續不斷地向一個或多個主題發送消息,而訂閱這些主題獲取消息的客戶端應用程序就被稱之爲消費者(consumer)。和生產者相似,消費者也能同時訂閱多個主題。咱們把生產者和消費者統稱爲客戶端(clients)。你能夠同時運行多個生產者和消費者實例,這些實例不斷地向kafka集羣中的多個主題生產和消費消息。有客戶端天然也就有服務端。kafka的服務器端由被稱爲broker的服務進程構成,即一個kafka集羣由多個broker組成,broker負責接收和處理客戶端發來的請求,以及對消息進行持久化。雖然多個broker進程可以運行在同一臺機器上,但更常見的作法是將不一樣的broker分散運行在不一樣的機器上。這樣即使集羣中的某一臺機器宕機,運行在其之上的broker進程掛掉了其餘機器上的broker也依舊能對外提供服務。這其實就是kafka提供高可用的手段之一

在實現高可用的另外一個手段就是備份機制(replication)。備份的思想很簡單,就是把相同的數據拷貝到多臺機器上,而這些相同的數據拷貝就叫作副本(replica)。副本的數量是能夠配置的,這些副本保存着相同的數據,但卻有不一樣的角色和做用。kafka定義了兩種副本,領導者副本(leader replica)和追隨者副本(follower replica)。前者對外提供服務,這裏的對外指的是與客戶端進行交互;然後者只是被動地追隨領導者副本而已,不與外界進行交互。固然了,不少其餘系統中追隨者副本是能夠對外提供服務的,好比mysql,從庫是能夠處理讀操做的,也就是所謂的"主寫從讀",可是在kafka中追隨者副本不會對外提供服務,至於爲何咱們做爲思考題解答。對了,關於領導者--追隨者,以前實際上是叫作主(master)--從(slave),可是不建議使用了,由於slave有奴隸的意思,政治上有點不合適,因此目前大部分的系統都改爲leader-follower了。

副本的工做機制很簡單:生產者向主題寫的消息老是往領導者那裏,消費者向主題獲取消息也都是來自於領導者。也就是不管是讀仍是寫,針對的都是領導者副本,至於追隨者副本,它只作一件事情,那就是向領導者副本發送請求,請求領導者副本把最新生產的消息發送給它,這樣便可以保持和領導者的同步。

雖然有了副本機制能夠保證數據的持久化或者數據不丟失,但沒有解決伸縮性的問題。伸縮性即所謂的scalability,是分佈式系統中很是重要且必須謹慎對待的問題。什麼事伸縮性呢?咱們拿副原本說,雖然如今有了領導者副本和追隨者副本,但假若領導者副本積累了太多的數據以致於單臺broker都沒法容納了,此時應該怎麼辦?有個很天然的想法就是,可否把數據分割成多分保存在不一樣的broker上?沒錯,kafka就是這麼設計的。

這種機制就是所謂的分區(partition)。若是瞭解其餘的分佈式系統,那麼可能據說過度片、分區域等提法,好比MongoDB和ElasticSearch中的sharding、Hbase中的region,其實它們都是相同的原理,只是partition是最標準的名稱。

kafka中的分區機制指定的是將每一個主題劃分爲多個分區,每一個分區都是一組有序的消息日誌。生產者生產的每一條消息只會被髮到一個分區中,也就說若是向有兩個分區的主題發送一條消息,那麼這條消息要麼在第一個分區中,要麼在第二條分區中。而kafka的分區編號是從0開始的,若是某個topic有100個分區,那麼它們的分區編號就是從0到99

到這裏可能會有疑問,那就是剛纔提到的副本如何與這裏的分區聯繫在一塊兒呢?實際上,副本是在分區這個層級定義的。每一個分區下能夠配置若干個副本,其中只能有1個領導者副本和N-1個追隨者副本。生產者向分區寫入消息,每條消息在分區中的位置由一個叫位移(offset)的數據來表徵。分區位移老是從0開始,假設一個生產者向一個空分區寫入了10條消息,那麼這10條消息的位移依次是0、一、二、...、9

至此咱們能完整地串聯起kafka的三層消息架構

  • 第一層是主題層,每一個主題能夠配置M個分區,每一個分區又能夠配置N個副本
  • 第二層是分區層,每一個分區的N個副本中只能有一個副原本充當領導者角色,對外提供服務;其餘的N-1個副本只是追隨者副本,用來提供數據冗餘之用。
  • 第三層是消息層,分區中包含若干條消息,每條消息的位移從0開始,依次遞增。
  • 最後客戶端程序只能與分區的領導者副本進行交互

那麼kafka是如何持久化數據的呢?總的來講,kafka使用消息日誌(log)來保存數據,一個日誌就是磁盤上一個只能追加寫(append-only)消息的物理文件。由於只能追加寫入,故避免了緩慢的隨機I/O操做,改成性能較好的順序I/O操做,這也是實現kafka高吞吐量特性的一個重要手段。不過若是不停地向一個日誌寫入消息,最終也會耗盡全部的磁盤空間,所以kafka必然要按期地刪除消息以回收磁盤。怎麼刪除?簡單來講就是經過日誌段(log segment)機制。在kafka底層,一個日誌又進一步細分紅多個日誌段,消息被追加寫到當前最新的日誌段中,當寫滿了一個日誌段後,kafka會自動切分出一個新的日誌段,並將老的日誌段封存起來。kafka在後臺還有定時任務會按期地檢查老的日誌段是否可以被刪除,從而實現回收磁盤的目的。

這裏再重點說一下消費者,以前說過有兩種消息模型,即點對點模型(peer to peer, p2p)和分佈訂閱模型。這裏面的點對點指的是同一條消息只能被下游的一個消費者消費,其餘消費者不能染指。在kafka中實現這種p2p模型的方法就是引入了消費者組(consumer group)。所謂的消費者組,指的是多個消費者實例共同組成一個組來消費一個主題。這個主題中的每一個分區都只會被消費者組裏面的一個消費者實例消費,其餘消費者實例不能消費它。爲何要引入消費者組呢?主要是爲了提高消費者端的吞吐量,多個消費者實例同時消費,加速了整個消費端的吞吐量(TPS)。關於消費者組的機制,後面會詳細介紹,如今只須要知道消費者組就是多個消費者組成一個組來消費主題裏面的消息、而且消息只會被組裏面的一個消費者消費便可。此外,這裏的消費者實例能夠是運行消費者應用的進程,也能夠是一個線程,它們都稱爲一個消費者實例(consumer instance)

消費者組裏面的消費者不只瓜分訂閱主題的數據,並且更酷的是它們還能彼此協助。假設組內某個實例掛掉了,kafka可以自動檢測,而後把這個Failed實例以前負責的分區轉移給其餘活着的消費者。這個過程就是大名鼎鼎的"重平衡(rebalance)"。嗯,其實便是大名鼎鼎,也是臭名昭著,由於由重平衡引起的消費者問題比比皆是。事實上,目前不少重平衡的bug,整個社區都無力解決。

每一個消費者在消費消息的過程當中,必然須要有個字段記錄它當前消費到了分區的哪一個位置上,這個字段就是消費者位移(consumer offset)。注意,咱們以前說一個主題能夠有多個分區、每一個分區也是用位移來表示消息的位置。可是這兩個位移徹底不是一個概念,分區位移表示的是分區內的消息位置,它是不變的,一旦消息被成功寫入一個分區上,那麼它的位置就是固定了的。而消費者位移則不一樣,它多是隨時變化的,畢竟它是消費者消費進度的指示器嘛。另外每一個消費者都有着本身的消費者位移,所以必定要區分這兩類位移的區別。一個是分區位移,另外一個是消費者位移

小結:

  • 生產者,producer向主題發佈新消息的應用程序
  • 消費者,consumer從主題訂閱新消息的應用程序
  • 消息,recordkafka是消息引擎,這裏的消息就是指kafka處理的主要對象
  • 主題,topic主題是承載消息的邏輯容器,在實際使用中多用來區分具體的業務,即不一樣的業務對應不一樣的主題。
  • 分區,partition一個有序不變的消息序列,每一個主題下能夠有多個分區。分區編號從0開始,分佈在不一樣的broker上面,實現發佈於訂閱的負載均衡。生產者將消息發送到主題下的某個分區中,以分區偏移(offset)來標識一條消息在一個分區當中的位置(惟一性)
  • 分區位移,offset表示分區中每條消息的位置信息,是一個單調遞增且不變的值
  • 副本,replicakafka中同一條數據可以被拷貝到多個地方以提供數據冗餘,這即是所謂的副本。副本還分爲領導者副本和追隨者副本,各自有各自的功能職責。讀寫都是針對領導者副原本的,追隨者副本只是用來和領導者副本進行數據同步、保證數據冗餘、實現高可用。
  • 消費者位移,consumer offset表示消費者消費進度,每一個消費者都有本身的消費者位移
  • 消費者組,consumer group多個消費者實例共同組成的一個組,同時消費多個分區以實現高吞吐。
  • 重平衡,rebalance消費者組內某個消費者實例掛掉以後,其它消費者實例自動從新分配訂閱主題分區的過程。重平衡是kafka消費者端實現高可用的重要手段

思考:爲何kafka不像mysql那樣支持主寫從讀呢?

由於kafka的主題已經被分爲多個分區,分佈在不一樣的broker上,而不一樣的broker又分佈在不一樣的機器上,所以從某種角度來講,kafka已經實現了負載均衡的效果。不像mysql,壓力都在主上面,因此纔要從讀;另外,kafka保存的數據和數據庫的數據有着實質性的差異,kafka保存的數據是流數據,具備消費的概念,並且須要消費者位移。因此若是支持從讀,那麼消費端控制offset會更復雜,並且領導者副本同步到追隨者副本須要時間的,會形成數據不一致的問題;另外對於生產者來講,kafka是能夠經過配置來控制是否等待follower對消息確認的,若是支持從讀的話,那麼也須要全部的follower都確認了才能夠回覆生產者,形成性能降低,並且follower出現了問題也很差處理。

kafka只是消息隊列(消息引擎系統)嗎

kafka真的只是消息引擎系統嗎?要搞清楚這個問題,就要從kafka的發展歷史提及,縱觀kafka的發展歷史,它確實是消息引擎起家的,但它不只是一個消息引擎系統,同時也是一個分佈式流處理平臺(distributed stream processing platform)。若是這一節你只能記住一句戶的話,那我但願你能記住,kafka雖然是消息引擎起家,但它不只是一個消息引擎,仍是一個分佈式流處理平臺。

總所周知,kafka是LinkedIn公司內部孵化的項目,LinkedIn最開始有強烈的數據強實時處理方面的需求,其內部的諸多子系統要執行多種類型的數據處理與分析,主要包括業務系統和應用程序性能監控,以及用戶行爲數據處理等。當時他們碰到的主要問題包括:

  • 數據正確性不足。由於數據的收集主要採用輪詢(polling)的方式,如何肯定輪詢的時間間隔就變成了一個高度經驗化的事情。雖然能夠採用一些相似於啓發式算法來幫助評估間隔時間,但一旦指定不當,必然會形成較大的數據誤差。
  • 系統高度定製化,維護成本高。各個業務子系統都須要對接數據收集模塊,引入了大量的定製開銷和人工成本

爲了解決這些問題,LinkedIn工程師嘗試過使用ActiveMQ來解決這些問題,但效果並不理想。顯然須要有一個"大一統"的系統來取代現有的工做方式,而這個系統就是kafka。所以kafka自誕生伊始是以消息引擎系統的面目出如今大衆視野的,若是翻看比較老的kafka對應的官網的話,你會發現kafka社區將其清晰地定位爲一個分佈式、分區化且帶備份功能的提交日誌(commit log)服務。

所以,kafka在設計之初就旨在提供三個方面的特性:

  • 提供一套API實現生產者和消費者
  • 下降網絡傳輸和磁盤存儲開銷
  • 實現高伸縮架構

在現現在的大數據領域,kafka在承接上下游、串聯數據流管道方面發揮了重要的做用:全部的數據幾乎都要從一個系統流入kafka,而後再流入下游的另外一個系統中 。這樣使用方式家常便飯以致於引起了kafka社區的思考:與其我把數據從一個系統傳遞到下一個系統進行處理,我爲什麼不本身實現一套流處理框架呢?基於這個考量,kafka社區在0.10.0.0版本推出了流處理組件kafka streams,也正是從這個版本開始,kafka正式變身爲分佈式的流處理平臺,而再也不僅僅只是消息引擎系統了。今天kafka是和storm、spark、flink同等級的實時流處理平臺了。

那麼做爲流處理平臺,kafka與其餘大數據流式計算框架相比,優點在哪裏呢?

  • 第一點是更容易實現端到端的正確性(correctness)。流處理要最終替代它的兄弟批處理須要具有兩點核心優點:要實現正確性和提供可以推導時間的工具。實現正確性是流處理可以匹敵批處理的基石。正確性一直是批處理的強項,而實現正確性的基石則是要求框架能提供'精確一次語義處理',即處理一條消息有且只有一次機會可以影響系統狀態。目前主流的大數據流處理框架都宣稱實現了'精確一次語義處理',可是這是有限定條件的,即它們只能實現框架內的精確一次語義處理,沒法實現端到端的。這是爲何呢?由於當這些框架與外部消息引擎系統結合使用時,它們沒法影響到外部系統的處理語義,因此若是你搭建了一套環境使得spark或flink從kafka讀取消息以後進行有狀態的數據計算,最後再寫回kafka,那麼你只能保證在spark或者flink內部,這條消息對於狀態的影響只有一次。可是計算結果有可能屢次寫入的kafka,由於它們不能控制kafka的語義處理。相反地,kafka則不是這樣,由於全部的數據流轉和計算都在kafka內部完成,故kafka能夠實現端到端的精確一次處理。
  • 第二點是kafka本身對於流式計算的定位。官網上明確表示kafka streams是一個用於搭建實時流處理的客戶端庫而非是一個完整地功能系統。這就是說,你不能指望着kafka提供相似於集羣調度、彈性部署等開箱即用的運維特性,你須要本身選擇合適的工具或者系統來幫助kafka流處理應用實現這些功能。讀到這裏可能以爲這怎麼是有點呢?坦率的說,這是一個雙刃劍的設計,也是kafka劍走偏鋒不正面pk其餘流計算框架的特地考量。大型公司的流處理平臺必定是大規模部署的,所以具有集羣調度功能以及靈活的部署方案是不可獲取的要素。但畢竟世界上還存在着不少中小企業,它們的流處理數據量並不巨大,邏輯也不復雜,部署幾臺或者十幾臺機器足以應付。在這樣的需求下,搭建重量級的完整性平臺實在是'殺雞用宰牛刀',而這正式kafka流處理組件的用武之地。所以從這個角度來講,將來在流處理框架當中,kafka應該是有着一席之地的。

這裏再來解釋一下什麼是精確一次語義處理。舉個例子,若是咱們使用kafka計算某網頁的pv,咱們將每次網頁訪問都做爲一個消息發送給kafka,pv的計算就是咱們統計kafka總共接收了多少條這樣的消息便可。精確一次語義處理表示每次網頁訪問都會產生、且只產生一條消息

處理消息引擎和流處理平臺,kafka還有別的用途嗎?固然有,kafka甚至可以被用做分佈式存儲系統,可是實際生產中,沒有人會把kafka當中分佈式存儲系統來用的。kafka從一個優秀的消息引擎系統起家,逐漸演變成如今的分佈式的流處理平臺。咱們不只要熟練掌握它做爲消息引擎系統的非凡特性以及使用技巧,最好還要多瞭解下其流處理組件的設計與案例應用。

應該選擇哪一種kafka

咱們上一節談了kafka當前的定位問題,kafka再也不是一個單純的消息引擎系統,而是可以實現精確一次(exactly-once)語義處理的實時流平臺。咱們到目前爲止所說的kafka都是Apache kafka,kafka是Apache社區的一個頂級項目,若是咱們把視角從流處理平臺擴展到流處理生態圈,kafka其實還有很長的路要走,畢竟是半路出家轉型成流處理平臺的。前面咱們提到過kafka streams組件,正是它提供了kafka實時處理流數據的能力。可是其實還有一個重要的組件沒有說起,那就是kafka connect。

咱們在評估流處理平臺的時候,框架自己的性能、所提供操做算子(operator)的豐富程度當然是重要的評判指標,可是框架與上下游交互的能力也是很是重要的。可以與之進行數據傳輸的外部系統越多,圍繞它打造的生態圈就越牢固,於是也就有更多的人願意去使用它,從而造成正向反饋,不斷地促進該生態圈的發展。就kafka而言,kafka connect經過一個個具體的鏈接器(connector),串聯起上下游的外部系統。

整個kafka生態圈以下圖所示,值得注意的是,這張圖的外部系統只是kafka connect組件支持的一部分而已。目前還有一個可喜的趨勢是使用kafka connect組件的用戶愈來愈多,相信在將來會有愈來愈多的人開發本身的鏈接器。

說了這麼多,可能會有人好奇這跟這一節的主題有什麼關係呢?其實清晰地瞭解kafka的發展脈絡和生態圈現狀,對於咱們選擇合適的kafka版本大有裨益。下面咱們就進入今天的主題---如何選擇kafka版本

首先你知道幾種kafka

咦?kafka不是一個開源框架嗎?什麼叫有幾種kafka,實際上,kafka的確有好幾種kafka啊?實際上,kafka的確有好幾種,這裏我不是指它的版本,而是指存在多個組織或者公司發佈的不一樣kafka。就像linux的發行版,有Ubuntu、centos等等,雖然說kafka沒有發行版的概念,但姑且能夠這樣的近似的認爲市面上的確存在着多個kafka"發行版"。固然用發行版這個詞只是爲了這裏方便解釋,可是發行版這個詞在kafka生態圈很是陌生,之後聊天時不要用發行版這個詞。下面咱們就看看kafka都有哪些"發行版"

  • Apache kafka

    Apache kafka是最"正宗"的kafka,也應該是最熟悉的發行版了。字kafka開源伊始,它便在Apache基金會孵化並最終畢業成爲頂級項目,也被稱之爲社區版kafka。目前我也是以這個版本的kafka進行介紹的。更重要的是,它是後面其餘全部發行版的基礎。也就是說,其餘的發行版要麼是原封不動的繼承了Apache kafka,要麼是在其基礎之上進行了擴展、添加了新功能,總之Apache kafka是咱們學習和使用kafka的基礎。

  • Confluent kafka

    我先說說Confluent公司吧。2014年,kafka的3個創始人Jay Kreps、Naha Narkhede 和饒軍離開 LinkedIn創辦了Confluent公司,專一於提供基於kafka的企業級流處理解決方案。2019年1月,Confluent公司成功融資1.25億美圓,估值也到了25億美圓,足見資本市場的青睞。這裏說點題外話,饒軍是咱們中國人,清華大學畢業的大神級人物。咱們已經看到愈來愈多的Apache頂級項目創始人中出現了中國人的身影。另外一個例子就是,Apache pulsar,它是一個以戰勝kafka爲目標的新一代消息引擎系統,在開源社區中活躍的中國人數不勝數,這種現象實在使人振奮。還說回Confluent公司,它主要從事商業化的kafka工具開發,並在此基礎上發佈了Confluent kafka。Confluent kafka提供了Apache kafka沒有的高級特性,好比跨數據中心備份、schema註冊中心以及集羣監控工具等等。

  • Cloudera/Hortonworks Kafka

    Cloudera提供的CDH和Hortonworks提供的HDP是很是著名的大數據平臺,裏面集成了目前主流的大數據框架,可以幫助用戶實現從分佈式存儲、集羣調度、流處理到機器學習、實時數據庫等全方位的數據處理。不少創業公司在搭建數據平臺時首選就是這兩個產品。無論是CDH仍是HDP,裏面都集成了Apache kafka,所以就把這款產品中的kafka稱之爲CDH kafka和HDP kafka

    固然在2018年10月兩家公司宣佈合併,共同打造世界領先的數據平臺,也許之後CDH和HDP也會合併成一款產品,但能確定的是Apache kafka依然會包含其中,並做爲新數據平臺的一部分對外提供服務。

特色比較

okay,說完了目前市面上的這些kafka,咱們來對比一下它們的優點和劣勢

  • 1.Apache kafka

    對Apache kafka而言,它如今依舊是開發人數最多,版本迭代速度最快的kafka。在2018年度Apache基金會郵件列表中開發者數量最多的top5排行榜中,kafka社區郵件組排名第二位。若是你使用Apache kafka碰到任何問題並將問題提交到社區,社區都會比較及時的響應你。這對於咱們kafka普通使用者來講無疑是很是友好的。

    可是Apache kafka的劣勢在於它僅僅提供最最基礎的主組件,特別是對於前面提到的kafka connect而言,社區版kafka只提供一種鏈接器,即讀寫磁盤文件的鏈接器,而沒有與其餘外部系統交互的鏈接器,在實際使用過程當中須要自行編寫代碼實現,這是它的一個劣勢。另外Apache kafka沒有提供任何監控框架或工具,而在線上環境不加監控確定是不行的,你必然須要藉助第三方的監控框架來對kafka進行監控。好消息是目前有一些開源的監控框架能夠幫助用於監控kafka(好比kafka manager)

    總而言之,若是僅僅須要一個消息引擎系統亦或是簡單的流處理應用場景,同時須要對系統有較大把控度,那麼我推薦你使用Apache kafka

  • 2.Confluent kafka

    下面來看看Confluent kafka。Confluent kafka目前分爲免費版和企業版兩種。前者和Apache kafka很是相像,除了常規的組件以外,免費版還包含schema註冊中心和rest proxy兩大功能。前者是幫助你集中管理kafka消息格式以實現數據向前/向後兼容;後者用開放的HTTP接口的方式容許你經過網絡訪問kafka的各類功能,這兩個都是Apache kafka所沒有的。除此以外,免費版還包含了更多的鏈接器,它們都是Confluent公司開發並認證過的,你能夠無償使用它們。至於企業版,它提供的功能就更多了,最有用的當屬跨數據中心備份和集羣監控兩大功能了。多個數據中心之間數據的同步以及對集羣的監控從來是kafka的痛點,Confluent kafka企業版提供了強大的解決方案來幫助你幹掉它們。不過Confluent kafka沒有發展國內業務的計劃,相關資料以及技術支持都很欠缺,不少國內的使用者都沒法找到對應的中文文檔,所以目前Confluent kafka在國內的普及率是比較低的。

    一言以蔽之,若是你須要使用kafka的一些高級特性,那麼推薦你使用Confluent kafka。

  • 3.CDH/HDP kafka

    最後說說大數據雲公司發佈的kafka,這些大數據平臺自然繼承了Apache kafka,經過便捷化的界面操做將kafka的安裝、運維、管理、監控所有統一在控制檯中。若是你是這些平臺的用戶必定以爲很是方便,由於全部的操做均可以在前段UI界面上完成,而沒必要執行復雜的kafka命令。另外這些平臺的監控界面也很是友好,你一般不須要進行任何配置就能有效的監控kafka。

    可是凡事有利就有弊,這樣作的結果就是直接下降了你對kafka集羣的掌握程度。畢竟你對下層的kafka集羣一無所知,你怎麼可以作到心中有數呢?這種kafka的另外一個弊端在於它的滯後性,因爲它有本身的發佈週期,所以是否能及時地包含最新版本的kafka就成爲了一個問題。好比CDH6.1.0版本發佈時Apache kafka已經演進到了2.1.0版本,但CDH中的kafka仍然是2.0.0版本,顯然那些在kafka2.1.0中修復的bug只能等到CDH下次版本更新時纔有可能被真正修復。

    簡單來講,若是你須要快速的搭建消息引擎系統,或者你須要搭建的是多框架構成的數據平臺且kafka只是其中的一個組件,那麼建議使用這些大數據雲公司提供的kafka。

小結

總結一下,咱們今天討論了不一樣的kafka"發行版"以及它們的優缺點,根據這些優缺點,咱們能夠有針對性地根據實際需求選擇合適的kafka。最後咱們回顧一下今天的內容:

  • Apache kafka,也成社區辦kafka。優點在於迭代速度快,社區響應度高,使用它可讓你有更高的把控度;缺陷在於僅提供最基礎的核心組件,缺失一些高級的特性。
  • Confluent kafka,Confluent公司提供的kafka。優點在於集成了不少高級特性且由kafka原版人馬打造,質量上有保證;缺陷在於相關資料不全,普及率較低,沒有太多可供參考的範例。
  • CDH/HPD kafka,大數據雲公司提供的kafka,內嵌Apache kafka。優點在於操做簡單,節省運維成本;缺陷在於把控度低,演進速度較慢。

聊聊kafka的版本號

今天聊聊kafka版本號的問題,這個問題實在是過重要了,我以爲甚至是往後可否用好kafka的關鍵。上一節咱們介紹了kafka的幾種發行版,其實不管是哪一種kafka,本質上都內嵌了最核心的Apache kafka,也就是社區版kafka,那今天咱們就說說Apache kafka版本號的問題。在開始以前,先強調一下,後面出現的全部"版本"這個詞都表示kafka具體的版本號,而非上一節中介紹kafka種類,這一點要切記。

那麼如今可能會有這樣的疑問,我爲何要關心版本號的問題呢?直接使用最新版本不就行了嗎?固然了,這的確是一種有效的版本選擇的策略,但我想強調的是這種策略並不是在任何場景下都適用。若是你不瞭解各個版本之間的差別和功能變化,你怎麼能準確地評判某kafka版本是否是知足你的業務需求呢?所以在深刻學習kafka以前,花些時間搞明白版本演進,其實是很是划算的一件事。

kafka版本命名

當前Apache kafka已經迭代到2.2版本,社區正在爲2.3.0發版日期進行投票,相信2.3.0也會立刻發佈。可是稍微有些使人吃驚的是,不少人對於kafka的版本命名理解存在歧義。好比咱們在官網下載kafka時,會看到這樣的版本。

因而有些人或許就會納悶,難道kafka的版本號不是2.11或者2.12嗎?其實否則,前面的版本號是編譯kafka源代碼的Scala編譯器版本。kafka服務器端的代碼徹底由Scala語言編寫,Scala同時支持面向對象編程和函數式編程,用Scala寫的源代碼編譯以後也是普通".class"文件,所以咱們說Scala是JVM系的語言,它的不少設計思想都是爲人稱道的。

事實上目前java新推出的不少功能都是在不斷地向Scala靠近,好比lambda表達式、函數式接口、val變量等等。一個有意思的事情是,kafka新版客戶端代碼徹底由java語言編寫,因而有人展開了java vs Scala的討論,並從語言特性的角度嘗試分析kafka社區爲何放棄Scala轉而使用java重寫客戶端代碼。其實事情遠沒有那麼複雜,僅僅是由於社區來了一批java程序員而已,而之前老的Scala程序員隱退罷了。可能有點跑題了,可是無論怎麼樣,我依然建議你有空學一學python語言。

回到剛纔的版本號討論,如今你應該知道了對於kafka-2.11-2.1.1的提法,真正的kafka版本號是2.1.1,那麼這個2.1.1又表示什麼呢?前面的2表示大版本號,即major version;中間的1表示小版本號或者次版本號,即minor version;最後的1表示修訂版本號,也就是patch號。kafka社區在發佈1.0.0版本後特地寫過一篇文章,宣佈kafka版本命名規則正式從4位演進到3位,好比0.11.0.0版本就是4位版本號。

kafka版本演進

於kafka目前總共演進了7個大版本,分別是0.七、0.八、0.九、0.十、0.十一、1.0和2.0,其中的小版本和patch版本不少。哪些版本引入了哪些重大的功能改進?建議你最好作到如數家珍,由於這樣不只令你在和別人交談時顯得很酷,並且若是你要向架構師轉型或者已然是架構師,那麼這些都是可以幫助你進行技術選型、架構評估的重要依據。

咱們先從0.7版本提及,實際上也沒有什麼可說的,這是最先開源時的上古版本了。這個版本只提供了最基礎的消息隊列功能,甚至連副本機制都沒有,我實在想不出來有什麼理由你要使用這個版本,所以若是有人要向你推薦這個版本,果斷走開好了。

kafka從0.7時代演進到0.8以後正式引入了副本機制,至此kafka成爲了一個真正意義上完備的分佈式、高可靠消息隊列解決方案。有了副本備份機制,kafka就可以比較好地作到消息無丟失。那時候生產和消費消息使用的仍是老版本客戶端的api,所謂老版本是指當你使用它們的api開發生產者和消費者應用時,你須要指定zookeeper的地址而非broker的地址。

若是你如今尚不能理解這二者的區別也沒有關係,我會在後續繼續介紹它們。老版本的客戶端有不少的問題,特別是生產者api,它默認使用同步方式發送消息,能夠想到其吞吐量必定不會過高。雖然它也支持異步的方式,但實際場景中消息有可能丟失,所以0.8.2.0版本社區引入了新版本producer api,即須要指定broker地址的producer。

據我所知,國內依然有少部分用戶在使用0.8.1.一、0.8.2版本。個人建議是儘可能使用比較新的版本,若是你不能升級大版本,我也建議你至少要升級到0.8.2.2這個版本,由於該版本中老版本消費者的api是比較穩定的。另外即便升級到了0.8.2.2,也不要使用新版本producer api,此時它的bug還很是的多。

時間來到了2015年11月,社區正式發佈了0.9.0.0版本,在我看來這是一個重量級的大版本更迭,0.9大版本增長了基礎的安全認證/權限功能,同時使用java重寫了新版本消費者的api,另外還引入了kafka connect組件用於實現高性能的數據抽取。若是這麼眼花繚亂的功能你一時無暇顧及,那麼我但願你記住這個版本另外一個好處,那就是新版本的producer api在這個版本中算比較穩定了。若是你使用0.9做爲線上環境不妨切換到新版本producer,這是此版本一個不太爲人所知的優點。但和0.8.2引入新api問題相似,不要使用新版本的consumer api,由於bug超級多,絕對用到你崩潰。即便你反饋問題到社區,社區也無論的,它會無腦的推薦你升級到新版本再試試,所以千萬別用0.9新版本的consumer api。對於國內一些使用比較老的CDH的創業公司,鑑於其內嵌的就是0.9版本,因此要格外注意這些問題。

0.10.0.0是里程碑式的大版本,由於該版本引入了kafka streams。從這個版本起,kafka正式升級成爲分佈式流處理平臺,雖然此時的kafka streams還不能上線部署使用。0.10大版本包含兩個包含兩個小版本:0.10.1和0.10.2,它們的主要功能變動都是在kafka streams組件上。若是把kafka做爲消息引擎,實際上該版本並無太多的功能提高。不過在個人印象中,自從0.10.2.2版本起,新版本consumer api算是比較穩定了。若是你依然在使用0.10大版本,那麼我強烈建議你至少升級到0.10.2.2而後再使用新版本的consumer api。還有個事情不得不提,0.10.2.2修復了一個可能致使producer性能下降的bug。基於性能的緣故你也應該升級到0.10.2.2。

在2017年6月,社區發佈了0.11.0.0版本,引入了兩個重量級的功能變動:一個是提供冪等性producer api;另外一個是對kafka消息格式作了重構。

  • 前一個好像更加吸引眼球一些,畢竟producer實現冪等性以及支持事務都是kafka實現流處理結果正確性的基石。沒有它們,kafka streams在作流處理時沒法像批處理那樣保證結果的正確性。固然一樣是因爲剛推出,此時的事務api有一些bug,不算十分穩定。另外事務api主要是爲kafka streams應用服務的,實際使用場景中用戶利用事務api自行編寫程序的成功案例並很少見
  • 第二個改進是消息格式的變化。雖然它對用戶是透明的,可是它帶來的深遠影響將一直持續。由於格式變動引發消息格式轉換而致使的性能問題在生產環境中家常便飯,因此必定要謹慎對待0.11這個版本的變化。不得不說的是,在這個版本中,各個大功能組件都變得至關穩定了,國內該版本的用戶也不少,應該算是目前最主流的版本之一了。也正是由於這個緣故,社區爲0.11大版本特地退出了3個patch版本,足見它的受歡迎程度。個人建議是,若是你對1.0版本是否適用於線上環境依然感到困惑,那麼至少將你的環境升級到0.11.0.3,由於這個版本的消息引擎功能已經很是完善了。

最後合併說一下1.0和2.0版本吧,由於在我看來這兩個大版本主要仍是kafka streams的各類改進,在消息引擎方面並未引入太多的重大功能特性。kafka streams的確在這兩個版本有着很是大的變化,也必須認可kafka streams目前依然還在積極地發展着。若是你是kafka streams的用戶,只要選擇2.0.0版本吧。

去年8月國外出了一本書叫作kafka streams in action,中文譯名:kafka streams實戰,它是基於kafka streams1.0版本撰寫的,可是用2.0版本去運行書中的不少例子,竟然不少都已經沒法編譯了,足見兩個版本的差異之大。不過若是你在乎的依然是消息引擎,那麼這兩個大版本都是能夠用於生產環境的。

最後還有個建議,不論你使用的是哪一個版本,都請儘可能保持服務器端版本和客戶端版本一致,不然你將損失不少kafka爲你提供的性能優化收益。

kafka線上集羣部署方案怎麼作

前面幾節,咱們分別從kafka的定位,版本的變遷以及功能的演進等方面按部就班地梳理了Apache kafka的發展脈絡。那麼如今咱們就來看看生產環境中的kafka集羣方案該怎麼作。既然是集羣,那必然就要有多個kafka節點機器,由於只有單臺機器構成的kafka僞集羣只能用於平常測試之用,根本沒法知足實際的線上生產需求。而真正的線上環境須要仔細地考量各類因素,結合自身的業務需求而制定。下面咱們就從操做系統、磁盤、磁盤容量和帶寬等方面來討論一下。

操做系統:

這個很少BB,果斷選擇linux。至於爲何?主要是在如下這三個方面

  • I/O模型的使用

    什麼是I/O模型,能夠近似的認爲是操做系統執行I/O指令的方法。主流的I/O模型有五種:阻塞式I/O,非阻塞式I/O,I/O多路複用,信號驅動I/O,異步I/O。每種I/O模型都有各自的使用場景,但咱們想要支持高併發的話,都會選擇I/O多路複用,至於異步I/O,因爲操做系統支持的不完美,因此不選擇。對於I/O多路複用有三種,select、poll、epoll,epoll是在linux內核2.4中提出的,對於I/O輪詢能夠作到效率最大化,至於這三者的具體關係就不詳細介紹了,只須要知道epoll"最好"就好了。說了這麼多,那麼I/O模型和kafka又有什麼關係呢?實際上kafka客戶端底層使用java的selector,selector會自動從select、poll、epoll中選擇一個,而Windows只支持select。所以在這一點上linux是有優點的,由於可以得到更高效的I/O性能。

  • 數據網絡傳輸效率

    首先kafka生產和消費的消息都是經過網絡傳輸的,而消息保存在哪裏呢?確定是磁盤,故kafka須要在磁盤和網絡之間進行大量數據傳輸。若是你熟悉linux,那麼你確定據說過零拷貝(zero copy)技術,就是當數據在磁盤和網絡進行傳輸時避免昂貴的內核態數據拷貝從而實現快速的數據傳輸。linux平臺實現了這樣的零拷貝機制,但有些遺憾的是在Windows平臺上必須等到java8的60更新版本才能享受這個福利。一句話總結一下,在linux部署kafka可以享受到零拷貝技術所帶來的快速數據傳輸特性。

  • 社區支持度

    最後是社區支持度,這一點雖然不是什麼明顯的差異,但若是不瞭解的話,所形成的影響可能會比前兩個因素更大。簡單的來講,就是社區目前對Windows平臺上發現的bug不作任何承諾。所以Windows平臺上部署kafka只適合於我的測試或用於功能驗證,千萬不要用於生產環境。

磁盤:

若是要問哪一種資源對kafka性能最重要,磁盤無疑是要排名靠前的。在對kafka集羣進行磁盤規劃時常常要面對的問題是,我應該選擇普通的機械磁盤仍是固態硬盤?前者成本低且容量大,但易損壞;後者性能優點大,不過單價高。我的建議:使用普通的機械硬盤便可。

kafka大量使用磁盤不假,可它使用的方式是多順序讀寫操做,必定程度上規避了機械磁盤最大的劣勢,即隨機讀寫操做慢。從這一點上,使用ssd彷佛沒有太大的性能優點,畢竟從性價比是哪一個來講,機械磁盤物美價廉,而它因易損壞而形成的可靠性差等缺陷,又有kafka在軟件層面提供機制來保證,故使用普通機械磁盤是很划算的

關於磁盤選擇另外一個經常討論的話題,究竟是否應該使用磁盤陣列(raid)。使用磁盤陣列的兩個優點在於:

  • 提供冗餘的磁盤存儲空間
  • 提供負載均衡

以上兩個優點對於任何一個分佈式系統都頗有吸引力。不過就kakfa而言,一方面kafka本身實現了冗餘機制來提供高可靠性;另外一方面經過分區的概念,kafka也能在軟件層面自行實現負載均衡。如此一來磁盤陣列的優點就沒有那麼明顯了,固然並非說磁盤陣列很差,實際上依然有不少大廠是把確實是把kafka底層的存儲交給磁盤陣列的,只是目前kafka在存儲這方面提供了愈來愈便捷的高可靠性方案,所以在線上環境使用磁盤陣列彷佛變得不那麼重要了。綜合以上的考量,我的給出的建議是:

  • 追求性價比的公司能夠不搭建磁盤陣列,使用普通磁盤組成存儲空間便可。
  • 使用機械磁盤徹底可以勝任kafka線上環境。

磁盤容量:

kafka集羣到底須要多大的存儲空間,這是一個很是經典的規劃問題。kafka須要將消息保存在底層的磁盤上,這些消息默認會被保存一段時間而後自動被刪除。雖然這段時間是能夠配置的,但你應該如何結合自身業務場景和存儲需求來規劃kafka集羣的存儲容量呢?

我舉一個簡單的例子來講明如何思考這個問題,假設你所在公司有個業務天天須要向kafka集羣發送1億條消息,每條消息保存兩份以防止數據丟失,另外消息默認保存兩週時間。如今假設消息的平均大小是1KB,那麼你能說出你的kafka集羣須要爲這個業務預留多少磁盤空間嗎?

咱們來計算一下,天天1億條1KB大小的消息,保存兩份且存兩週的時間,那麼總的空間大小就等於1億 * 1KB * 2/ 1024 / 1024 。通常狀況下,kafka集羣除了消息數據還有其餘類型的數據,好比索引數據等,所以咱們須要再爲這些數據預留出10%的磁盤空間,所以咱們在原來的基礎上乘上1.1,既然要保存兩週,那麼再乘上14,那麼總體容量大概爲21.5TB左右。因爲kafka支持數據的壓縮 ,假設數據的壓縮比是0.75,那麼最後你須要規劃的存儲空間是21.5 * 0.75=16.14TB左右。

總之在規劃磁盤容量時你須要考慮下面這幾個元素:

  • 新增消息數
  • 消息留存時間
  • 平均消息大小
  • 備份數
  • 是否啓用壓縮

帶寬:

對於kafka這種經過網絡進行大量數據傳輸的框架而言,帶寬特別容易成爲瓶頸。事實上,在真實案例當中,帶寬資源不足致使kafka出現性能問題的比例至少佔60%以上。若是你的環境中還要涉及跨機房傳輸,那麼狀況狀況可能更糟糕了。

若是你不是超級土豪的話,我會認爲你使用的是普通的以太網,帶寬也主要有兩種:1Gbps的千兆網絡,和10Gbps的萬兆網絡,特別是千兆網絡應該是通常公司網絡的標準配置了。下面我就以千兆網舉一個實際的例子,來講明一下如何進行帶寬資源的規劃。

與其說是帶寬資源的規劃,其實真正要規劃的是所需的kafka服務器的數量。假設你公司的機房環境是千兆網絡,即1Gps,如今你有個業務,其業務目標或SLA是在1小時內處理1TB的業務數據。那麼問題來了,你到底須要多少臺kafka服務器來完成這個業務呢?

讓咱們來計算一下,因爲帶寬是1Gps,即每秒處理1Gb的數據,假設每臺kafka服務器都是安裝在專屬的機器上,也就是說每臺kafka機器上沒有混布其餘服務,可是真實環境中不建議這麼作。一般狀況下你只能假設kafka會用到70%的資源,由於總要爲其餘應用或者進程留一些資源。根據實際使用經驗,超過70%的閾值就有網絡丟包的可能性了,故70%的設定是一個比較合理的值,也就是說單臺kafka服務器最多也就能使用700Mb的帶寬資源。

稍等,這只是它能使用的最大帶寬資源,你不能讓kafka服務器常規性地使用這麼多資源,故一般要再額外留出2/3的資源,即單臺服務器使用帶寬爲700/3≈233MBps。須要提示的是,這裏的2/3是至關保守的,你能夠結合本身機器的使用狀況酌情減小此值。

好了,有了240MBps,咱們就能夠計算1小時內處理1TB數據所須要的服務器數量了。根據這個目標,咱們每秒須要處理1TB / 3600s * 8 ≈ 2336MB的數據,除以240,約等於10臺服務器。若是消息還須要額外複製兩份,那麼總的服務器臺數還要乘以3,即30臺。

kafka的安裝&啓動&關閉

下面咱們就來安裝kafka,這裏咱們選擇的版本爲kafka_2.12-2.2.1,安裝在/opt/kafka目錄下,而後配置環境變量,source一下

kafka的目錄結構以下

先來看看kafka的配置文件吧

sink、source顯然是和flume有關的。consumer、producer則是經過命令行啓動消費者、生產者,這個是作測試用的,可是咱們通常都在代碼中寫配置。下面還有一個zookeeper,不過這個zookeeper咱們不用管,由於這是kafka自帶的zookeeper,咱們都用本身的zookeeper。比較重要的是,那麼server.properities,咱們來看一下。

# The id of the broker. This must be set to a unique integer for each broker.
# broker的惟一id,對於每個broker都必須設置爲惟一的"整數",另外一臺broker的話,broker.id=1
broker.id=0

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# 監聽端口9092

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# 服務用於接收來自網絡的請求以及向網絡發送響應的線程數
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
# 服務用於處理請求的線程數、可能包含磁盤IO
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# socket服務端用於發送數據的緩存,意思是當數據到達指定的緩存以後才發送
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# socket服務端用於接收數據的緩存,意思是當數據達到指定的緩存以後纔讀取
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
# socket服務容許接收的請求的最大字節數
socket.request.max.bytes=104857600

# 以上都是默認配置,咱們就不改了

# A comma separated list of directories under which to store log files
# 用逗號分隔的一系列文件路徑,用於存儲日誌文件
# 注意:其實不止日誌文件,還有暫存數據,也存在這裏面。都叫作log,這一點容易混淆,務必記住
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 每個注意的分區數,下面都不用管
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
# 副本系數等等
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1


# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 每個日誌段的最大字節,換算以後是一個G
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# segment保留的最長時間,超時將被刪除
log.retention.check.interval.ms=300000

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 配置鏈接zookeeper的地址,若是多個zookeeper的話,那麼就用逗號分割
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
# 鏈接zookeeper的最大超時時間
zookeeper.connection.timeout.ms=6000

咱們再來看一下,bin目錄

有五個sh腳本是比較經常使用的,kafka-console-consumer.sh、kafka-console-producer.sh,這兩個是在控制檯啓動的,用於測試。kafka-server-start.sh、kafka-server-stop.sh,這兩個是啓動kafka集羣的。kafka-topics.sh,這個是與主題相關的,能夠對主題進行相關操做。
啓動kafka:bin]# ./kafka-server-start.sh ../config/server.properties,我是在bin目錄下啓動的,注意啓動的時候須要指定配置文件,就是咱們剛纔配的server.properties。可是注意的是,這樣啓動的話,進程是一個阻塞的,若是想進行別的操做,只能單獨開一個終端了,所以咱們能夠以守護進程的方式啓動:bin]# ./kafka-server-start.sh -daemon ../config/server.properties。不過還有一點須要注意:那就是咱們指定了zookeeper,是否是要先啓動zookeeper呢?沒錯,zkServer.sh start,啓動以後才能啓動kafka,不然就會鏈接zookeeper超時,從而致使啓動失敗

關閉kafka:bin]# ./kafka-server-stop.sh ../config/server.properties,關閉的時候就不須要指定-daemon這個參數了。

命令行操做topic增刪查

查看全部topic

kafka-topics.sh --list --zookeeper localhost:2181

能夠看到,因爲咱們尚未建立,因此此時尚未主題。

建立topic:

kafka-topics.sh --create --zookeeper localhost:2181 --topic 主題名 --partitions 分區數 --replication-factor 副本數

注意:副本數不能超過你broker的數量,由於咱們只有一臺機器,因此副本數是1,可是分區在一臺broker上是能夠有多個的

刪除topic:

kafka-topics.sh --delete --zookeeper localhost:2181 --topic 主題名

這裏提示咱們,若是沒有將delete.topic.enable設置爲true,那麼這個分區不會被刪除,可是satori這個主題已經被標記爲刪除了。咱們看看,就知道了,或者說再建立一個satori,若是存在會報錯的。

能夠看到,這個分區是真的被刪除了。

查看topic信息:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic 主題名

命令行控制檯生產者消費者測試

啓動生產者:

kafka-console-producer.sh --topic 主題 --broker-list localhost:9092,注意這裏是--broker-list,也就是broker的地址

啓動消費者:

kafka-console-consumer.sh --topic 主題 --bootstrap-server localhost:9092

此時消費者卡在了這個地方,等待生產者生產數據。

數據默認保留7天,超過7天就會刪除。可是還有一個問題,要是消費者啓動以前,生產生產消息了,怎麼辦?顯然此時的消費者是接收不到的,所以咱們能夠加上一個--from-beginning參數,這樣的話就能夠把消息所有消費掉。

關閉消費者以後,生產者又生產了兩條消息,而後啓動消費者。

相關文章
相關標籤/搜索