Kafka快速入門(四)——Kafka高級功能

Kafka快速入門(四)——Kafka高級功能

1、Kafka無消息丟失配置

一、Kafka消息丟失簡介

Kafka只針對已提交消息(committed message)作有限度的持久化保證。
當Kafka的若干個Broker成功地接收到一條消息並寫入到日誌文件後,會通知生產者程序相應消息已成功提交。多少個Broker成功保存消息算是已提交,能夠由Producer參數或Broker端參數指定。
有限度的持久化保證是指Kafka不可能保證在任何狀況下都作到不丟失消息,Kafka不丟消息的前提條件是保存消息的N個Kafka Broker 中至少有1個在線。數據庫

二、生產者丟失消息

目前Kafka Producer是異步發送消息的,producer.send(msg)接口一般發送消息後會當即返回,但此時不能認爲消息已經發送成功,由於網絡瞬時抖動可能致使消息沒有發送到Broker端,或者消息自己不合格致使Broker拒絕接收(如消息太大,超過Broker承受能力)。所以,Producer必須使用帶有回調通知的producer.send(msg, callback)接口。callback(回調)能夠準確地通知Producer消息是否真的提交成功,若是消息提交失敗,能夠針對性地進行處理。若是由於網絡瞬時抖動致使發送失敗,僅僅讓Producer重試就能夠;若是消息不合格致使發送失敗,能夠調整消息格式後再次發送。apache

三、消費者丟失消息

Kafka 中Consumer Offset表示Consumer當前消費到的Topic分區的位置。
Kafka快速入門(四)——Kafka高級功能
對於Consumer A,offset是9;Consumer B的offset是11。
Kafka中Consumer端的消息丟失是offset沒有正確更新形成的。解決Consumer端的消息丟失的方法是維持先消費消息,再更新offset的順序,能夠最大限度地保證消息不丟失,但可能帶來消息的重複處理問題。
Kafka中,若是Consumer從Kafka獲取到消息後開啓多個線程異步處理消息,而Consumer自動地向前更新位移。假如其中某個線程運行失敗,所負責的消息沒有被成功處理,但位移已經被更新,所以消息對於Consumer已經丟失。
若是多線程異步處理消費消息,Consumer程序不能開啓自動提交位移,須要要應用程序手動提交位移。單個Consumer程序使用多線程消費消息代碼極難實現,很難正確地處理位移更新,即很容易避免消費消息丟失,但極易出現消息被重複消費。編程

四、Kafka無丟失消息解決方案

實現Kafka無丟失消息的解決方案以下:
(1)必須使用producer.send(msg, callback)接口發送消息。
(2)Producer端設置acks參數值爲all。acks參數值爲all表示ISR中全部Broker副本都接收到消息,消息纔算已提交。
(3)設置Producer端retries參數值爲一個較大值,表示Producer自動重試次數。當出現網絡瞬時抖動時,消息發送可能會失敗,此時Producer可以自動重試消息發送,避免消息丟失。
(4)設置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable參數用於控制有資格競選分區Leader的Broker。若是一個Broker落後原Leader太多,那麼成爲新Leader必然會形成消息丟失。所以,要將unclean.leader.election.enable參數設置成false。
(5)設置Broker端參數replication.factor >= 3,將消息保存多份副本。
(6)設置Broker參數min.insync.replicas > 1,保證ISR中Broker副本的最少個數,在acks=-1時才生效。設置成大於1能夠提高消息持久性,生產環境中不能使用默認值 1。
(7)必須確保replication.factor > min.insync.replicas,若是二者相等,那麼只要有一個副本掛機,整個分區沒法正常工做。推薦設置成replication.factor = min.insync.replicas + 1。
(8)確保消息消費完成再提交。設置Consumer端參數enable.auto.commit爲false,並採用手動提交位移的方式。bootstrap

五、消息發送的可靠性

生產者向Kafka發送消息時,能夠選擇須要的可靠性級別。經過request.required.acks參數值能夠設置可靠性級別。
0值:異步發送。生產者向Kafka發送消息而不須要Kafka反饋成功 ACK,效率最高,可靠性最低。可能會存在消息丟失的狀況:在傳輸過程當中會出現消息丟失;在Broker內部會出現消息丟失;會出現寫入到Kafka中的消息的順序與生產順序不一致的狀況。
1值:同步發送。生產者發送消息給Kafka,Broker的Partition Leader在收到消息後立刻發送成功ACK(無需等待ISR中的Follower同步),生產者收到後知道消息發送成功,而後會再發送消息。若是一直未收到Kafka的ACK,則生產者會認爲消息發送失敗,會重發消息。
若是沒有收到ACK,必定能夠確認消息發送失敗,而後能夠重發;但即便收到ACK,也不能保證消息必定就發送成功。
-1值:同步發送。生產者發送消息給Kafka,Kafka Broker收到消息後要等到ISR列表中的全部Follower都同步完消息後,才向生產者發送成功ACK。若是一直未收到Kafka的ACK,則認爲消息發送失敗,會自動重發消息,會出現消息重複接收的狀況。安全

2、Kafka攔截器

一、攔截器簡介

攔截器基本思想是容許應用程序在不修改邏輯的狀況下,動態地實現一組可插拔的事件處理邏輯鏈,可以在主要業務操做的先後多個時間點上插入對應的攔截邏輯。
Kafka 0.10.0.0版本開始引入攔截器,Kafka攔截器分爲生產者攔截器和消費者攔截器。生產者攔截器容許在發送消息前以及消息提交成功後插入攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移後編寫特定邏輯。Kafka攔截器支持鏈的方式,便可以將一組攔截器串連成一個大的攔截器,Kafka會按照添加順序依次執行攔截器邏輯。
假設要在生產消息前執行兩個前置動做:一個是爲消息增長一個頭信息,封裝發送消息的時間,一個是更新發送消息數字段。將兩個攔截器串聯在一塊兒統一指定給Producer後,Producer會按順序執行兩個前置動做,而後再發送消息。
Kafka攔截器設置是經過參數配置的,生產者和消費者兩端有一個相同的參數,名字叫interceptor.classes,指定的是一組類的列表,每一個類就是特定邏輯的攔截器實現類,指定攔截器類時須要指定全限定名,即 full qualified name。網絡

二、Producer攔截器

Producer端攔截器實現類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口有兩個核心的方法:
onSend:在消息發送前被調用。
onAcknowledgement:在消息成功提交或發送失敗後被調用。onAcknowledgement 調用要早於發送回調通知callback的調用。onAcknowledgement與onSend 方法不是在同一個線程中被調用,所以若是兩個方法中使用了某個共享可變對象,要保證線程安全。
假設第一個攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個攔截器是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,Producer指定攔截器的Java代碼示例以下:多線程

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

三、Consumer攔截器

Consumer攔截器的實現類要實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口,ConsumerInterceptor有兩個核心方法。
onConsume:在消息返回給Consumer程序前調用。在開始正式處理消息前,攔截器會先作一些處理,再返回給Consumer。
onCommit:Consumer在提交位移後調用,能夠進行一些打日誌操做等。架構

四、Kafka攔截器應用

Kafka攔截器能夠應用於包括客戶端監控、端到端系統性能檢測、消息審計等多種功能在內的場景。
Kafka默認提供的監控指標是針對單個客戶端或Broker的,很難從具體的消息維度去追蹤集羣間消息的流轉路徑。同時,如何監控一條消息從生產到最後消費的端到端延時是不少Kafka用戶迫切須要解決的問題。但在應用代碼中編寫統一的監控邏輯會增長複雜度,而且將監控邏輯與主業務邏輯耦合也是軟件工程中不提倡的作法。經過實現攔截器的邏輯以及可插拔的機制,可以快速地檢測、驗證以及監控集羣間的客戶端性能指標,特別是可以從具體的消息層面上收集性能指標數據。
對於消息審計(message audit),若是把Kafka做爲一個私有云消息引擎平臺向全公司提供服務,會涉及多租戶以及消息審計的功能。做爲私有云的PaaS提供方,須要要可以隨時查看每條消息是哪一個業務方在什麼時間發佈的,被哪些業務方在什麼時刻消費。能夠經過在攔截器內實現相應的消息審計邏輯,強行規定全部接入Kafka服務的客戶端程序必須設置消息審計攔截器。異步

3、Kafka多線程消費方案

一、Kafka Consumer線程設計

Kafka 0.10.1.0版本開始,Kafka Consumer採用雙線程設計,即用戶主線程和心跳線程。用戶主線程是啓動Consumer應用程序main方法的線程,而新引入的心跳線程(Heartbeat Thread)只負責按期給對應的Broker機器發送心跳請求,以標識消費者應用的存活性(liveness)。心跳線程的引入將心跳頻率與主線程調用KafkaConsumer.poll方法的頻率分開,從而解耦真實的消息處理邏輯與消費者組成員存活性管理。但因爲消息獲取邏輯依然是在用戶主線程中完成的,所以,依然能夠安全地認爲Kafka Consumer是單線程設計。ide

二、Kafka Consumer多線程設計

KafkaConsumer類不是線程安全的 (thread-safe)。全部的網絡IO處理都在用戶主線程中,在使用過程當中必需要確保線程安全。不能在多個線程中共享同一個KafkaConsumer實例,不然程序會拋出ConcurrentModificationException異常。KafkaConsumer的wakeup()方法是線程安全的,能夠在其餘線程中安全地調用KafkaConsumer.wakeup()來喚醒 Consumer。

三、多線程方案一

消費者程序啓動多個線程,每一個線程維護專屬的Kafka Consumer實例,負責完整的消息獲取、消息處理流程。
Kafka快速入門(四)——Kafka高級功能
優勢:
(1)實現簡單,使用多個線程並在每一個線程中建立專屬的KafkaConsumer實例便可。
(2)多個線程間沒有任何交互,不用考慮線程安全開銷。
(3)每一個線程使用專屬的Kafka Consumer實例來執行消息獲取和消息處理邏輯,所以,Kafka主題中的每一個分區都能保證只被一個線程處理,很容易實現分區內的消息消費順序。
缺點:
每一個線程都須要維護本身的KafkaConsumer實例,必然會佔用更多的系統資源,好比內存、TCP鏈接等。
可使用的線程數受限於Consumer訂閱主題的總分區數,在一個消費者組中,每一個訂閱分區都只能被組內的一個消費者實例所消費。
每一個線程須要完整地執行消息獲取和消息處理邏輯,若是消息處理邏輯很重,形成消息處理速度慢,容易產生沒必要要的Rebalance,從而引起整個消費者組的消費停滯。

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;

     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
      ConsumerRecords records = 
        consumer.poll(Duration.ofMillis(10000));
                 //  執行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }

     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }

}

四、多線程方案二

消費者程序使用單或多線程獲取消息,同時建立多個消費線程執行消息處理邏輯。獲取消息的線程能夠是單線程,也能夠是多線程,每一個線程維護專屬的Kafka Consumer實例,處理消息則交由特定的線程池處理,從而實現消息獲取與消息處理的真正解耦。
Kafka快速入門(四)——Kafka高級功能
優勢:
提升系統伸縮性。將任務切分紅消息獲取和消息處理兩個部分,分別由不一樣的線程處理,能夠獨立地調節消息獲取的線程數以及消息處理的線程數,而沒必要考慮二者之間是否相互影響。若是消費獲取速度慢,那麼增長消費獲取的線程數便可;若是消息的處理速度慢,那麼增長Worker線程池線程數便可。
缺點:
實現難度要大,須要分別管理兩組線程。
沒法保證分區內的消費順序。因爲使用兩組線程,消息獲取能夠保證分區內的消費順序,但消息處理時Worker線程池將沒法保證分區內的消費順序。
使用兩組線程,使得整個消息消費鏈路被拉長,最終致使正確位移提交會變得異常困難,可能會出現消息的重複消費。

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...

private int workerNum = ...;
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());

...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..

4、Consumer Group消費進度監控

一、消費者消費進度

Kafka Consumer消費的滯後程度(Consumer Lag)是指消費者當前落後於生產者的程度。
Lag的單位是消息數,Kafka監控Lag的層級是在分區上的。Lag直接反映消費者的運行狀況,正常工做的消費者,Lag值應該很小,甚至是接近於0的,表示消費者可以及時地消費生產者生產出來的消息,滯後程度很小。若是消費者Lag值很大,代表沒法跟上生產者的速度,最終Lag會愈來愈大,從而拖慢下游消息的處理速度。
Kafka監控消費者進有3種方法:
(1)使用Kafka自帶的命令行工具kafka-consumer-groups腳本。
(2)使用Kafka Java Consumer API編程。
(3)使用Kafka自帶的JMX監控指標。

二、kafka-consumer-groups監控消費進度

kafka-consumer-groups腳本是Kafka提供的最直接的監控消費者消費進度的工具,也可以監控獨立消費者(Standalone Consumer)的Lag。
kafka-consumer-groups.sh --bootstrap-server &lt;Kafka broker&gt; --describe --group &lt;group id&gt;
kafka-consumer-groups腳本的輸出信息會按照消費者組訂閱主題的分區進行展現,每一個分區一行數據;其次,除了主題、分區等信息外,會彙報每一個分區當前最新生產的消息的位移值(即LOG-END-OFFSET 列值)、該消費者組當前最新消費消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前二者的差值)、消費者實例 ID、消費者鏈接 Broker 的主機名以及消費者的 CLIENT-ID 信息。

三、Kafka Java Consumer API監控消費進度

Kafka 2.0.0版本開始,Kafka Consumer API分別提供了查詢當前分區最新消息位移和消費者組最新消費消息位移兩組方法,能夠用於計算消費者的Lag。

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // 處理中斷異常
                // ...
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // 處理ExecutionException
                // ...
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }

四、Kafka JMX監控指標

Kafka默認提供的JMX監控指標能夠監控消費者的Lag值。Kafka Consumer提供了一個名爲kafka.consumer:type=consumer-fetch-manager-metrics,client-id=「{client-id}」的JMX指標,其中屬性records-lag-max和records-lead-min分別表示消費者在測試窗口時間內曾經達到的最大的Lag值和最小的Lead值。Lead值是指消費者最新消費消息的位移與分區當前第一條消息位移的差值。

5、Kafka生產者和消費者TCP鏈接

一、Kafka生產者簡介

Apache Kafka採用TCP協議做爲全部請求通訊的底層協議。
Kafka的Java生產者API主要的對象就是KafkaProducer,開發Kafka生產者的流程以下:
(1)構造生產者對象所需的參數對象。
(2)利用構造的參數對象,建立KafkaProducer對象實例。
(3)使用KafkaProducer的send方法發送消息。
(4)調用KafkaProducer的close方法關閉生產者並釋放各類系統資源。

二、Producer TCP鏈接創建

在建立KafkaProducer實例時,生產者應用會在後臺建立並啓動一個名爲Sender的線程,Sender線程開始運行時首先會建立與Broker的鏈接。Java Producer的Sender線程會鏈接bootstrap.servers參數指定的全部Broker,所以生產環境中中,建議在bootstrap.servers參數中指定3~4臺Broker便可。
當Producer更新了集羣的元數據信息後,若是發現與某些Broker當前沒有鏈接,Producer就會建立一個TCP鏈接。當要發送消息時,Producer發現不存在與目標Broker的鏈接,會建立一個TCP鏈接。
當Producer嘗試給一個不存在的Topic發送消息時,Broker會告訴Producer相關Topic不存在,此時Producer會發送METADATA請求給Kafka集羣,嘗試獲取最新的元數據信息;Producer經過指定metadata.max.age.ms參數按期地更新元數據信息,參數默認值是300000,即5分鐘,即Producer每5分鐘都會強制刷新一次元數據以保證是最新的數據。Producer默認會向集羣的全部Broker都建立TCP鏈接,無論是否真的須要傳輸請求。

三、Producer TCP鏈接關閉

Producer關閉TCP鏈接的方式有兩種:一種是用戶主動關閉,一種是Kafka自動關閉。Kafka自動關閉與Producer端參數connections.max.idle.ms值有關,默認參數值是9分鐘,即若是在9分鐘內某個TCP 鏈接沒有任何請求,Kafka會主動把TCP鏈接關閉。用戶能夠在Producer端設置connections.max.idle.ms=-1,TCP鏈接將成爲永久長鏈接。Kafka建立的Socket鏈接都開啓keepalive,所以keepalive探活機制會遵照。對於被Broker端被關閉的TCP鏈接,因爲TCP鏈接的發起方是客戶端,屬於被動關閉,即 passive close。被動關閉的後果就是會產生大量的CLOSE_WAIT鏈接,所以Producer端或Client端怒會顯式地檢測到TCP鏈接已被中斷

四、Consumer TCP鏈接創建

消費者端主要的程序入口是KafkaConsumer類,構建KafkaConsumer實例時不會建立任何TCP鏈接。TCP鏈接是在調用KafkaConsumer.poll方法時被建立,三種時機建立。
(1)發起FindCoordinator請求時。
消費者端Coordinator駐留在Broker端的內存中,負責消費者組的組成員管理和各個消費者的位移提交管理。當消費者程序首次啓動調用poll方法時,會向Kafka集羣中當前負載最小的Broker發送一個名爲FindCoordinator的請求,獲取其Coordinator所在的Broker。負載的評估是看消費者鏈接的全部Broker中,誰的待發送請求最少,是消費者端的單向評估。
(2)鏈接Coordinator時。Broker處理完FindCoordinator請求後,會返回對應的響應結果(Response),顯式地告訴消費者哪一個Broker是其Coordinator。消費者會建立連向Coordinator所在Broker的Socket鏈接。
(3)消費數據時。消費者會爲每一個要消費的分區建立與分區Leader副本所在Broker鏈接的TCP鏈接。

五、Consumer TCP鏈接關閉

消費者關閉Socket分爲主動關閉和Kafka自動關閉。主動關閉是指顯式地調用消費者KafkaConsumer.close()方法關閉消費者;而Kafka自動關閉是由消費者端參數connection.max.idle.ms控制,參數默認值是9分鐘,即若是某個Socket鏈接上連續9分鐘都沒有任何請求,那麼消費者會強行關閉Socket鏈接。若是在編寫消費者程序時,使用循環的方式來調用poll方法消費消息,那麼上面提到的全部請求都會被按期發送到Broker,所以Socket鏈接上老是能保證有請求在發送,從而實現了長鏈接效果。
當實際消費數據的TCP鏈接成功建立後,消費者程序就會廢棄獲取集羣元數據的TCP鏈接,再按期請求元數據時,會改成使用實際消費數據的TCP鏈接。所以,獲取集羣元數據的TCP鏈接會在後臺被自動關閉掉。

6、Kafka消息重複消費

一、消費者消費過程解析

生產者將消息發送到Topic中,消費者便可對其進行消費,其消費過程以下:
(1)Consumer向Broker提交鏈接請求,其所鏈接上的Broker都會向其發送Broker Controller的通訊URL,即配置文件中的listeners地址;
(2)當Consumer指定了要消費的Topic後,會向Broker Controller發送消費請求;
(3)Broker Controller會爲Consumer分配一個或幾個Partition Leader,並將Partition的當前offset發送給Consumer;
(4)Consumer會按照Broker Controller分配的Partition對其中的消息進行消費;
(5)當Consumer消費完消息後,Consumer會向Broker發送一個消息已經被消費反饋,即消息的offset;
(6)在Broker接收到Consumer的offset後,會更新相應的__consumer_offset中;
Consumer能夠重置offset,從而能夠靈活消費存儲在Broker上的消息。

二、重複消費問題的解決方案

(1)同一個Consumer重複消費
當Consumer因爲消費能力低而引起了消費超時,則可能會造成重複消費。
在某數據恰好消費完畢,但正準備提交offset時,消費時間超時,則Broker認爲消息未消費成功,產生重複消費問題。
其解決方案:延長offset提交時間。
(2)不一樣的Consumer重複消費
當Consumer消費了消息,但尚未提交offset時宕機,則已經被消費過的消息會被重複消費。
其解決方案:將自動提交改成手動提交。

三、從架構設計上解決Kafka重複消費的問題

(1)保存並查詢給每一個消息都設置一個惟一的UUID,在消費消息時,首先去持久化系統中查詢,查看消息是否被消費過,若是沒有消費過,再進行消費;若是已經消費過,直接丟棄。(2)利用冪等性冪等性操做的特色是任意屢次執行所產生的影響均與一次執行的影響相同。若是將系統消費消息的業務邏輯設計爲冪等性操做,就不用擔憂Kafka消息的重複消費問題,所以能夠將消費的業務邏輯設計成具有冪等性的操做。利用數據庫的惟一約束能夠實現冪等性,如在數據庫中建一張表,將表的兩個或多個字段聯合起來建立一個惟一約束,所以只能存在一條記錄。(3)設置前提條件實現冪等性的另外一種方式是給數據變動設置一個前置條件。若是知足條件就更新數據,不然拒絕更新數據,在更新數據的時候,同時變動前置條件中須要判斷的數據。

相關文章
相關標籤/搜索