Kafka原理及應用(一)

一. Kafka簡介

(1) 消息中間件的兩種實現模式

JMS (Java Message Service) 對消息的發送和接收定義了兩種模式:java

  1. 點對點模式:消息的生產和消費者均只有一個,消息由生產者將消息發送到消息隊列(queue)中,而後消息消費者從隊列中取出消息進行消費,消息被取出後,queue中再也不保存該消息。正則表達式

  2. 發佈訂閱模式:消息的生產和消費者可能有多個,使用主題(Topic)來對消息進行分類,生產者將消息發送到主題,多個消費者都可以對這個主題進行消費。相似於對多個消費者作廣播。apache

     

     

    常見的消息中間件Active MQ, Rabbit MQ , Kafka中,只有Active 徹底實現了上述JMS的規範,Kafka則經過消費組和主題分區的方式讓發佈訂閱模型同時也具備了點對點模式的消息收發能力。事實上沒有徹底按上述JMS規範設計Rabbit MQ,和Kafka反而更優秀,其中Kafka在徹底按照分佈式的思想來設計的,在大數據和高可用上有着自然優點。bootstrap

(2) Kafka 基本架構

使用Kafka做爲消息中間件,咱們須要涉及到包括 Kafka集羣, 分佈式協調中心(Zookeeper), 生產者, 消費者 在內的四個部分對象。它們協同工做,讓消息高吞吐高可靠的存儲和流通。以下圖ubuntu

 

左圖簡單來說就是,消息生產者在Kafka集羣上訂閱主題後,能夠併發的向集羣發送消息,Kafka集羣接受到消息會按機制將消息存在不一樣的分區,存哪一個分區能夠由生產者指定,若是生產者未指定則按key來hash或者採用round robin的方式保存保存。服務器

中間的圖是一個左右兩圖整體歸納。網絡

右圖來自kafka官網,旨在說明kafka的消費都是以消費組的方式來消費,即便不指定也會默認建立一個消費組,不一樣的消費組對同一個主題的消費相互獨立,同一消費組內不一樣消費者不能重複消費某一分區,兩種極端的狀況就是:架構

  1. 若消費組內消費者數量和分區數量相同,則每一個消費者各自消費一個分區,一個分區一個消費者併發

  2. 若消費組內只有一個消費者,則該消費者須要消費全部分區,由於主題的完整消息時各分區消息的總和app

    假如主題分區數爲 N,消費組內消費者數量爲 M,且M > N ,能夠確定是組內有 M - N 個消費者沒法消費主題。

 

(3) Kafka 常見使用場景

  1. 消息傳輸:即用做消息中間件

  2. 行爲日誌跟蹤:

    Kafka 最先就是用於重建用戶行爲數據追蹤系統的。不少網站上的用戶操做都會以消息的形式發送到Kafka 的某個對應的topic 上。這些點擊流蘊含了巨大的商業價值, 事實上,目前就有不少創業公司使用機器學習或其餘實時處理框架來幫助收集並分析用戶的點擊流數據。鑑於這種點擊流數據量是很大的, Kafka 超強的吞吐量特性此時就有了用武之地

  3. 審計數據收集:

    不少企業和組織都須要對關鍵的操做和運維進行監控和審計。這就須要從各個運維應用程序處實時彙總操做步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka 是很是適合的解決方案,它能夠便捷地對多路消息進行實時收集,同時因爲其持久化的特性,使得後續離線審計成爲可能。

  4. 日誌收集:

    這多是Kafka 最多見的使用方式了一一日誌收集彙總解決方案。每一個企業都會產生大量的服務日誌,這些日誌分散在不一樣的機器上。咱們可使用Kafka 對它們進行全量收集,井集中送往下游的分佈式存儲中(好比HDF S 等) 。比起其餘主流的日誌抽取框架Kafka 有更好的性能,並且提供了完備的可靠性解決方案,同時還保持了低延時的特色。

  5. 流處理:

    不少用戶接觸到Kafka 都是由於它的消息隊列功能。自0.10.0.0 版本開始, Kafka 社區推出了一個全新的流式處理組件Kafka Streams 。這標誌着Kafka 正式進入流式處理框架俱樂部。相比老牌流式處理框架Apache Storm 、Apache Samza,或是最近風頭正勁的Spark Strearming,抑或是Apache Flink, Kafka Streams 的競爭力如何?讓咱們拭目以待。


 

二. Kafka工做原理

(1) 重要術語解釋

  1. broker: Kafka把服務器的物理機稱爲 broker

  2. topic: 發佈訂閱的消息模式中對消息的分類, 對應某個業務需求的消息。

  3. partition: kakfa在保存主題消息數據時對主題的劃分,每一個partition分別保存主題的一部分數據,全部分區的數據的總和就是主題的完整消息。

  4. leader & follower: 至關於 master 和 slaver的關係,分別表明分佈式系統中的主節點和從節點。當主題的分區有多個副本(replication)時,有且僅有一個replication當選leader,其它的均爲follower, follower的數據的直接來源是leader而不是生產者。

  5. replication:分區的備份,當leader節點掛了後, 從replica中選舉出新的leader。Kafka中消息的讀寫都是分區的leader完成的,replica 只經過向leader fench數據保存備份並在leader宕機後重新當選leader,來保證高可用性。

  6. offset:生產者和消費者在寫和讀數據的時候,對消息寫讀進度的記錄。Kafka服務器將消息數據保存在磁盤log文件上,採用對磁盤的append順序寫讀的方式,offset至關於順序寫讀的偏移量

  7. 消費組:消費者使用一個消費者組名(即group.id )來標記本身, topic 的每條消息都只會被髮送到每一個訂閱它的消費者組的一個消費者實例上。kafka默認全部消費都使用消費組來消費。

  8. ISR: ISR 的全稱是in-sync replica,翻譯過來就是與leader replica 保持同步的replica 集合,只有這個集合中的replica 才能被選舉爲leader,也只有該集合中全部replica 都接收到了同一條消息, Kafka 纔會將該消息置於「己提交」狀態。

(2) 消息生產

  生產者在鏈接kafka服務器的時候通常都會指定以下參數, 經過以下參數的設定來建立KafkaProducer對象,而後使用該producer對象來發送消息。

1 props.put("bootstrap.servers", "10.118.65.203:9092");
2 props.put("acks", "all");
3 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
4 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  其中當 bootstrap.servers 參數用來指定鏈接服務器的 地址與端口號, 一般kafka服務器會有多個broker, 該參數只須要指定其中的一個或者幾個便可,鏈接上kafka的任意broker以後能夠在zookeeper中的 /brokers/ids/ 下找到全部的id 以及  id對應的主機地址及端口號。

  生產者鏈接上broker以後, 可以獲得全部的broker 的id 及地址端口, 但一個生產者默認狀況下只能寫該 topic 下的一個partition,這時若是生產者在發送的 ProduceRecord 中指定了消息的 key, kafka會更具該key 來自行計算該寫入的partition編號。若生產者在創建鏈接後發送消息時未指定消息的key 值,能夠經過自定義實現Partitioner接口的自定義類來制定寫partion編號的規則。而後只須要在鏈接broker-list 時指定一個"partitioner.class"參數,該參數傳自定義類的全路徑名,類中覆蓋接口的partition方法便可.一種分區策略以下:

 1     @Override
 2     public int partition(String topic, Object keyObj, byte[] keyBytes,
 3                         Object value, byte[] valueBytes, Cluster cluster) {
 4         String key  = (String) keyObj;
 5         List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
 6 
 7         int partitionCount = partitionInfos.size();
 8         int myPartition = (1 == partitionCount) ? partitionCount : partitionCount - 1;
 9         boolean condition =  (key == null || key.isEmpty() || !key.contains("my"));
10         return condition ? random.nextInt(partitionCount - 1): myPartition;
11     }

  若生產者在創建鏈接時並未指定 partitioner.class 發消息時候也沒有指定key, 這時默認狀況下kafka會以round robin的機制選擇該topic下的分區。

 
  生產者向主題的某個分區寫數據, 是向該主題的該分區的leader寫的數據,而不向該分區的follower寫,可是能夠經過設置參數讓生產者寫leader的同時follower也獲得同步,這個參數就是前面提到的"acks"參數, 該參數的取值及對應的寫流程以下:
  ① acks = 0            :生產者向leader發送完消息以後, leader不向producer發送生產者發送確認信息
       ② acks = 1            :生產者向leader發送完消息以後, leader無需等follower備份數據,將數據寫入本地log後 直接向生產者發送確認信息
       ③ acks = -1或者all :生產者向leader發送完消息以後,leader將數據寫入本地log後 ,還須要繼續等follower寫好各自備份並向leader 發送ack以後才向生產者發送ack。
  當acks = -1 時 , 生產者寫數據的流程可參考下圖:
      

(3) 消息消費

   消費者客戶端在鏈接服務器建立consumer對象時,一般須要設置如下四個參數:

1 props.put("bootstrap.servers", "10.118.65.203:9092");
2 props.put("group.id", "test");
3 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
4 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  以上參數是沒有默認值的,須要用戶自行指定。其中key,value 的解序列化類要求與生產者指定的序列化類對應。若是消費者不指定groupId,Kafka會自動的爲該消費者實例生成一個groupId。

  

  消費者客戶端在消費消息時會維護一個offset, 該offset就是當前消費者消費到分組-主題-分區下的什麼位置的記錄。例如當消費者A在消費完第N條消息後,自動或者手動的,消費者A會向kafka服務器提交一次位移,(注意這裏是N,由於offset從0開始計數,屬於第N+1條消息了),該offset會提交到log.dirs指定的路徑下中的某一個__consumer_offsets中(以下圖),這裏的__consumer_offsets 其實也是kafka本身建立的一個主題,__consumer_offsets-n 路徑裏面保存的也是index. log 文件。默認狀況下 kafka爲該__consumer_offsets建立了50個分區。用來保存多個主題,多個分區,以及多個組的場景下的消費者位移。以下圖

  

  kafka服務器在__consumer_offsets 主題下,實際保存的是消費者提交過來的offset的鍵值對,其中key是 group.id + topic + 分區號, value 爲offset的實際取值。每當更新一個key的最新的offest時,該topic就會寫入一條含有最新offset的消息,同時kafka也會按期的對topic作清理,即爲每一個消息key只保存含有最新offset。這樣每次消費者在讀取消息以前會先讀取本身的offset,而後再根據offset的值來讀取訂閱主題的topic消息,即便在消費者服務器啓動時沒有指定offset的值也能自動的從上一次消費的地方開始消費。

   

  kafka消費者在消費消息時若是不指定group.id,默認會爲單個的消費組指定消費組的ID, 這種意義上來說,kakfa消費者必定會經過組來消費。之因此這樣要求,緣由在於消費者組是kafka實現消息隊列消費和消費發佈訂閱兩種模式的重要設計,同時從性能角度上來考慮,消費者組也能夠提升消費消息的併發能力,且能夠實現訪問集羣的高伸縮和容錯能力。
               消費組內消費者的消費策略是:
               ①:對於同一個group ,topic的每條消息只能被髮送到group下的一個consumer 實例上
               ②:topic消息能夠被髮送到多個group 中
               ③:一個consumer group 能夠有若干個consumer 實例
               
               首先基於以上消費策略,對消費組內的消費者而言,topic的每條消息都會且僅會被一個消費者消費,這正是點對點模式。而對於不一樣group , 各個group均可以消費到主題的全部消息。而若是每一個group都只包含一個消費者,topic的消息全部消息就會被每個消費者消費到,這就是發佈訂閱模式。
               
               其次考慮一個消費組內有多個消費者的狀況,對於併發量很大的業務而言,若是隻採用單臺服務器做爲消費者,有可能給服務器形成太大的壓力從而影響其餘業務邏輯的執行,消費組內各消費者經過分別訪問不重複的分區能夠有效的將訪問壓力分攤到不一樣的服務器上。對於不少對順序要求不高的業務場景能夠有效的實現負載均衡,就算業務場景要求按順序消費也能夠經過在主題下爲消息分類使用不一樣的key 或者使用自定義partitioner 來實現帶負載均衡的分場景有序消費。
 
               最後消費組能夠實現訪問集羣的高伸縮和容錯能力, 這依賴於消費組的一種叫作rebalance 機制,rebalance 本質上也是一種協議,它規定了一個消費組是如何達成一致來分配訂閱topic的全部分區。假設某組有4個consumer 的實例,該組訂閱了一個包含20個分區的topic。 正常狀況下,kafka會爲每一個consumer平均分配5個分區。這個分配的過程就稱爲rebalance(重平衡)。對每一個組而言,kafka的某個broker會被選舉爲組協調者(coordinator),由該coordinator 負責對組的狀態進行管理, 它的主要職責就是當新成員到達時促成組內全部成員達成新的分區分配方案。
                消費組內觸發rebalance 的條件有如下幾個:
                ①:組成員發生變動,如新的consumer加入組,或者已有的consumer主動離開組,或者已有的consumer崩潰都會觸發rebalance。
                ②:組訂閱的topic 數發生變動,好比使用正則表達式的訂閱,當能匹配正則表達式的新topic被建立時就會觸發rebalance.
                ③:   組訂閱tipic的分區數發生變動,好比使用命令行腳本增長了訂閱topic的分區數。
 
                真實場景中引起rebalance最多見的就是第一種狀況,特別是consumer崩潰的狀況,這裏的崩潰不必定是consumer進程掛掉,或者consumer服務器宕機,當consumer 沒法指定在時間內完成消息的處理,coordinator就認爲該consumer已經崩潰,這樣也會引起新一輪的rebalance。
               例如:若是某個消費組group 中包含 5 個consumer, 且消費組訂閱了一個有20個分區的topic,按kafka組消費策略,這5個consumer會不帶重複的消費這20個分區,每一個consumer消費4個分區。如今假設consumer-2 在消費消息的時候因爲執行業務邏輯時間超時, 致使組協調者coordinator  認爲消費組服務崩潰,這時coordinator  就會對剩下的consumer 進行重rebalance , rebalance 以後,每一個consumer 消費5個分區,主題中的全部分區均會被消費到。如今通過一段時間的邏輯執行 consumer-2 又恢復了鏈接,這個時候coordinator  也會檢測到並對全部消費組成員進行新一輪的rebalance。 這樣咱們就能夠經過對消費組的重平衡管理消費者對分區的消費,從而實現服務消費的伸縮。
 

(4) 消息存儲

  Kafka採用將每一個分區的消息數據寫入磁盤文件的方式來存儲, 在config/server.properties 文件中log.dir 指定的路徑下,咱們能夠找到 [topic名-分區ID]格式的路徑,選擇任意一個路徑進入能夠看到以下文件列表:

   

  由圖能夠看到四個文件,其中一個log文件, 兩個index文件 和 一個epoch 文件;其中的log文件就是用來記錄消息數據的,兩個index文件用來對log文件的中的數據創建索引,方便消費者快速讀取到須要消費的數據。

  此外咱們看到index 和 log文件的文件名是全數字表示的,這是由於默認狀況下kafka主題的分區的leader會將數據順序的存儲到log文件中。可是kafka並非將全部的數據都存到一個log文件中,而是將數據順序的存入分段的log文件中,每一個段(segment) 默認分配的大小爲500M(能夠設定),當一個segment數據滿了以後會建立下一個segment的log文件, 而後在新的segment中繼續順序的保存數據。
 
  
         自0.11.0.0 版本以後, kafka 保存消息時採用了以下圖的消息格式. 與此同時還引入消息集合(batch)的概念。單條消息及消息batch的格式以下圖兩圖
                            V2 版本消息格式
  

                            Kafka 消息集格式

  

  

  消息中各分區的含義以下:
          1. 消息總長度: 在計算出消息總字節數後, 會將消息的總長度保存在該區域
          2. 屬性:採用固定1 字節保存屬性
          3. 時間戳增量:保存相對於 batch起始時間戳的差值(以前是採用固定長度的8字節來保存)
          4. 位移增量:保存消息位移與外層batch起始位移的差值(以前是採用固定長度的8字節來保存 )
          5. key長度: key的長度(key的長度是一個可變長度)
          6. key值:實際key的內容
          7. value長度: value的長度(value的長度也是一個可變長度)
          8. value值: 實際value的內容
          9. header個數:增長頭部信息用來作集羣間的消息路由的用途,或者用來承載消息的一些特定元數據信息,通常用不到
          10. header內容: 若無則不佔用字節

   此外有消息集的格式能夠看出,消息集的實際長度 = 61 + 消息長度 。所以咱們能夠簡單的驗證一下消息的數據存儲是否符合上述描述。

  

# 建立secondTopic 主題, 設定2個分區
bin/kafka-topics.sh --create --topic secondTopic --zookeeper localhost:2181 --partitions 2 --replication-factor 1

  生產者向secondTopic 發送消息前的狀態:

  

  開啓控制檯生產者,發送字符串 "1234567"  , 以後再發送 "hello" ,獲得兩個分區的log文件大小截圖以下:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic secondTopic

  

  從發送時間前後來看,顯然第一次發的"1234567" 保存在了分區0,第二次發的保存在分區1,第二次比第一次少了2個字節。根據上面分析的消息集大小計算方式可得"1234567" 保存在剛建立的消息集中的大小爲 = 消息體大小 + 61。

  消息體大小 = 1(屬性) + 1(時間戳增量) + 1(位移增量) + 1(key 長度) + 1(value 長度) + 7(value內容) + 1(header個數) +1(消息總字節數,須要計算才能肯定字節數) = 14 , 所以計算的理論消息集的大小就是 14 + 61 = 75. 能夠看到與實際存入log文件字節數一致。

   事實上採用消息集在消息併發量較大時能夠有效節省消息存儲空間,而且爲消息的查詢帶來便利。


 

三. Kafka的應用 (Demo 及 API介紹)

(1) Kafka 集羣服務搭建

  kafka環境的搭建十分簡單,只須要簡單的配置便可讓服務運行起來;能夠分兩步

  1. zookeeper 環境搭建:

   ① zookeeper下載:https://www.apache.org/dyn/closer.cgi/zookeeper/(鏡像地址)

   ②  zk下載後分別保存到 /opt/bigdata/zookeeper 路徑下,解壓後修改zookeeper 配置文件 zoo_sample.cfg 重命名爲 zoo.cfg 

   ③  編輯zoo.cfg 文件, 加入如下配置

dataDir=/tmp/data/zookeeper
server.1=ubuntu:2888:3888
server.2=ubuntu2:2888:3888
server.3=ubuntu3:2888:3888

  在三臺服務上的上述 dataDir 路徑分別保存一個myid文件,文件中分別保存上述配置中主機名對應前面的server的ID即(1,2,3); 而後分別在三臺服務器上啓動zookeeper。

  

  2. Kafka 環境搭建

  ① 下載地址:http://kafka.apache.org/downloads 選擇 kafka_2.12-1.0.2 版本 (下劃線後面的2.12爲 scala 語言的版本, 1.0.2 爲kafka版本)

  ②  修改kafka 解壓路徑下 config/server.properties 文件

zookeeper.connect=ubuntu:2181,ubuntu2:2181,ubuntu3:2181

  ③ 至此就能夠啓動kafka服務器了

./kafka-server-start.sh -daemon ../config/server.properties

   ④ 指令指定了執行守護進程,所以啓動成功後看不到任何結果, 能夠查看9092端口號是否在監聽,或者適應jps指令查看是否有kafka服務;接下來就是建立kafka主題了(默認副本數不能大於broker數)

bin/kafka-topics.sh --create --topic secondTopic --zookeeper ubuntu:2181 --partitions 2 --replication-factor 1

 ⑤ 啓動生產者向該主題寫消息, (控制檯生產者只能發送消息的value ,沒法發送key)

bin/kafka-console-producer.sh --broker-list ubuntu:9092 --topic secondTopic

 ⑥ 啓動消費者消費消息

bin/kafka-console-consumer.sh --topic secondTopic --bootstrap-server ubuntu:9092 --from-beginning

  其中控制檯消費組啓動的指令中的參數  --bootstrap-server 在老版本中使用的是  --zookeeper host:post, 可是自從消費組位移offset信息再也不保存到zookeeper以後,消費者不用再鏈接zookeeper,而改成直接鏈接kafka集羣。

  下面介紹java 工程鏈接Kafka服務器實現生產與消費的簡單實現。

(2) Kafka 生產與消費

   客戶端鏈接Kafka以及Zookeeper 實現生產者的發送以及消費者的拉取消費,須要引入以下Maven依賴:

 1         <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
 2         <dependency>
 3             <groupId>org.apache.curator</groupId>
 4             <artifactId>curator-client</artifactId>
 5             <version>4.0.1</version>
 6         </dependency>
 7         <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
 8         <dependency>
 9             <groupId>org.apache.curator</groupId>
10             <artifactId>curator-framework</artifactId>
11             <version>4.0.1</version>
12         </dependency>
13         <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
14         <dependency>
15             <groupId>org.apache.curator</groupId>
16             <artifactId>curator-recipes</artifactId>
17             <version>4.0.1</version>
18         </dependency>
19 
20         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
21         <dependency>
22             <groupId>org.apache.kafka</groupId>
23             <artifactId>kafka_2.12</artifactId>
24             <version>${kafka.version}</version>
25         </dependency>
26 
27         <!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
28         <dependency>
29             <groupId>org.codehaus.jackson</groupId>
30             <artifactId>jackson-mapper-asl</artifactId>
31             <version>1.9.13</version>
32         </dependency>

  生產者端高級API 實現:

 1       static private final String TOPIC = "firstTopic";
 2       static private final String BROKER_LIST = "192.168.0.102:9092";
 3              ....
 4     
 5         Properties props = new Properties();
 6         props.put("bootstrap.servers",BROKER_LIST);
 7         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 8         props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
 9 
10         // acks 指定了 partition 中leader broker 在接收到producer 的消息後必須寫入的 副本數; acks 一般可能的取值有 0,1,all(-1)
11         //    acks = 0  則表示producer 徹底不理睬 leader broker 的處理結果, 在發送完一條消息後不等待leader broker 的返回結果就開始下一次發送
12         //          因爲不等待發送結果得  一般這種方式能夠有效提升producer的吞吐率;同時若是發送失敗了 producer是不知道的
13         //    acks = 1 表示設置 leader broker 在接收到producer 的消息並將消息寫入本地日誌,就能夠發送響應結果給producer
14         //          而無需等待其它ISR中的副本,這樣只要leader broker 一直存活,kafka 就可以保證這一條消息不丟失
15         //    acks = -1(all)  表示 leader broker 在接收到producer 的消息以後 不經須要將記錄寫入本地日誌,同時還要將記錄寫入ISR中全部的其它成員
16         //          纔會向 producer發送響應結果; 這樣只要ISR中存在一個存活的副本,消息記錄就不會丟失; 當副本數較多的 producer的吞吐量將變得較低
17         props.put("acks","1");
18         // 因爲網絡抖動或者leader選舉等緣由, producer 發送的消息可能會失敗,能夠在properties 參數中設置producer的重發次數
19         //  retries = 0 表示不作重發; producer 認爲的發送失敗 有可能並非真正的發送失敗,而是在broker提交後發送響應給producer producer因爲某種緣由
20         //  沒有成功接收到, 這將致使producer 向broker 發送重複的消息,所以retries > 0 時須要consumer在消費時對消息採起去重處理
21         props.put("retries","0");
22         //  producer 將發往同一分區的多條消息封裝進一個batch 中,當batch 滿了的時候,producer 會發送batch中的全部消息
23         //  能夠經過 配置batch.size 來設置 batch 容量的大小; batch 過大佔用過多內存,batch 太小
24         props.put("batch.size","323840");
25         // producer 在向broker發送消息時若是是等到 batch已經滿了再發送 有可能由於 producer的吞吐量比較小,batch須要等較長時間才能滿
26         // 這個時候若是等待就會話較長時間, linger.ms 參數就是用來設置這種消息發送延時的行爲的,linger 設置的較大會讓生產者發送消息的延時變大
27         // linger 設置的較小會讓生產者發送消息的吞吐量變小, 吞吐量和延時之間存在矛盾 須要權衡設置
28         props.put("linger.ms",2);
29         // buffer.memory 指定producer 用戶緩衝消息的內存大小,
30         props.put("buffer.memory",33554432);
31         // 設置單條消息最大大小
32         props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1024*1024);
33         // 設置請求超時時間,producer 向 broker發送消息後 等待時長,若是超過這個時長 producer就會認爲響應超時了
34         props.put("max.block.ms",3000);
35 
36         // 指定使用 topic 下的哪個分區
37         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.partitioner.MyPartitioner");
38         Producer<String,String> producer = new KafkaProducer<>(props);
39 
40         // 使用 producer 發送後的回調函數 作後續處理
41 
42         // 測試 對topic設定partition
43         ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC,"my non-test","partition setting");
44         producer.send(record);
45 
46         producer.close();

  消費者高級API實現:

 1      private static final String topicName = "firstTopic";
 2       private static final String groupId = "group1";
 3            ....
 4 
 5      Properties props = new Properties();
 6         // server, group.id, key.deserializer, value.deserializer四個參數無默認值,必須配置
 7         // 注意這裏 服務器地址配置的 主機名:端口號, 須要在研發環境修改hosts 文件
 8         props.put("bootstrap.servers","ubuntu1:9092");
 9         props.put("group.id",groupId);
10         props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
11         props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
12         // 是否容許consumer 位移自動提交
13         props.put("enable.auto.commit","true");
14         // consumer 位移自動提交時間間隔
15         props.put("auto.commit.interval.ms","1000");
16         // auto.offset.reset 設置爲 earliest 指定從最先的位移開始消費,可是若是以前有位移提交,則啓動時從位移提交處開始消費
17         // auto.offset.reset 一般還能夠設置爲 latest, 設置爲latest 指的從最新處位移開始消費
18         props.put("auto.offset.reset","earliest");
19 
20         KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
21         consumer.subscribe(Arrays.asList(topicName));
22 
23         try {
24             while(true){
25                 ConsumerRecords<String,String> records = consumer.poll(2000);
26                 for(ConsumerRecord<String,String> record : records){
27                     System.out.printf("訂閱消息 offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
28                 }
29             }
30         } catch (Exception e) {
31             e.printStackTrace();
32         } finally {
33             consumer.close();
34         }

  以上生產者與消費者端的實現雖然簡單,可是在不少業務場景下是不知足需求的,須要咱們使用更多定製化的開發,譬如生產者如何設定分區規則,消費何時提交位移,這些後續文章再作進一步研究。

相關文章
相關標籤/搜索