kafka知識點詳解

第一部分:kafka概述

1、定義(消息引擎系統)

一句話歸納kafka的核心功能就是:高性能的消息發送與高性能的消息消費。java

kafka剛推出的時候是以消息引擎的身份出現的,它具備強大的消息傳輸效率和完備的分佈式解決方案,隨着版本更新,在kafka0.10.0.0版推出了流式處理組件——Kafka Streams,使kafka交由下游數據處理平臺作的事也能夠本身作,自此kafka在消息引擎的基礎上正式成爲了一個流式處理框架。但不管是消息引擎仍是流式處理平臺,kafka的處理架構從未質變,歸納以下正則表達式

image

                                       圖 kafka簡要架構圖算法

      總結就是三句話:數據庫

  • 生產者發送消息給kafka服務器;
  • 消費者從kafka服務器讀取消息;
  • kafka服務器依託zookeeper集羣進行服務的協調管理。

說到消息引擎,和它相似的術語就是消息隊列和消息中間件,我的感受稱kafka爲消息引擎更合理。由於「消息隊列」名字給出了一個很不許確的暗示,彷彿它就是以隊列的形式實現的;而消息中間件有點過分強調了「中間件」之嫌,使其真實用途不夠明顯。apache

消息引擎系統既然是在不一樣應用之間傳輸消息的系統,那麼在設計時須要重點考慮的關鍵因素就是:消息設計、傳輸協議設計和消息引擎範型。編程

消息設計bootstrap

消息引擎系統在設計消息時必定要考慮語義的清晰和格式上的通用性,消息一般都採用結構化的方式進行設計,好比XML格式、JSON格式的消息等,而kafka的消息是用二進制方式來保存的,但依然是結構化的消息。數組

傳輸協議設計緩存

廣義上的傳輸協議包括任何可以在不一樣系統間傳輸消息或是執行語義操做的協議或框架,好比RPC及序列化框架、Google的ProtoBuffers、阿里系的Dubbo等,而kafka本身設計了一套二進制的消息傳輸協議。(後面再講)傳輸協議做爲一個基礎構建塊,它服務於消息引擎系統實現的消息引擎範型。安全

消息引擎範型

最多見的兩種消息引擎範型是消息隊列模型和發佈/訂閱模型

消息隊列模型是基於隊列提供消息傳輸服務的,其定義了消息隊列、發送者、接收者,提供的是一種點對點的消息傳遞方式。一旦消息被消費就會從隊列中移除該消息,每條消息由一個發送者生產出來,且只被一個消費者處理——發送者和消費者是一對一的關係,相似生活中以前的電話接線生的工做

image

發佈/訂閱模型有主題的概念,一個主題能夠理解爲邏輯語義相近的消息的容器,該模型也定義了相似生產者、消費者的角色,即發佈者(publisher)和訂閱者(subscriber)。發佈者將消息生產出來發送到指定的topic中,全部訂閱了該topic的訂閱者均可以接受到該topic下的全部消息,相似生活中報紙的訂閱

image

kafka經過引入消息組(consumer group)來同時支持這兩種模型。(後面再講)

2、概要設計

kafka的設計初衷就是爲了解決互聯網公司超大量級數據的實時傳輸,概要設計關鍵點:吞吐量/延時、消息持久化、負載均衡和故障轉移、伸縮性。

一、吞吐量/延時

kafka的吞吐量就是指每秒可以處理的消息數或者每秒能處理的字節數。kafka的延時能夠表示客戶端發起請求與服務器處理請求併發送響應給客戶端之間的這段時間。在實際使用場景中,這兩個指標一般是一個矛盾體,但也不是等比例的此消彼長的關係。

kafka寫入端實現高吞吐量低延時的方法原理:利用操做系統的頁緩存和採用追加寫入消息的方式。

kafka會持久化全部數據到磁盤,可是本質上每次寫入操做都只是把數據寫入到操做系統的頁緩存中,而後由操做系統自行決定什麼時候把頁緩存中的數據寫回磁盤上。正是得益於這種對磁盤的使用方式,使得kafka的寫入操做是很快的。

這樣設計有3個主要優點:

  • 操做系統頁緩存是在內存中分配的,因此消息寫入的速度很是快;
  • kafka沒必要直接與底層的文件系統打交道,全部繁瑣的IO操做都交給操做系統來處理;
  • kafka寫入操做採用追加寫入的方式,避免了磁盤的隨機寫操做(對於普通的物理磁盤(非固態硬盤)隨機讀/寫的吞吐量的確很慢,可是磁盤的順序讀/寫操做實際上是很快的,速度甚至能夠匹敵內存的隨機I/O速度)。

kafka在設計時採用了追加寫入消息的方式,即只能在日誌文件末尾追加寫入新的消息,且不能修改已寫入的消息,所以它屬於典型的磁盤順序訪問型操做。

kafka消費端實現高吞吐量低延時的方法原理:kafka把消息寫入操做系統的頁緩存中,一樣地,kafka在讀取消息時會首先嚐試從操做系統的頁緩存中讀取,且大部分消息極可能依然存在於頁緩存中,若是命中就把消息經頁緩存直接發送到網絡的socket上,不用「穿透」到底層的物理磁盤上獲取消息,同時這個過程用到了大名鼎鼎的零拷貝(zero copy)技術。

補充說明:傳統的Linux操做系統中的I/O接口是依託於數據拷貝來是實現的,在零拷貝技術出現以前,一個I/O操做會將同一份數據進行屢次拷貝,數據傳輸過程當中還涉及到內核態與用戶態的上下文切換,CPU的開銷很是大,極大限制了操做系統高效進行數據傳輸的能力,而零拷貝技術很好的改善了這個問題。

【總結】kafka依靠下面4點達到了高吞吐量、低延時的設計目標:

  • 大量使用操做系統頁緩存,內存操做速度快且命中率高;
  • kafka不直接參與物理I/O操做,而是交由最擅長此事的操做系統來完成;
  • 採用追加寫入方式,摒棄了緩慢的磁盤隨機讀/寫操做;
  • 使用以sendfile爲表明的零拷貝技術增強網絡間的數據傳輸效率;

二、消息持久化

kafka是要持久化消息到磁盤上的,這樣作的好處是:

  • 解耦消息發送和消息消費:經過將消息持久化使得生產者方再也不須要直接和消費者方耦合,它只是簡單的把消息生產出來並交由kafka服務器保存便可;
  • 實現靈活的消息處理:能夠很方便的實現消息重演,即對於已經處理過的消息可能在將來某個時間點須要從新處理一次。

普通系統在實現持久化時可能會先儘可能使用內存,當內存資源耗盡時再一次性的把數據「刷盤」,而kafka則反其道而行之,全部數據都會當即被寫入文件系統的持久化日誌中,以後kafka服務器纔會返回結果給客戶端通知它們消息已被成功寫入。這樣能減小kafka程序對內存的消耗從而將節省出來的內存留給頁緩存使用,更進一步提高性能。

三、負載均衡和故障轉移

負載均衡就是指讓系統的負載根據必定的規則均衡地分配在全部參與工做的服務器上,從而最大限度的提高系統總體的運行效率。

對於kafka來講就是,每臺服務器broker都有均等的機會爲kafka的客戶提供服務,能夠把負載分散到全部集羣中的機器上。

kafka經過智能化的分區領導者選舉來實現負載均衡,kafka默認提供智能的leader選舉算法,可在集羣的全部機器上以均等機會分散各個partition的leader,從而總體上實現負載均衡。

kafka的故障轉移是經過使用會話機制實現的,每臺kafka服務器啓動後會以會話的形式把本身註冊到zookeeper服務器上。一旦該服務器運轉出現問題,與zookeeper的會話便不能維持從而超時失效,此時kafka集羣會選舉出另外一臺服務器來徹底替代這臺服務器繼續提供服務。

四、伸縮性

伸縮性是指向分佈式系統中增長額外的計算資源好比CPU、內存、存儲或帶寬等時吞吐量提高的能力。

若是一個CPU的運算能力是U,那麼兩個CPU的運算能力咱們天然但願是2U,便可以線性的擴容計算能力,可是因爲不少隱藏的「單點」瓶頸致使實際中幾乎不可能達到。阻礙線性擴容的一個很常見的因素就是狀態的保存,由於不管哪類分佈式系統,集羣中的每臺服務器必定會維護不少內部狀態,若是有服務器本身來保存這些狀態信息,則必需要處理一致性的問題。相反,若服務器是無狀態的,狀態的保存和管理交由專門的協調服務來作好比zookeeper,那麼整個集羣的服務器之間就無需繁重的狀態共享,就極大地下降了維護複雜度。假若要擴容集羣節點,只需簡單的啓動新的節點機器進行自動負載均衡就能夠了。kafka正是採用上述思想,將每臺kafka服務器上的狀態統一交由zookeeper保管,擴展kafka集羣時只需啓動新的kafka服務器便可。說明:kafka服務器上並非全部狀態都不保存,之保存了很輕量級的內部狀態,所以整個集羣間維護狀態一致性的代價很低。

3、kafka基本概念和術語

一、消息

消息由消息頭部、key和value組成。kafka中的消息格式由不少字段組成,其中不少字段都是用於管理消息的元數據字段能,對用戶是透明的。V1版本的消息格式以下圖(不一樣版本可能會有稍微差別):

image

圖 消息的完整格式

kafka使用緊湊的二進制字節數組來保存字段,也就是沒有多餘的比特位浪費。一般的Java堆上內存分配,即便有重排各個字段在內存的佈局以減小內存使用量的優化措施,但仍有部分字節用於補齊之用。同時,運行Java的操做系統一般都默認開啓了頁緩存機制,也就是說堆上保存的對象極可能在頁緩存中還保留一份,這就形成了極大的資源浪費。kafka在消息設計時直接使用緊湊的二進制字節數組ByteBuffer而不是獨立的對象,避開了繁重的java堆上內存分配。所以,咱們至少可以訪問多一倍的可用內存。還有一點,大量使用頁緩存而非堆內存還有一個好處——數據不丟失,即當出現kafka broker進程崩潰時,堆內存上的數據也一併消失,但頁緩存的數據依然存在。

二、主題和分區即topic和partition:

topic只是一個邏輯概念,表明一類消息,也能夠認爲是消息被髮送到的地方,一般咱們可使用topic來區分實際業務。

kafka中的topic一般都會被多個消費者訂閱,出於性能的考量,kafka並非topic-message的兩級結構,而是採用topic-partition-message的三級結構來分散負載。topic與partition關係以下圖.

image

      圖 topic和partition

kafka的partition實際上並無太多的業務含義,它的引入就是單純的爲了提高系統的吞吐量。

topic是有多個partition組成的,而partition是不可修改的有序消息序列,也能夠說是有序的消息日誌。每一個partition有本身專屬的partition號,一般是從0開始。用戶對partition惟一 能作的就是在消息序列的末尾追加寫入消息。partition上的每條消息都會被分配一個惟一的序列號——位移。位移值也是從0開始順序遞增的整數,經過位移信息能夠惟必定位到某partition下的一條信息。

三、位移offset

topic partition下的每條消息都被分配一個位移值,而在kafka消費者端也有位移的概念,注意區分。每條消息在某個partition的位移是固定的,但消費該partition的消費者的位移會隨着消費進度不斷前移,但不會超過前者。所以,從此討論位移的時候必定給出清晰的上下文環境。

綜上,能夠斷言kafka中的一條消息其實就是一個<topic, partition,offset>三元組

四、replica副本、leader、follower

kafka中的分區partition是有序消息日誌,那爲了實現高可靠性,經過冗餘機制——備份多份日誌,而這些備份日誌在kafka中被稱爲副本(replica),它們存在的惟一目的就是防止數據丟失。

kafka中的replica分爲兩個角色:領導者(leader)和追隨者(follower)(相似過去的主備的提法(Master-slave)),也即副本分爲兩類:領導者副本(leader replica)和追隨者副本(follower replica)。follower replica是不能提供服務給客戶端的,也即不負責響應客戶端發來的消息寫入和消息消費請求,它只是被動地向領導者副本獲取數據,保持與leader的同步,follower存在的惟一價值就是充當leader的候補,一旦leader replica所在的broker宕機,kafka會從剩餘的replica中選舉出新的leader繼續提供服務。

image

圖 kafka的leader-follower系統

五、ISR(與leader replica保持同步的replica集合)

好比一個partition能夠配置N個replica,那麼是否就覺得着該partition能夠容忍N-1個replica失效而不丟失數據呢?答案是「否」。

kafka爲partition動態維護一個replica集合,該集合中的全部replica保存的消息日誌都與leader replica保持同步狀態,只有這個集合中的replica才能被選舉爲leader,也只有該集合中全部replica都接收到了同一條消息,kafka纔會將該消息置於「已提交」狀態,即認爲這條消息發送成功。kafka能保證只要ISR集合中至少存在一個replica,那些「已提交」狀態的消息就不會丟失——兩個關鍵點:第一,ISR中至少存在一個「活着的」replica;第二,「已提交」消息。

正常狀況下,partition的全部replica都應該與leader replica保持同步,即全部的replica都在ISR中,但因各類緣由,小部分replica可能開始落後於leader replica的進度,當其滯後到必定程度時,kafka會將這些replica「踢出」ISR。相反,當這些replica從新「追上」了leader replica的進度時,kafka又會將它們加回到ISR中。這些都是自動維護的,不需人工干預。

4、kafka使用場景

一、消息傳輸:替代傳統的消息總線等。

二、網站行爲日誌追蹤:鑑於點擊流數據量很大,kafka超強的吞吐量特性就有了用武之地。網站上的用戶操做以消息的形式發送到kafka的某個對應topic中,而後使用機器學習或其餘實時處理框架來幫助收集並分析。

三、審計數據收集:從各個運維應用程序處實時彙總操做步驟信息進行集中式管理,同時支持持久化特性,方便後續離線審計。

四、日誌收集:各個機器上的分散日誌,經過kafka進行全量收集,並集中送往下游的分佈式存儲如hdfs中。相對於其餘主流的日誌抽取框架好比flume,kafka有更好的性能,並且提供了完備的可靠性解決方案,同時還有低延時的特色。

五、流式處理:新版本kafka才推出的流式處理組件kafka streams,相對於典型的流式處理框架如Apache Storm、Apache Samza、Spark、Apache Flink等競爭力如何,讓時間給出答案吧。

 5、版本注意事項

image

自1.0.0版本開始,kafka版本號正式從原來的四位升級到了如今的3位,格式是<major>.<minor>.<patch>。

在kafka世界中,一般把producer和consumer統稱爲客戶端即clients,這是與服務器端即broker相對應的。

選擇kafka版本時要注意的幾個分界點爲:0.8版本才加入了集羣間的備份機制;0.9.0.0版本開始才支持kafka security功能;0.10.0.0(含)以後的版本纔有了流式處理組件kafka streams;但建議選擇相對較新版本,功能更完善bug更少咯。

2014年kafka的創始人創辦了公司——Confluent.io,從事商業化Kafka工具開發以及提供實時流式處理方面的產品。另外,confluent還分爲開源版本和企業版本,企業版本中提供了對底層kafka集羣完整的可視化監控解決方案以及一些輔助系統幫助管理集羣,而開源版本與Apache社區的kafka並沒有太大區別。

 

第二部分:kafka線上環境部署

1、環境部署說明

2、集羣環境規劃

一、操做系統選型

除了現狀的確是Linux服務器數量最多,單論它與kafka自己的相適性,Linux也要比Windows等操做系統更加適合部署kafka,能想到的緣由有兩個:I/O模型的使用和數據網絡傳輸效率。

二、磁盤選型

使用機械硬盤徹底能夠知足kafka集羣的使用,固然SSD更好。

關於JBOD(一堆普通磁盤的意思)和RAID(磁盤陣列)的選擇,即便用一堆普通商用磁盤進行安裝仍是搭建專屬的RAID呢?答案是具體問題具體分析。追求性價比的公司能夠考慮使用JBOD.

三、磁盤容量規劃

主要考慮如下因素:

  • 新增消息數
  • 消息留存時間
  • 平均消息時間
  • 副本數
  • 是否啓用壓縮。

四、內存規劃

kafka對於Java堆內存的使用不是不少,kafka將消息寫入頁緩存,通常狀況下,broker所需的堆內存都不會超過6GB。

對於內存的規劃建議以下:

  • 儘可能分配更多的內存給操做系統的page cache;
  • 不要爲broker設置過大的堆內存;
  • page cache大小至少要大於一個日誌段的大小(?)。

五、CPU規劃

要追求多核而非高時鐘頻率。

六、帶寬選擇

規劃建議爲:

  • 儘可能使用高速網絡;
  • 根據自身網絡條件和帶寬來評估Kafka集羣機器數量;
  • 儘可能避免使用跨機房網絡。

七、kafka集羣涉及的主要幾類參數:

  • broker端參數
  • topic級別參數
  • GC配置參數
  • JVM參數
  • OS參數。

 

第三部分:producer開發

1、序言

kafka內置有Java版本producer,而當前Apache kafka支持的第三方clients庫有不少,這些第三方庫基本上都是由非Apache kafka社區的人維護的,用戶下載的是Apache kafka的話默認是不包含這些庫的,須要單獨下載對應的庫。

Apache kafka封裝了一套二進制通訊協議,對於producer而言,用戶幾乎可使用任意語言按照該協議進行編程,從而實現向kafka發送消息。

實際上內置的Java版本producer和上面列出的全部第三方庫在底層都是相同的實現原理,這組協議本質上爲不一樣的協議類型分別定義了專屬的緊湊二進制字節數組格式,而後經過socket發送給合適的broker,以後等待broker處理完成後返回響應給producer。這樣設計的好處就是具備良好的統一性——即全部的協議類型都是統一格式的,而且因爲是自定義的二進制格式,這套協議不依賴任何外部序列號框架,從而顯得輕量級也具備好的擴展性。

2、producer工做原理

說到producer,它的主要功能就是向某個topic的某個分區發送一條消息,因此它首先須要肯定到底要向topic的哪一個分區寫入消息——這就是分區器作的事。

kafka producer提供了一個默認的分區器,對於每條待發送的消息,若是該消息指定了key,那麼partitioner會根據key的哈希值來選擇目標分區;若這條消息沒有指定key,則partitioner使用輪訓的方式確認目標分區,從而最大限度的保證消息在全部分區上的均勻性。

固然,producer提供了用戶自行指定目標分區的API,即用戶在消息發送時跳過partitioner直接指定要發送到的分區。另外,producer也容許用戶實現自定義的分區策略而不使用默認的分區器。

第二,確認了目標分區以後,producer要作的第二個事就是尋找這個分區對應的leader,也就是該分區leader副本所在的kafka broker。所以,在發送消息時,producer也就有了多種選擇來實現消息發送(好比不等待任何副本的響應便返回成功、只是等待leader副本響應寫入操做後再返回成功等)。

producer簡言之就是將用戶待發送的消息封裝成一個ProducerRecord對象,而後使用KafkaProducer.send方法進行發送。具體過程爲:Producer首先使用一個線程(用戶主線程,也即用戶啓動Producer的線程)將待發送的消息封裝進一個ProducerRecord類實例,而後將其序列化以後發送給partitioner,再結合本地緩存的元數據信息由partitioner來肯定目標分區後一同發送到位於producer程序中的一塊內存緩衝區中。而KafkaProducer中的另外一個專門的sender I/O線程則負責實時地從該緩衝區中提取出準備就緒的消息封裝進一個批次(batch),統一發送給對應的broker。工做流程圖以下圖。

image

                       圖 Java版本producer的工做流程

3、構造producer(詳見Demo代碼)

一、構造producer實例大體步驟

1>構造一個java.util.Properties對象,而後至少指定bootstrap.servers 、key.serializer、value.serializer這三個屬性。

對於bootstrap.servers參數,若kafka集羣中機器數不少,可只需指定部分broker便可,producer會經過該參數找到並發現集羣中全部的broker。被髮送到broker端的任何消息的格式必須是字節數組,所以消息的各個組件必須首先作序列化,而後才能發送到broker。必定注意的是,key.serializer和value.serializer兩個參數必須是全限定名。

2>使用上一步中建立的Properties實例構造KafkaProducer對象。

        /*
         * 建立producer的時候同時指定key和value的序列化類,則不需在Properties中指定了。
         */
        Serializer<String> keySerializer = new StringSerializer();
        Serializer<String> valueSerializer = new StringSerializer();
        Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);

3>構造待發送的消息對象ProducerRecord,指定消息要被髮送到的topic、分區及對應的key和value。注意,分區和key信息能夠不用指定,有kafka自行肯定分區。

4>調用KafkaProducer的send方法發送消息。

經過Java提供的Future同時實現了同步發送和異步發送+回調(Callback)兩種發送方式。而上文代碼清單中的調用方式實現了第三種發送方式——fire and forget即發送以後無論發送結果,在實際中不被推薦使用。真是使用場景中,同步和異步發送方式纔是最多見的兩種方式。

異步發送:實際上全部的寫入操做默認都是異步的。send方法提供了回調類參數來實現異步發送以及發送結果的響應,具體代碼以下:

        /*
         * 發送消息後的回調類Callback其實是一個Java接口,用戶能夠建立自定義的Callback實現類來處理消息發送後的邏輯,
         * 只要該類實現org.apache.kafka.clients.producer.Callback接口便可。
         */
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {//兩個參數不會同時非空
                if(exception == null) {
                    //消息發送成功
                }else {

//執行錯誤處理邏輯

if(exception instanceof RetriableException) {
                    //處理可重試瞬時異常
               }else {
                    //處理不可重試異常
               }

                }
                
            }
        })

同步發送:調用Future.get()無限等待結果返回,即實現同步發送的效果,具體代碼以下:

producer.send(record).get();//使用Future.get會一直等待直至Kafka broker將發送結果返回給producer程序.

【說明】不管同步發送和異步發送都有可能失敗,當前kafka的錯誤類型包含兩類:可重試異常和不可重試異常。全部可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類。

5>關閉KafkaProducer。

producer程序結束時必定要關閉producer。提供有無參數的close方法和有超時參數close方法。在實際場景中,必定要慎用待超時參數的close方法。

二、producer的主要參數

acks:指定在給producer發送響應前,leader broker必需要確保已成功寫入該消息的副本數。有3個取值:0、1和all。

acks producer吞吐量 消息持久性 使用場景
0 最高 最差 一、徹底不關心消息是否發送成功;
二、容許消息丟失(好比統計服務器日誌等)
1 適中 適中 通常場景便可
all或-1 最差 最高 不能容忍消息丟失

buffer.memory:指定producer端用於緩存消息的緩衝區大小,單位是字節,咱們幾乎能夠認爲該參數指定的內存大小就是producer程序使用的內存大小。

compression.type:指定是否壓縮消息,默認是none。若要壓縮直接指定壓縮類型,目前kafka支持3中壓縮算法:GZIP、Snappy和LZ4,根據實際使用經驗producer結合LZ4的性能最好。

 4、消息自定義分區機制(詳見Demo代碼)

producer提供了默認的分區策略及對應的分區器供用戶使用,但有時候用戶可能想實現本身的分區策略,就須要用戶自定義實現。若要使用自定義分區機制,用戶須要作兩件事:

一、在producer程序中建立一個類,實現org.apache.kafka.clients.producer.Partitioner接口。主要分區邏輯在Partitioner.partition中實現;

二、在用於構造KafkaProducer的Properties對象中設置partitioner.class參數。

5、自定義序列化(詳見Demo代碼)

kafka支持用戶自定義消息序列化,須要完成的3件事:

一、定義數據對象格式;

二、建立自定義序列化類,實現org.apache.kafka.common.serialization.Serializer接口,在serializer方法中實現序列化邏輯;

三、在用於構造KafkaProducer的Properties對象中設置key.serializer或value.serializer.

6、producer攔截器(詳見Demo代碼)

實現定製化邏輯,實例實現了一個簡單的雙interceptor組成的攔截鏈。

7、無消息丟失配置

KafkaProducer.send方法僅僅把消息放入緩衝區中,由一個專屬I/O線程負責從緩衝區中提取消息並封裝進消息batch中,而後發送出去,而這個過程當中存在着數據丟失的窗口:若I/O線程發送以前producer崩潰,則存在緩衝區中的消息所有丟失了。採用同步發送不會丟數據,可是性能會不好,實際場景中不推薦使用,所以最好能有一份配置,既使用異步方式還能有效避免數據丟失。

一、producer端配置

block.on.buffer.full =  true

acks = all or –1

retries = Integer.MAX_VALUE

max.in.flight.requests.per.connection = 1

使用帶回調機制的send發送消息,即KafkaProducer.sent(record, callback)

Callback邏輯中顯式地當即關閉producer,使用close

二、broker端參數配置

unclean.leader.election.enable = false

replication.factor = 3

min.insync.replicas = 2

replication.factor > min.insync.replicas

enable.auto.commit = false

8、 producer多線程處理

存在兩種基本的使用方法:多線程單KafkaProducer實例 + 多線程多KafkaProducer實例。

                                                                       兩種KafkaProducer使用方式比較

  說明 優點 劣勢

單KafkaProducer實例

全部線程共享一個KafkaProducer實例

實現簡單,性能好

一、全部線程共享一個內存緩衝區,可能須要較多內存;二、一旦producer某個線程崩潰致使KafkaProducer實例被「破壞」,則全部用戶線程都沒法工做。

多KafkaProducer實例

每一個線程維護本身專屬的KafkaProducer實例

一、每一個用戶線程擁有專屬的KafkaProducer實例、緩衝區空間及一組對應的配置參數,能夠進行細粒度的調優;二、單個KafkaProducer崩潰不會影響其餘producer線程工做

須要較大的內存分配開銷

【建議】若是是對分區數很少的Kafka集羣而言,推薦使用第一種方法,即在多個producer用戶線程中共享一個KafkaProducer實例;若對那些擁有超多分區的集羣而言,採用第二種方法具備較高的可控性,方便producer的後續管理。

 

第四部分:consumer開發

1、序言

一、版本對比

                                                           新舊版本consumer對比

 

 

編程語言

位移管理

API包名

主要使用類

新版本

使用消費者組(consumer group)

Java

新版本把位移提交到kafka的一個內部topic(__consumer_offsets)上。注意這個topic名字的前面有兩個下劃線

org.apache.kafka.clients.consumer.*

KafkaConsumer

舊版本 使用low-level consumer,分high-level和low-level兩種API. Scala 舊版本把位移提交到zookeeper。 kafka.consumer.* ZookeeperConsumerConnector
SimpleConsumer

二、consumer分類
consumer分爲兩類:消費者組(consumer group)和獨立消費者(standalone consumer),其中前者是由多個消費者實例(consumer instance)構成一個總體進行消費,然後者則單獨執行消費操做。咱們在討論或開發consumer程序的時候,必須明確消費者上下文信息,即所使用的consumer的版本以及consumer的分類。

【消費者組】

  • 一個consumer group可能有若干個consumer實例,固然一個group只有一個實例也是容許的;
  • 對於同一個group而言,topic的每條消息只能被髮送到group下的一個consumer實例上;
  • topic消息能夠被髮送到多個group中。

    kafka就是經過consumer group實現了對基於隊列和基於發佈/訂閱兩種消息引擎模型的支持的:

  • 全部consumer實例都屬於相同group——實現基於隊列的模型,每條消息只會被一個consumer實例處理;
  • 全部consumer實例都屬於不一樣group——實現基於發佈/訂閱的模型。

group.id惟一標示一個consumer group,一個consumer實例能夠是一個線程或是運行在其餘機器上的進程。

三、位移相關說明

這裏的位移指的是consumer端的offset,與分區日誌中的offset是不一樣的含義。

不少消息引擎是把消費端的offset保存在服務器端(broker),這樣作的好處是實現簡單,但會存在下面的問題:

  • broker今後變成了有狀態的,增長了同步成本,影響伸縮性;
  • 須要引入應答機制來確認消費成功;
  • 因爲要保存許多consumer的offset,故必然引入複雜的數據結構,從而形成沒必要要的資源浪費。

而kafka選擇讓consumer group保存offset,只須要保存一個長整型數據便可。當前kafka consumer在內部使用一個map來保存期訂閱topic所屬分區的offset。

新版本consumer把位移提交到kafka的一個內部topic(__consumer_offsets)上,用戶應儘可能避免執行該topic的任何操做。

2、構造consumer(詳見Demo代碼)

一、構造consumer實例大體步驟

一、構造一個java.util.Properties對象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值;

二、使用上一步建立的Properties實例構造KafkaConsumer對象;

三、調用KafkaConsumer.subscribe方法訂閱consumer group感興趣的topic列表;

注意subscribe方法不是增量式的,後續的subscribe調用會徹底覆蓋以前的訂閱語句。

四、循環調用KafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息;

poll函數的參數是一個超時設定,一般若是consumer拿到了足夠多的可用數據,那麼它可當即從該方法返回;但若當前沒有足夠多的數據可供返回,consumer會處於阻塞狀態,這個超時參數即控制阻塞的最大時間。這個超時設定給予了用戶可以在consumer消費的同時按期去執行其餘任務(但不知道具體實現)。不然設定一個比較大的值甚至是Integer.MAX_VALUE是不錯的建議。

五、處理獲取到的ConsumerRecord對象;

拿到這些kafka消息後consumer一般都包含處理邏輯,也即consumer的目的不只是要從kafka處讀取消息,還要對獲取到的消息進行有意義的業務級處理。從kafka consumer的角度來講,poll方法返回即認爲consumer成功消費了消息,但咱們用戶的觀點一般認爲是執行完真正的業務級處理以後纔算消費完畢。所以,對於「consumer處理太慢」的問題要從兩個方面定位明確瓶頸:第一,若是是poll返回消息的速度過慢,那麼能夠調節相應的參數來提高poll方法的效率;第二,若消息的業務級處理邏輯過慢,則應該考慮簡化處理邏輯或把處理邏輯放入單獨的線程執行。

六、關閉KafkaConsumer。

 

consumer腳本命令:目前來講,kafka全部命令行腳本表示相同含義的參數都不是統一的名字,好比consumer腳本中的名字是bootstrap-server,到了producer腳本中變成了broker-list,而在建立主題腳本中又變成了zookeeper。

3、訂閱topic

一、訂閱列表

在consumer group訂閱topic列表使用下面語句便可:

consumer.subscribe(Arrays.asList("topic1","topic2","topic3"));

在獨立consumer(standalone consumer),訂閱列表則使用下面語句實現手動訂閱:

TopicPartition tp1 = new TopicPartition("topic-name", 0);
TopicPartition tp2 = new TopicPartition("topic-name", 1);
consumer.assign(Arrays.asList(tp1, tp2));

二、基於正則表達式訂閱topic

使用基於正則表達式的訂閱必須指定ConsumerRebalanceListener,該類是一個回調接口,用戶須要經過實現這個接口來是吸納consumer分區分配方案發生變動時的邏輯。

若是用戶使用的是自動提交(即設置enable.auto.commit=true),則一般不用理會這個類,用下面實現類便可。

consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());

可是當用戶手動提交位移的,則至少要在ConsumerRebalanceListener實現類的onPartitionsRevoked方法中處理分區分配方案變動時的位移提交。

4、consumer.poll方法剖析(詳見Demo代碼)

一、poll的內部原理

kafka的consumer是用來讀取消息的,且要可以同時讀取多個topic的多個分區的消息。若要實現並行的消息讀取,一種方法是使用多線程的方式,爲每一個要讀取的分區都建立一個專有的線程去消費(這其實就是舊版本consumer採用的方式);另外一種方法是採用相似Linux I/O模型的poll或select等,使用一個線程來同時管理多個socket鏈接,即同時與多個broker通訊實現消息的並行讀取(這就是新版consumer最重要的設計改變)。

新版本Java consumer是一個多線程或說是一個雙線程的Java進程:建立KafkaConsumer的線程被稱爲用戶主線程,同時consumer在後臺會建立一個心跳線程。KafkaConsumer的poll方法在用戶主線程中運行,而一旦consumer訂閱了topic,全部的消費邏輯包括coordinator的協調、消費者組的rebalance以及數據的獲取都會在主邏輯poll方法的一次調用中被執行。

二、poll使用方法

KafkaConsumer.poll方法引入參數的做用:

  • 第一,超時設定;
  • 第二,是想讓consumer程序有機會按期「醒來」去作一些其餘的事情,這是超時設定的最大意義。

poll的使用方法總結以下:

  • consumer須要按期執行其餘子任務:推薦poll(較小超時時間)+運行標識布爾變量的方式;
  • consumer不須要按期執行子任務:推薦poll(MAX_VALUE)+捕獲WakeupException的方式。

須要按期執行的代碼:

        try {
            while (isRunning){//將isRunning標示爲volatile型,而後在其餘線程中設置isRunning=false來控制consumer的結束。
                // 四、循環調用KafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息;
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // 五、處理獲取到的ConsumerRecord對象;
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());
                }
            }
        } finally {
            // 千萬不要忘記!!關閉KafkaConsumer。它不只會清除consumer建立的各類socket資源,還會通知消費者組coordinator主動離組從而更快的開啓新一輪rebalance。
            consumer.close();
        }

不須要按期執行的代碼:

        try {
            while (true){//設置爲true
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);//在consumer程序未獲取到足夠多數據時無限等待,而後經過捕獲WakeupException異常來判斷consumer是否結束。須要在另外一個線程中調用consumer.wakeup()方法來觸發consumer的關閉。
                // 五、處理獲取到的ConsumerRecord對象;
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());
                }
            }

} catch(WakeupException e) {

// 此處忽略此異常的處理

}finally {

            // 千萬不要忘記!!關閉KafkaConsumer。它不只會清除consumer建立的各類socket資源,還會通知消費者組coordinator主動離組從而更快的開啓新一輪rebalance。
            consumer.close();
        }

說明:KafkaConsumer不是線程安全的,但有一個例外就是wakeup方法,用戶能夠安全地在另外一個線程中調用consumer.wakeup(). 其餘KafkaConsumer方法都不能同時在多線程中使用。

5、交付語義

offset對於consumer很是重要,由於它是實現消息交付語義保證的基石,常見的3種消息交付語義保證以下:

  • 最多一次(at most once)處理語義:消息可能丟失,但不會被重複處理;
  • 最少一次(at least once)處理語義:消息不會丟失,但可能被處理屢次;
  • 精確一次(exactly once)處理語義:消息必定會被處理且只會被處理一次。

6、自動提交和手動提交(詳見Demo代碼)

consumer默認是自動提交的,優點是下降用戶的開發成本,劣勢是用戶不能細粒度地處理位移的提交。

所謂的手動位移提交就是用戶自行肯定消息什麼時候被真正處理完並能夠提交位移。一個典型的應用場景是:用戶須要對poll方法返回的消息集合中的消息執行業務級處理,用戶想要確保只有消息被真正處理完成後再提交位移,若是使用自動位移提交則沒法保證這種時序性,這種狀況就必須使用手動位移提交。

設置使用手動位移提交的步驟:

  • 在構建KafkaConsumer時設置enable.auto.commit=false;
  • 而後調用commitSync或commitAsync方法便可。

                                                      自動提交和手動提交的比較

  使用方法 優點 劣勢 交付語義保證 使用場景
自動提交 默認不用配置或顯示設置enable.auto.commit=true 開發成本低,簡單易用 沒法實現精確控制,位移提交失敗後不易處理 可能形成消息丟失,最多實現「最少一次」處理語義 對消息交付語義無需求,容忍必定的消息丟失
手動提交 設置enable.auto.commit=false;手動調用commitSync或commitAsync提交位移 可精確控制位移提交行爲 額外的開發成本,須自行處理位移提交 易實現「最少一次」處理語義,依賴外部狀態鵝考實現「精確一次」處理語義 消息處理邏輯重,不容許消息消失,至少要求「最少一次」處理語義

手動提交位移API進一步細分爲同步手動提交和異步手動提交,即commitSync和commitAsync方法。當用戶調用上面兩個方法時,consumer會爲全部它訂閱的分區提交位移。它們還有帶參數的重載方法。用戶調用帶參數的方法時須要指定一個Map顯示地告訴kafka爲哪些分區提交位移。consumer只對它所擁有的分區作提交時更合理的行爲,所以跟推薦帶參數的重載方法。下面是一段典型的手動提交部分分區位移的代碼:

//下面代碼按照分區級別進行位移提交。它首先對poll方法返回的消息集合按照分區進行分組,而後每一個分區下的消息待處理完成後構造一個Map對象統一提交位移,從而實現了細粒度控制位移提交。

try {

            while (running) {
                ConsumerRecord<String, String> records = consumer.poll(1000);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }

7、rebalance監聽器(詳見Demo代碼)

新版本consumer默認是把位移提交到__consumer_offsets中,其實kafka也支持用戶把位移提交到外部存儲中,好比數據庫中。若要實現這個功能,用戶就必須使用rebalance監聽器。

【注意】使用rebalance監聽器的前提是用戶使用consumer group,若使用的是consumer或是直接手動分配分區,那麼rebalance監聽器是無效的。

8、解序列化(詳見Demo代碼)

kafka consumer從broker端獲取消息的格式是字節數組。自定義解序列化的步驟(同自定義序列化相似):

  • 定義或複用serializer的數據對象格式;
  • 建立自定義deserializer類,令其實現org.apache.kafka.common.serialization.Deserializer接口,在deserializer方法中實現deserialize邏輯;
  • 在構造kafkaConsumer的Properties對象中設置key.deserializer或value.deserializer爲上一步的實現類全限定名。

9、多線程消費實例(詳見Demo代碼)

下面介紹兩種多線程消費的方法及實例代碼:

一、每一個線程維護一個KafkaConsumer

在這個方法中用戶建立多個線程來消費topic數據,每一個線程都會建立專屬於該線程的KafkaConsumer實例,如圖.

image

由圖可知,consumer group由多個線程的KafkaConsumer組成,每一個線程負責消費固定數目的分區。

 

二、單KafkaConsumer實例+多worker線程

本方法將消息的獲取與消息的處理解耦,把後者放入單獨的工做者線程中,即所謂的woker線程中,同時在全局維護一個或若干個consumer實例執行消息獲取任務,以下圖。

image

本例使用全局的kafkaConsumer實例執行消息獲取,而後把獲取到的消息集合交給線程池中的worker線程執行工做,以後worker線程完成處理後上報位移狀態,由全局consumer提交位移。

 

 

第五部分:管理kafka集羣

待補充。。。。

 

第六部分:監控kafka集羣

待補充。。。。

相關文章
相關標籤/搜索