Apache Kafka 是一個快速、可擴展的、高吞吐的、可容錯的分佈式「發佈-訂閱」消息系統, 使用 Scala 與 Java 語言編寫,可以將消息從一個端點傳遞到另外一個端點,較之傳統的消息中 間件(例如 ActiveMQ、RabbitMQ),Kafka 具備高吞吐量、內置分區、支持消息副本和高容 錯的特性,很是適合大規模消息處理應用程序。java
Kafka 官網: http://kafka.apache.org/算法
Kafka主要設計目標以下:spring
Kafka一般用於兩大類應用程序:數據庫
要了解Kafka如何執行這些操做,讓咱們從頭開始深刻研究Kafka的功能。apache
首先幾個概念:bootstrap
kafka的應用場景很是多, 下面咱們就來舉幾個咱們最多見的場景api
用戶在網站的不一樣活動消息發佈到不一樣的主題中心,而後能夠對這些消息進行實時監測、實時處理。固然,也能夠加載到Hadoop或離線處理數據倉庫,對用戶進行畫像。像淘寶、天貓、京東這些大型電商平臺,用戶的全部活動都要進行追蹤的。安全
Kafka與其餘MQ相比,最大的特色就是高吞吐率。爲了增長存儲能力,Kafka將全部的消息都寫入到了低速大容量的硬盤。按理說,這將致使性能損失,但實際上,Kafka仍然能夠保持超高的吞吐率,而且其性能並未受到影響。其主要採用以下方式實現了高吞吐率。springboot
在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息系統在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。服務器
有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會盡量的快速。該緩衝有助於控制和優化數據流通過系統的速度。
不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量級,更適合於企業級的開發。同時實現了Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。
Redis是一個基於Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。
ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZeroMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演這個服務器角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。其中,Twitter的Storm 0.9.0之前的版本中默認使用ZeroMQ做爲數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty做爲傳輸模塊)。
ActiveMQ是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。
Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。
任何容許發佈與使用無關的消息發佈的消息隊列都有效地充當了運行中消息的存儲系統。Kafka的不一樣之處在於它是一個很是好的存儲系統。
寫入Kafka的數據將寫入磁盤並進行復制以實現容錯功能。Kafka容許生產者等待確認,以便直到徹底複製並確保即便寫入服務器失敗的狀況下寫入也不會完成。
Kafka的磁盤結構能夠很好地擴展使用-不管服務器上有50 KB仍是50 TB的持久數據,Kafka都將執行相同的操做。
因爲認真對待存儲並容許客戶端控制其讀取位置,所以您能夠將Kafka視爲一種專用於高性能,低延遲提交日誌存儲,複製和傳播的專用分佈式文件系統。
Kafka的流概念與傳統的企業消息傳遞系統相好比何?
傳統上,消息傳遞具備兩種模型:排隊和發佈-訂閱。在隊列中,一組使用者能夠從服務器中讀取內容,而且每條記錄都將轉到其中一個。在發佈-訂閱記錄中廣播給全部消費者。這兩個模型中的每個都有優勢和缺點。排隊的優點在於,它容許您將數據處理劃分到多個使用者實例上,從而擴展處理量。不幸的是,隊列不是多用戶的—一次進程讀取了丟失的數據。發佈-訂閱容許您將數據廣播到多個進程,可是因爲每條消息都傳遞給每一個訂閱者,所以沒法擴展處理。
Kfka的消費者羣體概念歸納了這兩個概念。與隊列同樣,使用者組容許您將處理劃分爲一組進程(使用者組的成員)。與發佈訂閱同樣,Kafka容許您將消息廣播到多個消費者組。
Kafka模型的優勢在於,每一個主題都具備這些屬性-能夠擴展處理範圍,而且是多訂閱者-無需選擇其中一個。
與傳統的消息傳遞系統相比,Kafka還具備更強的訂購保證。
傳統隊列將記錄按順序保留在服務器上,若是多個使用者從隊列中消費,則服務器將按記錄的存儲順序分發記錄。可是,儘管服務器按順序分發記錄,可是這些記錄是異步傳遞給使用者的,所以它們可能在不一樣的使用者上亂序到達。這實際上意味着在並行使用的狀況下會丟失記錄的順序。消息傳遞系統一般經過「專有使用者」的概念來解決此問題,該概念僅容許一個進程從隊列中使用,可是,這固然意味着在處理中沒有並行性。
Kafka作得更好。經過在主題內具備並行性(即分區)的概念,Kafka可以在用戶進程池中提供排序保證和負載均衡。這是經過將主題中的分區分配給消費者組中的消費者來實現的,以便每一個分區都由組中的一個消費者徹底消費。經過這樣作,咱們確保使用者是該分區的惟一讀取器,並按順序使用數據。因爲存在許多分區,所以仍然能夠平衡許多使用者實例上的負載。可是請注意,使用者組中的使用者實例不能超過度區。
僅讀取,寫入和存儲數據流是不夠的,目的是實現對流的實時處理。
在Kafka中,流處理器是指從輸入主題中獲取連續數據流,對該輸入進行一些處理並生成連續數據流以輸出主題的任何東西。
例如,零售應用程序能夠接受銷售和裝運的輸入流,並輸出根據此數據計算出的從新訂購和價格調整流。
能夠直接使用生產者和消費者API進行簡單處理。可是,對於更復雜的轉換,Kafka提供了徹底集成的Streams API。這容許構建執行非重要處理的應用程序,這些應用程序計算流的聚合或將流鏈接在一塊兒。
該功能有助於解決此類應用程序所面臨的難題:處理無序數據,在代碼更改時從新處理輸入,執行狀態計算等。
流API創建在Kafka提供的核心原語之上:它使用生產者和使用者API進行輸入,使用Kafka進行狀態存儲,並使用相同的組機制來實現流處理器實例之間的容錯。
主題。在 Kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱爲 topic。 topic 至關於消息的分類標籤,是一個邏輯概念
物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處
##2.2 Partition
分區。topic 中的消息被分割爲一個或多個 partition,其是一個物理概念,對應到系統上 就是一個或若干個目錄。partition 內部的消息是有序的,但 partition 間的消息是無序的。
段。將 partition 進一步細分爲了若干的 segment,每一個 segment 文件的大小相等。
Kafka 集羣包含一個或多個服務器,每一個服務器節點稱爲一個 broker。
broker存儲topic的數據。若是某topic有N個partition,集羣有N個broker,那麼每一個broker存儲該topic的一個partition。
若是某topic有N個partition,集羣有(N+M)個broker,那麼其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。
若是某topic有N個partition,集羣中broker數目少於N個,那麼一個broker存儲該topic的一個或多個partition。在實際生產環境中,儘可能避免這種狀況的發生,這種狀況容易致使Kafka集羣數據不均衡。
生產者, 即消息的發佈者. 生產者將數據發佈到他們選擇的主題。生產者負責選擇將哪一個記錄分配給主題中的哪一個分區。即: 生產者生產的一條消息,會被寫入到某一個 partition。
##2.6 Consumer
消費者。能夠從 broker 中讀取消息。
一個消費者能夠消費多個 topic 的消息
一個消費者能夠消費同一個 topic 中的多個 partition 中的消息
一個 partiton 容許多個 consumer 同時消費
consumer group 是 kafka 提供的可擴展且具備容錯性的消費者機制。組內能夠有多個消 費者,它們共享一個公共的 ID,即 group ID。組內的全部消費者協調在一塊兒來消費訂閱主題 的全部分區。
Kafka 保證同一個 consumer group 中只有一個 consumer 會消費某條消息,實際上,Kafka 保證的是穩定狀態下每個 consumer 實例只會消費某一個或多個特定的 partition,而某個 partition 的數據只會被某一個特定的 consumer 實例所消費。
下面咱們用官網的一張圖, 來標識consumer數量和partition數量的對應關係
由兩臺服務器組成的Kafka羣集,其中包含四個帶有兩個使用者組的分區(P0-P3)。消費者組A有兩個消費者實例,組B有四個。
其實對於這個消費組, 之前一直搞不明白, 我本身的總結是:
topic中的partitoin到group是發佈訂閱的通訊方式,即一條topic的partition的消息會被全部的group消費,屬於一對多模式;group到consumer是點對點通訊方式,屬於一對一模式。
舉個例子: 不使用group的話,啓動10個consumer消費一個topic,這10個consumer都能獲得topic的全部數據,至關於這個topic中的任一條消息被消費10次。
使用group的話,鏈接時帶上groupid,topic的消息會分發到10個consumer上,每條消息只被消費1次
分區副本。副本是一個分區的備份,是爲了防止消息丟失而建立的分區的備份。
每一個 partition 有多個副本,其中有且僅有一個做爲 Leader,Leader 是當前負責消息讀寫 的 partition。即全部讀寫操做只能發生於 Leader 分區上。
全部Follower都須要從Leader同步消息,Follower與Leader始終保持消息同步。Leader 與 Follower 的關係是主備關係,而非主從關係。
ISR,In-Sync Replicas,是指副本同步列表。 ISR列表是由Leader負責維護。
偏移量。每條消息都有一個當前Partition下惟一的64字節的offset,它是至關於當前分區第一條消息的偏移量。
Kafka集羣的多個broker中,有一個會被選舉controller,負責管理整個集羣中partition和replicas的狀態。
只有 Broker Controller 會向 zookeeper 中註冊 Watcher,其餘 broker 及分區無需註冊。即 zookeeper 僅需監聽 Broker Controller 的狀態變化便可。
HW,HighWatermark,高水位,表示 Consumer 能夠消費到的最高 partition 偏移量。HW 保證了 Kafka 集羣中消息的一致性。確切地說,是保證了 partition 的 Follower 與 Leader 間數 據的一致性。
LEO,Log End Offset,日誌最後消息的偏移量。消息是被寫入到 Kafka 的日誌文件中的, 這是當前最後一個寫入的消息在 Partition 中的偏移量。
我相信你看完上面的概念仍是懵逼的, 好吧, 下面咱們就用圖來形象話的表示二者的關係吧
Zookeeper 負責維護和協調 broker,負責 Broker Controller 的選舉。
在 kafka0.9 以前版本,offset 是由 zk 負責管理的。
總結:zk 負責 Controller 的選舉,Controller 負責 leader 的選舉。
Coordinator通常指的是運行在每一個broker上的group Coordinator進程,用於管理Consumer Group中的各個成員,主要用於offset位移管理和Rebalance。一個Coordinator能夠同時管理多個消費者組。
當消費者組中的數量發生變化,或者topic中的partition數量發生了變化時,partition的全部權會在消費者間轉移,即partition會從新分配,這個過程稱爲再均衡Rebalance。
再均衡可以給消費者組及broker帶來高性能、高可用性和伸縮,但在再均衡期間消費者是沒法讀取消息的,即整個broker集羣有小一段時間是不可用的。所以要避免沒必要要的再均衡。
##2.18 offset commit
Consumer從broker中取一批消息寫入buffer進行消費,在規定的時間內消費完消息後,會自動將其消費消息的offset提交給broker,以記錄下哪些消息是消費過的。固然,若在時限內沒有消費完畢,其是不會提交offset的。
消息發送者將消息發送給broker, 並造成最終的可供消費者消費的log, 是已給比較複雜的過程
在經過 API 方式發佈消息時,生產者是以 Record 爲消息進行發佈的。Record 中包含 key 與 value,value 纔是咱們真正的消息自己,而 key 用於路由消息所要存放的 Partition。消息 要寫入到哪一個 Partition 並非隨機的,而是有路由策略的。
若指定了 partition,則直接寫入到指定的 partition;
若未指定 partition 但指定了 key,則經過對 key 的 hash 值與 partition 數量取模,該取模
結果就是要選出的 partition 索引;
若是 partition leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程當中,還未同 步完畢時 leader 宕機。此時就須要選舉出新的 leader。若沒有 HW 截斷機制,將會致使 partition 中 leader 與 follower 數據的不一致。
當原 Leader 宕機後又恢復時,將其 LEO 回退到其宕機時的 HW,而後再與新的 Leader進行數據同步,這樣就能夠保證老 Leader 與新 Leader 中數據一致了,這種機制稱爲 HW 截斷機制。
生產者向 kafka 發送消息時,能夠選擇須要的可靠性級別。經過 request.required.acks參數的值進行設置。
異步發送。生產者向 kafka 發送消息而不須要 kafka 反饋成功 ack。該方式效率最高,但可靠性最低。其可能會存在消息丟失的狀況。
同步發送。生產者發送消息給 kafka,broker 的 partition leader 在收到消息後立刻發送 成功 ack(無需等等 ISR 中的 Follower 同步),生產者收到後知道消息發送成功,而後會再發送消息。若是一直未收到 kafka 的 ack,則生產者會認爲消息發送失敗,會重發消息。
該方式對於 Producer 來講,若沒有收到 ACK,必定能夠確認消息發送失敗了,而後能夠 重發;可是,即便收到了 ACK,也不能保證消息必定就發送成功了。故,這種狀況,也可能 會發生消息丟失的狀況。
同步發送。生產者發送消息給 kafka,kafka 收到消息後要等到 ISR 列表中的全部副本都 同步消息完成後,才向生產者發送成功 ack。若是一直未收到 kafka 的 ack,則認爲消息發送 失敗,會自動重發消息。該方式會出現消息重複接收的狀況。
生產者將消息發送到topitc中, 消費者便可對其進行消費, 其消費過程以下:
當leader宕機後,broker controller會從ISR中挑選一個follower成爲新的leader。若是ISR中沒有其餘副本怎麼辦?能夠經過unclean.leader.election.enable的值來設置leader選舉範圍。
必須等到ISR列表中全部的副本都活過來才進行新的選舉。該策略可靠性有保證,但可用性低。
true
在ISR列表中沒有副本的狀況下,能夠選擇任意一個沒有宕機的主機做爲新的leader,該策略可用性高,但可靠性沒有保證。
當Consumer因爲消費能力低而引起了消費超時,則可能會造成重複消費。
在某數據恰好消費完畢,可是正準備提交offset時候,消費時間超時,則broker認爲這條消息未消費成功。這時就會產生重複消費問題。
其解決方案:延長offset提交時間。
當Consumer消費了消息,但尚未提交offset時宕機,則這些已經被消費過的消息會被重複消費。
其解決方案:將自動提交改成手動提交。
其實在開發的時候, 咱們在設計程序的時候, 好比考慮到網絡故障等一些異常的狀況, 咱們都會設置消息的重試次數,
可能還有其餘可能出現消息重複, 那咱們應該如何解決呢?
下面提供三個方案:
給每一個消息都設置一個獨一無二的uuid, 全部的消息, 咱們都要存一個uuid, 咱們在消費消息的時候, 首先去持久化系統中查詢一下, 看這個看是否之前消費過, 如沒有消費過, 在進行消費, 若是已經消費過, 丟棄就行了, 下圖, 代表了這種方案
冪等(Idempotence)在數學上是這樣定義的,若是一個函數 f(x) 知足:f(f(x)) = f(x),則函數 f(x) 知足冪等性。
這個概念被拓展到計算機領域,被用來描述一個操做、方法或者服務。一個冪等操做的特色是,其任意屢次執行所產生的影響均與一次執行的影響相同。一個冪等的方法,使用一樣的參數,對它進行屢次調用和一次調用,對系統產生的影響是同樣的。因此,對於冪等的方法,不用擔憂重複執行會對系統形成任何改變。
咱們舉個例子來講明一下。在不考慮併發的狀況下,「將 X 老師的帳戶餘額設置爲 100 萬元」,執行一次後對系統的影響是,X 老師的帳戶餘額變成了 100 萬元。只要提供的參數 100萬元不變,那即便再執行多少次,X 老師的帳戶餘額始終都是 100萬元,不會變化,這個操做就是一個冪等的操做。
再舉一個例子,「將 X 老師的餘額加 100 萬元」,這個操做它就不是冪等的,每執行一次,帳戶餘額就會增長 100 萬元,執行屢次和執行一次對系統的影響(也就是帳戶的餘額)是不同的。
因此,經過這兩個例子,咱們能夠想到若是系統消費消息的業務邏輯具有冪等性,那就不用擔憂消息重複的問題了,由於同一條消息,消費一次和消費屢次對系統的影響是徹底同樣的。也就能夠認爲,消費屢次等於消費一次。
那麼,如何實現冪等操做呢?最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具有冪等性的操做。可是,不是全部的業務都能設計整天然冪等的,這裏就須要一些方法和技巧來實現冪等。
下面咱們介紹一種經常使用的方法:利用數據庫的惟一約束實現冪等。
例如,咱們剛剛提到的那個不具有冪等特性的轉帳的例子:將 X 老師的帳戶餘額加 100 萬元。在這個例子中,咱們能夠經過改造業務邏輯,讓它具有冪等性。
首先,咱們能夠限定,對於每一個轉帳單每一個帳戶只能夠執行一次變動操做,在分佈式系統中,這個限制實現的方法很是多,最簡單的是咱們在數據庫中建一張轉帳流水錶,這個表有三個字段:轉帳單 ID、帳戶 ID 和變動金額,而後給轉帳單 ID 和帳戶 ID 這兩個字段聯合起來建立一個惟一約束,這樣對於相同的轉帳單 ID 和帳戶 ID,表裏至多隻能存在一條記錄。
這樣,咱們消費消息的邏輯能夠變爲:「在轉帳流水錶中增長一條轉帳記錄,而後再根據轉帳記錄,異步操做更新用戶餘額便可。」在轉帳流水錶增長一條轉帳記錄這個操做中,因爲咱們在這個表中預先定義了「帳戶 ID 轉帳單 ID」的惟一約束,對於同一個轉帳單同一個帳戶只能插入一條記錄,後續重複的插入操做都會失敗,這樣就實現了一個冪等的操做。
爲更新的數據設置前置條件另一種實現冪等的思路是,給數據變動設置一個前置條件,若是知足條件就更新數據,不然拒絕更新數據,在更新數據的時候,同時變動前置條件中須要判斷的數據。
這樣,重複執行這個操做時,因爲第一次更新數據的時候已經變動了前置條件中須要判斷的數據,不知足前置條件,則不會重複執行更新數據操做。
好比,剛剛咱們說過,「將 X 老師的帳戶的餘額增長 100 萬元」這個操做並不知足冪等性,咱們能夠把這個操做加上一個前置條件,變爲:「若是X老師的帳戶當前的餘額爲 500萬元,將餘額加 100萬元」,這個操做就具有了冪等性。
對應到消息隊列中的使用時,能夠在發消息時在消息體中帶上當前的餘額,在消費的時候進行判斷數據庫中,當前餘額是否與消息中的餘額相等,只有相等才執行變動操做。
可是,若是咱們要更新的數據不是數值,或者咱們要作一個比較複雜的更新操做怎麼辦?用什麼做爲前置判斷條件呢?更加通用的方法是,給你的數據增長一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,若是不一致就拒絕更新數據,更新數據的同時將版本號 +1,同樣能夠實現冪等。
咱們在工做中, 爲了保證環境的高可用, 防止單點, kafka都是以集羣的方式出現的, 下面就帶領你們一塊兒搭建一套kafka集羣環境
咱們在官網下載kafka, 下載地址爲: http://kafka.apache.org/downloads, 下載咱們須要的版本, 推薦使用穩定的版本
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
mkdir /data/servers
tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/
cd /data/servers/kafka_2.11-2.4.0
kafka的配置文件$KAFKA_HOME/config/server.properties, 主要修改一下下面幾項
確保每一個機器上的id不同 broker.id=0 配置服務端的監控地址 listeners=PLAINTEXT://192.168.51.128:9092 kafka 日誌目錄 log.dirs=/data/servers/kafka_2.11-2.4.0/logs #kafka設置的partitons的個數 num.partitions=1 zookeeper的鏈接地址, 若是有本身的zookeeper集羣, 請直接使用本身搭建的zookeeper集羣 zookeeper.connect=192.168.51.128:2181
由於我本身是本機作實驗, 全部使用的是一個主機的不一樣端口, 在線上, 就是不一樣的機器,你們參考便可
咱們這裏使用kafka的zookeeper, 只啓動一個節點, 可是正真的生產過程當中, 是須要zookeeper集羣, 本身搭建就好, 後期咱們也會出zookeeper的教程, 你們請關注就行了.
#建立對應的日誌目錄 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094 #拷貝三份配置文件 cp server.properties server_9092.properties cp server.properties server_9093.properties cp server.properties server_9094.properties
#9092的id爲0, 9093的id爲1, 9094的id爲2 broker.id=0 # 配置服務端的監控地址, 分別在不通的配置文件中寫入不一樣的端口 listeners=PLAINTEXT://192.168.51.128:9092 # kafka 日誌目錄, 目錄也是對應不一樣的端口 log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092 # kafka設置的partitons的個數 num.partitions=1 # zookeeper的鏈接地址, 若是有本身的zookeeper集羣, 請直接使用本身搭建的zookeeper集羣 zookeeper.connect=192.168.51.128:2181
dataDir=/data/servers/zookeeper server.1=192.168.51.128:2888:3888
echo "1"> /data/servers/zookeeper/myid
使用kafka內置的zookeeper
cd /data/servers/kafka_2.11-2.4.0/bin zookeeper-server-start.sh -daemon ../config/zookeeper.properties netstat -anp |grep 2181
./kafka-server-start.sh -daemon ../config/server_9092.properties ./kafka-server-start.sh -daemon ../config/server_9093.properties ./kafka-server-start.sh -daemon ../config/server_9094.properties
咱們先來看一下建立topic經常使用的參數吧
--create 建立topic
--delete 刪除topic
--alter 修改topic的名字或者partition個數
--list 查看topic
--describe 查看topic的詳細信息
--topic <String: topic> 指定topic的名字
--zookeeper <String: hosts> 指定zookeeper的鏈接地址,
參數提示並不同意這樣使用
DEPRECATED, The connection string for
the zookeeper connection in the form host:port. Multiple hosts can be
given to allow fail-over.
--bootstrap-server <String: server to connect to>: 指定kafka的鏈接地址, 推薦使用這個,
參數的提示信息顯示
REQUIRED: The Kafka server to connect
to. In case of providing this, a direct Zookeeper connection won't be required.
--replication-factor <Integer: replication factor> : 對於每一個partiton的備份個數
The replication factor for each
partition in the topic being
created. If not supplied, defaults
to the cluster default.
--partitions <Integer: # of partitions>: 指定該topic的分區的個數
示例:
cd /data/servers/kafka_2.11-2.4.0/bin # 建立topic test1 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1 # 建立topic test2 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2 # 查看topic kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
咱們在工做中, 若是咱們不想去管理topic, 能夠經過kafka的配置文件來管理, 咱們可讓kafka自動建立topic, 須要在咱們的kafka配置文件中加入以下配置文件
auto.create.topics.enable=true
若是刪除topic想達到物理刪除的目的, 也是須要配置的
delete.topic.enable=true
他們能夠經過客戶端的命令生產消息
先來看看kafka-console-producer.sh經常使用的幾個參數吧
--topic <String: topic> 指定topic
--timeout <Integer: timeout_ms> 超時時間
--sync 異步發送消息
--broker-list <String: broker-list> 官網提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2. 這個參數是必須的
kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
咱們也仍是先來看看kafka-console-consumer.sh的參數吧
--topic <String: topic> 指定topic
--group <String: consumer group id> 指定消費者組
--from-beginning : 指定從開始進行消費, 若是不指定, 就從當前進行消費
--bootstrap-server : kafka的鏈接地址
kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning
4.3 kafka的日誌
kafka的日誌分兩種:
第一種日誌: 是咱們的kafka的啓動日誌, 就是咱們排查問題, 查看報錯信息的日誌,
第二種日誌:就是咱們的數據日誌, kafka是咱們的數據是以日誌的形式存在存盤中的, 咱們第二種所說的日誌就是咱們的partiton與segment
那咱們就來講說備份和分區吧
咱們建立一個分區, 一個備份, 那麼test就應該在三臺機器上或者三個數據目錄只有一個test-0, (分區的下標是從0開始的)
若是咱們建立N個分區, 咱們就會在三個服務器上發現, test_0-n
若是咱們建立M個備份, 咱們就會在發現, test_0 到test_n 每個都是M個
定義本身的生產者
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; /** * @ClassName MyKafkaProducer * @Description TODO * @Author lingxiangxiang * @Date 3:37 PM * @Version 1.0 **/ public class MyKafkaProducer { private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer; public MyKafkaProducer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 設置批量發送 properties.put("batch.size", 16384); // 批量發送的等待時間50ms, 超過50ms, 不足批量大小也發送 properties.put("linger.ms", 50); this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties); } public boolean sendMsg() { boolean result = true; try { // 正常發送, test2是topic, 0表明的是分區, 1表明的是key, hello world是發送的消息內容 final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world"); producer.send(record); // 有回調函數的調用 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println(recordMetadata.topic()); System.out.println(recordMetadata.partition()); System.out.println(recordMetadata.offset()); } }); // 本身定義一個類 producer.send(record, new MyCallback(record)); } catch (Exception e) { result = false; } return result; } }
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; /** * @ClassName MyCallback * @Description TODO * @Author lingxiangxiang * @Date 3:51 PM * @Version 1.0 **/ public class MyCallback implements Callback { private Object msg; public MyCallback(Object msg) { this.msg = msg; } @Override public void onCompletion(RecordMetadata metadata, Exception e) { System.out.println("topic = " + metadata.topic()); System.out.println("partiton = " + metadata.partition()); System.out.println("offset = " + metadata.offset()); System.out.println(msg); } }
在生產者測試類中,本身遇到一個坑, 就是最後本身沒有加sleep, 就是怎麼檢查本身的代碼都沒有問題, 可是最後就是無法發送成功消息, 最後加了一個sleep就能夠了, 由於主函數main已經執行完退出, 可是消息並無發送完成, 須要進行等待一下.固然, 你在生產環境中可能不會遇到這樣問題, 呵呵, 代碼以下
import static java.lang.Thread.sleep; /** * @ClassName MyKafkaProducerTest * @Description TODO * @Author lingxiangxiang * @Date 3:46 PM * @Version 1.0 **/ public class MyKafkaProducerTest { public static void main(String[] args) throws InterruptedException { MyKafkaProducer producer = new MyKafkaProducer(); boolean result = producer.sendMsg(); System.out.println("send msg " + result); sleep(1000); } }
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Collections; import java.util.Properties; /** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/ public class MyKafkaConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public MyKafkaConsumer() { super("KafkaConsumerTest", false); Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("group.id", "mygroup"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("heartbeat.interval.ms", "10000"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { consumer.subscribe(Arrays.asList("test2")); ConsumerRecords<Integer, String>records = consumer.poll(1000); for (ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); } } }
/** * @ClassName MyConsumerTest * @Description TODO * @Author lingxiangxiang * @Date 4:23 PM * @Version 1.0 **/ public class MyConsumerTest { public static void main(String[] args) { MyKafkaConsumer consumer = new MyKafkaConsumer(); consumer.start(); System.out.println("=================="); } }
前面的消費者都是以自動提交 offset 的方式對 broker 中的消息進行消費的,但自動提交 可能會出現消息重複消費的狀況。因此在生產環境下,不少時候須要對 offset 進行手動提交, 以解決重複消費的問題。
手動提交又能夠劃分爲同步提交、異步提交,同異步聯合提交。這些提交方式僅僅是 doWork()方法不相同,其構造器是相同的。因此下面首先在前面消費者類的基礎上進行構造 器的修改,而後再分別實現三種不一樣的提交方式。
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Collections; import java.util.Properties; /** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/ public class MyKafkaConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public MyKafkaConsumer() { super("KafkaConsumerTest", false); Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("group.id", "mygroup"); // 這裏要修改爲手動提交 properties.put("enable.auto.commit", "false"); // properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("heartbeat.interval.ms", "10000"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { consumer.subscribe(Arrays.asList("test2")); ConsumerRecords<Integer, String>records = consumer.poll(1000); for (ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); //手動同步提交 consumer.commitSync(); } } }
手動同步提交方式須要等待 broker 的成功響應,效率過低,影響消費者的吞吐量。異步提交方式是,消費者向 broker 提交 offset 後不用等待成功響應,因此其增長了消費者的吞吐量。
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Collections; import java.util.Properties; /** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/ public class MyKafkaConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public MyKafkaConsumer() { super("KafkaConsumerTest", false); Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); properties.put("group.id", "mygroup"); // 這裏要修改爲手動提交 properties.put("enable.auto.commit", "false"); // properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("heartbeat.interval.ms", "10000"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { consumer.subscribe(Arrays.asList("test2")); ConsumerRecords<Integer, String>records = consumer.poll(1000); for (ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); //手動同步提交 // consumer.commitSync(); //手動異步提交 // consumer.commitAsync(); // 帶回調公共的手動異步提交 consumer.commitAsync((offsets, e) -> { if(e != null) { System.out.println("提交次數, offsets = " + offsets); System.out.println("exception = " + e); } }); } } }
如今你們的開發過程當中, 不少都用的是springboot的項目, 直接啓動了, 若是仍是用原生的API, 就是有點low了啊, 那kafka是如何和springboot進行聯合的呢?
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.1</version> </dependency>
在application.properties中加入以下配置信息:
spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
spring.kafka.producer.acks = 0 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.retries = 3 spring.kafka.producer.batch-size = 4096 spring.kafka.producer.buffer-memory = 33554432 spring.kafka.producer.compression-type = gzip
spring.kafka.consumer.group-id = mygroup spring.kafka.consumer.auto-commit-interval = 5000 spring.kafka.consumer.heartbeat-interval = 3000 spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset = earliest spring.kafka.consumer.enable-auto-commit = true # listenner, 標識消費者監聽的個數 spring.kafka.listener.concurrency = 8 # topic的名字 kafka.topic1 = topic1
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; @Service @Slf4j public class MyKafkaProducerServiceImpl implements MyKafkaProducerService { @Resource private KafkaTemplate<String, String> kafkaTemplate; // 讀取配置文件 @Value("${kafka.topic1}") private String topic; @Override public void sendKafka() { kafkaTemplate.send(topic, "hell world"); } }
@Component @Slf4j public class MyKafkaConsumer { @KafkaListener(topics = "${kafka.topic1}") public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { log.info("----------------- record =" + record); log.info("------------------ message =" + kafkaMessage.get()); }