Kafka快速入門(五)——Kafka管理

Kafka快速入門(五)——Kafka管理

1、Kafka工具腳本簡介

一、Kafka工具腳本簡介

Kafka默認提供了不少個命令行腳本,用於實現各類各樣的功能和運維管理。默認狀況下,不加任何參數或攜帶--help運行Kafka shell腳本根據,會獲得腳本的使用方法說明。
connect-standalone.sh用於啓動單節點的Standalone模式的Kafka Connect組件。
connect-distributed.sh用於啓動多節點的Distributed模式的Kafka Connect組件。
kafka-acls.sh腳本用於設置Kafka權限,好比設置哪些用戶能夠訪問Kafka的哪些TOPIC的權限。
kafka-delegation-tokens.sh用於管理Delegation Token。基於Delegation Token的認證是一種輕量級的認證機制,是對SASL認證機制的補充。
kafka-topics.sh用於管理全部TOPIC。
kafka-console-producer.sh用於生產消息。
kafka-console-consumer.sh用於消費消息。
kafka-producer-perf-test.sh用於生產者性能測試。
kafka-consumer-perf-test.sh用於消費者性能測試。
kafka-delete-records.sh用於刪除Kafka的分區消息,因爲Kafka有本身的自動消息刪除策略,使用率不高。
kafka-dump-log.sh用於查看Kafka消息文件的內容,包括消息的各類元數據信息、消息體數據。
kafka-log-dirs.sh用於查詢各個Broker上的各個日誌路徑的磁盤佔用狀況。
kafka-mirror-maker.sh用於在Kafka集羣間實現數據鏡像。
kafka-preferred-replica-election.sh用於執行Preferred Leader選舉,能夠爲指定的主題執行更換Leader的操做。
kafka-reassign-partitions.sh用於執行分區副本遷移以及副本文件路徑遷移。
kafka-run-class.sh用於執行任何帶main方法的Kafka類。
kafka-server-start.sh用於啓動Broker進程。
kafka-server-stop.sh用於中止Broker進程。
kafka-streams-application-reset.sh用於給Kafka Streams應用程序重設位移,以便從新消費數據。
kafka-verifiable-producer.sh用於測試驗證生產者的功能。
kafka-verifiable-consumer.sh用於測試驗證消費者功能。
trogdor.sh是Kafka的測試框架,用於執行各類基準測試和負載測試。
kafka-broker-api-versions.sh腳本主要用於驗證不一樣Kafka版本之間服務器和客戶端的適配性。前端

二、Kafka API兼容性測試

本文使用Kafka 2.4版本,kafka-broker-api-versions.sh須要指定Kafka Broker,測試相應版本的API兼容性。
kafka-broker-api-versions.sh --bootstrap-server kafka-test:9092
Kafka快速入門(五)——Kafka管理
Produce表示Produce請求,Produce請求是Kafka全部請求類型中的第一號請求,序號是0。0 to 8表示Produce請求在Kafka 2.4中總共有9個版本,序號分別是0到8。usable:8表示當前連入本Kafka Broker的客戶端API可以使用的版本號是8,即最新的版本。
當使用低版本的kafka-broker-api-versions.sh腳本鏈接高版本的Kafka Broker時,Produce請求信息的usable表示低版本的客戶端API只能發送版本號是usable值的Produce請求類型。
在Kafka 0.10.2.0版本前,Kafka是單向兼容的,高版本的Broker可以處理低版本Client發送的請求,低版本的Broker不能處理高版本的Client請求。Kafka 0.10.2.0版本開始,Kafka正式支持雙向兼容,低版本的Broker也能處理高版本Client的請求。java

三、Kafka性能測試

Java生產者性能測試腳本:
kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
Java消費者性能測試腳本:
kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic testnode

2、TOPIC管理

一、建立TOPIC

kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
--create表示建立TOPIC,--partitions表示TOPIC的分區數量,--replication-factor表示每一個分區下的副本數。從Kafka 2.2版本開始,Kafka社區推薦用--bootstrap-server參數替換--zookeeper參數用於指定Kafka Broker。
Kafka社區推薦使用--bootstrap-server的緣由主要有:
(1)使用--zookeeper會繞過Kafka的安全體系。即便爲Kafka集羣設置安全認證,限制TOPIC的建立,若是使用--zookeeper的命令,依然能成功建立任意TOPIC,不受認證體系的約束。
(2)使用--bootstrap-server與Kafka集羣進行交互,愈來愈成爲使用Kafka的標準操做。將來,愈來愈少的命令和API須要與ZooKeeper進行鏈接。正則表達式

二、查詢TOPIC

查詢Kafka集羣全部TOPIC的命令:
kafka-topics.sh --bootstrap-server broker_host:port --list
查詢單個TOPIC詳細數據的命令以下:
kafka-topics.sh --bootstrap-server broker_host:port --describe --topic topic_name
若是describe命令不指定具體的TOPIC名稱,那麼Kafka默認會返回當前用戶的全部可見TOPIC的詳細數據。shell

三、生產消息

kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
啓動生產者,並將Terminal輸入寫入TOPIC。apache

四、消費消息

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
啓動消費者,從TOPIC消費消息數據。json

五、修改TOPIC分區數量

Kafka目前不容許減小某個TOPIC的分區數,所以修改TOPIC的分區數量就是增長分區數量,可使用kafka-topics腳本,結合--alter參數來增長某個TOPIC的分區數。
kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic_name --partitions 新分區數
指定的分區數必定要比原有分區數大,不然Kafka會拋出InvalidPartitionsException異常。bootstrap

六、修改TOPIC級別參數

設置TOPIC級別參數max.message.bytes。
kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name topic_name --alter --add-config max.message.bytes=10485760
設置常規的TOPIC級別參數使用--zookeeper,設置動態參數使用--bootstrap-server。後端

七、修改TOPIC限速

當某個TOPIC的副本在執行副本同步機制時,爲了避免消耗過多的帶寬,能夠設置Leader副本和Follower副本使用的帶寬。設置TOPIC各個分區的Leader副本和Follower副本在處理副本同步時,不得佔用超過100MBps(104857600)的帶寬,必須先設置Broker端參數leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令以下:
kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
--entity-name參數用於指定Broker ID。若是TOPIC的副本分別在多個Broker上,須要依次爲相應Broker執行。爲TOPIC的全部副本都設置限速,能夠統一使用通配符*來表示,命令以下:
kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name testapi

八、刪除TOPIC

刪除TOPIC命令以下:
kafka-topics.sh --bootstrap-server broker_host:port --delete --topic topic_name
刪除TOPIC操做是異步的,執行完刪除命令不表明TOPIC當即就被刪除,僅僅是被標記成「已刪除」狀態而已。Kafka會在後臺默默地開啓TOPIC刪除操做,一般須要耐心地等待一段時間。

九、Kafka內置TOPIC

Kafka有兩個內TOPIC:__consumer_offsets__transaction_state,內置TOPIC默認都有50個分區。
Kafka 0.11前,當Kafka自動建立__consumer_offsets時,會綜合考慮當前運行的Broker臺數和Broker端參數offsets.topic.replication.factor值,而後取二者的較小值做爲__consumer_offsets的副本數,__consumer_offsets一般在只有一臺Broker啓動時被建立,所以副本數爲1。Kafka 0.11版本後,Kafka會嚴格遵照offsets.topic.replication.factor值。若是當前運行的Broker數量小於offsets.topic.replication.factor值,Kafka會建立主題失敗,並顯式拋出異常。
直接查看消費者組提交的位移數據:
kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
直接讀取TOPIC消息,查看消費者組的狀態信息:
kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
__transaction_state內置TOPIC的信息查看須要指定kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter。

十、修改TOPIC副本數

建立內置__consumer_offsets的副本配置json 文件,顯式提供50個分區對應的副本數。replicas中的3臺Broker排列順序不一樣,目的是將Leader副本均勻地分散在Broker上。文件具體格式以下:

{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}

執行kafka-reassign-partitions腳本:
kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

十一、錯誤處理

(1)刪除TOPIC失敗
形成TOPIC刪除失敗最多見的緣由有兩個:副本所在的Broker宕機;待刪除TOPIC的部分分區依然在執行遷移過程。
強制刪除TOPIC的方法:
A、手動刪除ZooKeeper節點/admin/delete_topics下以待刪除TOPIC爲名的znode。
B、手動刪除TOPIC在磁盤上的分區目錄。
C、在ZooKeeper中執行rmr /controller,觸發Controller重選舉,刷新Controller緩存。可能會形成大面積的分區Leader重選舉。能夠不執行,只是Controller緩存中沒有清空待刪除TOPIC,不影響使用。
(2)內置__consumer_offsets佔用太多磁盤空間
若是__consumer_offsets消耗過多的磁盤空間,須要顯式地用jstack 命令查看一下kafka-log-cleaner-thread前綴的線程狀態。一般都是由於線程掛掉沒法及時清理__consumer_offsets,此時須要重啓相應的 Broker。

3、Kafka動態配置

一、Kafka動態配置簡介

Kafka安裝目錄的config/server.properties文件能夠用於配置靜態參數(Static Configs)。一般會指定server.properties文件的路徑來啓動Broker。若是要設置Broker端的任何參數,必須在server.properties文件中顯式地增長一行對應的配置,啓動Broker進程,令參數生效。一般會一次性設置好全部參數後,再啓動Broker,但若是須要變動任何參數時,必須重啓Broker。
但生產環境中若是每次修改Broker端參數都須要重啓Broker,Kafka集羣的可用性必然受到限制,所以Kafka 1.1.0版本中正式引入動態Broker參數(Dynamic Broker Configs),即修改參數值後,無需重啓Broker就能當即生效。Kafka 2.3版本中Broker端參數有200多個,但Kafka社區並無將每一個參數都升級成動態參數,僅把一部分參數升級爲可動態調整。
在Kafka 1.1後官方文檔的Broker Configs表中增長了Dynamic Update Mode列,有3類值,分別是read-only、per-broker和cluster-wide。
(1)read-only。read-only類型參數是靜態參數,只有重啓Broker,才能令修改生效。
(2)per-broker。per-broker類型參數屬於動態參數,修改後,只會在對應的Broker上生效。
(3)cluster-wide。cluster-wide類型參數也屬於動態參數,修改後,會在整個集羣範圍內生效,對全部Broker都生效。
經過運行無參數的kafka-configs.sh腳本,其說明文檔會列出當前動態Broker參數。

二、Kafka動態參數應用場景

Broker動態參數使用場景很是普遍,包括但不限於如下幾種:
(1)動態調整Broker端各類線程池大小,實時應對突發流量。
(2)動態調整Broker端鏈接信息或安全配置信息。
(3)動態更新SSL Keystore有效期。
(4)動態調整Broker端Compact操做性能。
(5)實時變動JMX指標收集器(JMX Metrics Reporter)。
一般,當Kafka Broker入站流量(inbound data)激增時,會形成Broker端請求積壓(Backlog)。此時能夠動態增長網絡線程數和I/O線程數,快速處理積壓請求。當流量洪峯事後,能夠將線程數調整回原值,減小對資源的浪費。整個過程不須要重啓Broker,而且能夠將調整線程數的操做,封裝進定時任務中,以實現自動擴縮容。

三、Kafka動態參數保存

Kafka將Broker動態參數保存在ZooKeeper中。
Kafka快速入門(五)——Kafka管理
changes是用來實時監測動態參數變動的,不會保存參數值。
topics是用來保存Kafka TOPIC級別參數的,不屬於Broker動態參數,但可以動態變動。
users和 clients則是用於動態調整客戶端配額(Quota)的znode節點。所謂配額,是指Kafka運維人員限制連入集羣的客戶端的吞吐量或者是限定使用的CPU資源。
/config/brokers znode纔是保存動態Broker參數的地方。znode有兩大類子節點,第一類子節點就只有一個,即< default >,保存cluster-wide範圍的動態參數;第二類以broker.id爲名,保存特定Broker的per-broker範圍參數。

四、Kafka動態參數配置

配置動態Broker參數的工具行命令是Kafka自帶的kafka-configs腳本。
設置cluster-wide範圍動態參數,須要顯式指定--entity-default。
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
可使用--describe命令查看cluster-wide範圍動態Broker參數設置:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
設置per-broker範圍參數命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
可使用--describe命令查看per-broker範圍動態Broker參數設置:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
刪除cluster-wide範圍參數命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
刪除per-broker範圍參數命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable

五、經常使用Broker動態參數

(1)log.retention.ms
修改日誌留存時間,能夠在Kafka集羣層面動態變動日誌留存時間。(2)num.io.threads和num.network.threads。
在實際生產環境中,Broker端請求處理能力常常要按需擴容。
(3)SSL相關的參數。
SSL相關的主要4個參數:ssl.keystore.type、ssl.keystore.location、ssl.keystore.password和ssl.key.password。動態實時調整SSL參數,能夠建立過時時間很短的SSL證書。每當動態調整SSL參數時,Kafka底層會從新配置Socket鏈接通道並更新Keystore。
(4)num.replica.fetchers
一般,Follower副本拉取速度比較慢,常見的作法是增長num.replica.fetchers參數值,確保有充足的線程能夠執行Follower副本向Leader 副本的拉取。經過調整動態參數,不須要再重啓Broker,就能當即在 Follower端生效。

4、Kafka消費者組位移

一、Kafka消費者組位移

傳統消息中間件(RabbitMQ 或 ActiveMQ)處理和響應消息的方式是破壞性的(destructive),即一旦消息被成功處理,就會被從Broker上刪除。而Kafka是基於日誌結構(log-based)的消息引擎,消費者在消費消息時,僅僅從磁盤文件上讀取數據,不會刪除消息數據,而且位移數據是由消費者控制的,所以可以很容易地修改位移,實現重複消費歷史數據的功能。

二、位移策略

位移策略根據位移維度和時間維度分爲七種。位移維度是指根據位移值來重設,即直接把消費者的位移值重設成指定的位移值;時間維度是能夠給定一個基準時間,讓消費者把位移調整成大於基準時間的最小位移。
Kafka快速入門(五)——Kafka管理
Earliest策略表示將位移調整到TOPIC當前最先位移處。最先位移不必定是0,由於在生產環境中,太太久遠的消息一般會被Kafka自動刪除,因此當前最先位移極可能是一個大於0的值。若是想要從新消費主題的全部消息,那麼可使用Earliest策略。
Latest策略表示把位移重設成最新末端位移。若是想跳過全部歷史消息,從最新的消息處開始消費,可使用Latest策略。
Current策略表示將位移調整成消費者當前提交的最新位移。
Specified-Offset策略則是比較通用的策略,表示消費者把位移值調整到指定的位移處。如消費者程序在處理某條錯誤消息時,能夠手動地跳過此消息的處理。
Shift-By-N策略表示跳過的位移的相對數值,能夠是負數,負數表示把位移重設成當前位移的前N條位移處。
DateTime策略表示將位移重置到指定時間後的最先位移處。如想從新消費昨天的數據,可使用DateTime策略重設位移到昨天0點。
Duration策略表示指定相對的時間間隔,而後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS,是一個符合ISO-8601規範的Duration格式,以字母P開頭,後面由4部分組成,即 D、H、M和S,分別表示天、小時、分鐘和秒。若是想將位移調回到15分鐘前,能夠指定PT0H15M0S。

三、重設消費者組位移

重設消費者組位移的方式有兩種,一種是經過消費者API來實現,一種是經過kafka-consumer-groups.sh命令行腳原本實現。
(1)消費者API方式
經過Java API重設位移,須要調用KafkaConsumer的seek系列方法。

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

每次調用seek方法只能重設一個分區的位移。OffsetAndMetadata是一個封裝Lon 型的位移和自定義元數據的複合類。seekToBeginning和seekToEnd擁有一次重設多個分區的能力。
設置Earliest位移策略的Java代碼以下:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = "test";  // 要重設位移的Kafka主題 
try (final KafkaConsumer<String, String> consumer = 
  new KafkaConsumer<>(consumerProperties)) {
         consumer.subscribe(Collections.singleton(topic));
         consumer.poll(0);
         consumer.seekToBeginning(
  consumer.partitionsFor(topic).stream().map(partitionInfo ->          
  new TopicPartition(topic, partitionInfo.partition()))
  .collect(Collectors.toList()));
}

消費者程序必需要禁止自動提交位移。
組ID要設置成要重設的消費者組的組ID。
調用seekToBeginning方法時,須要一次性構造主題的全部分區對象。
必需要調用帶長整型的poll方法,而不要調用consumer.poll(Duration.ofSecond(0))。
設置Latest位移策略的Java代碼以下:

consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
設置Current 位移策略的Java代碼以下:
consumer.partitionsFor(topic).stream().map(info->new TopicPartition(topic,info.partition())).forEach(tp->{long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});

partitionsFor方法獲取給定主題的全部分區,而後依次獲取對應分區上的已提交位移,最後經過seek方法重設位移到已提交位移處。
(2)命令行腳本方式
Kafka 0.11版本中,引入了經過kafka-consumer-groups.sh腳本重設消費者位移的功能。
Earliest策略直接指定–to-earliest:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest策略直接指定–to-latest:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略直接指定–to-current:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略直接指定–to-offset:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset &lt;offset&gt; --execute
Shift-By-N策略直接指定–shift-by N:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by &lt;offset_N&gt; --execute
DateTime策略直接指定–to-datetime:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2020-03-08T12:00:00.000 --execute
Duration策略直接指定–by-duration:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H60M0S --execute

5、Kafka AdminClient工具

一、AdminClient引入

Kafka管理工具腳本的缺陷以下:
(1)只能運行在控制檯,集成度差,不能集成到應用程序、運維框架、監控平臺。
(2)經過鏈接ZooKeeper來提供服務,會繞過Kafka的用戶認證機制,不安全。
(3)須要使用Kafka服務器端的代碼。
基於上述缺陷,Kafka 0.11版本正式推出Java客戶端版的AdminClient。

二、AdminClient功能特性

(1)主題管理:包括主題的建立、刪除和查詢。
(2)權限管理:包括具體權限的配置與刪除。
(3)配置參數管理:包括Kafka各類資源的參數設置、詳情查詢。Kafka資源包括Broker、主題、用戶、Client-id等。
(4)副本日誌管理:包括副本底層日誌路徑的變動和詳情查詢。
(5)分區管理:即建立額外的主題分區。
(6)消息刪除:即刪除指定位移前的分區消息。
(7)Delegation Token管理:包括Delegation Token建立、更新、過時和詳情查詢。
(8)消費者組管理:包括消費者組的查詢、位移查詢和刪除。
(9)Preferred領導者選舉:推選指定主題分區的Preferred Broker爲領導者。

三、AdminClient工做原理

AdminClient採用雙線程設計:前端主線程和後端I/O線程,前端線程負責將用戶要執行的操做轉換成對應的請求,而後再將請求發送到後端I/O線程的隊列中;然後端I/O線程從隊列中讀取相應的請求,而後發送到對應的Broker節點上,而後把執行結果保存起來,以便等待前端線程的獲取。AdminClient內部大量使用生產者-消費者模式將請求生成與處理解耦。
Kafka快速入門(五)——Kafka管理
前端主線程會建立名爲Call的請求對象實例,Call主要任務由兩個:
(1)構建對應的請求對象。若是要建立主題,建立CreateTopicsRequest;若是查詢消費者組位移,建立OffsetFetchRequest。
(2)指定響應的回調邏輯。如從Broker端接收到CreateTopicsResponse後要執行的動做。一旦建立好Call實例,前端主線程會將其放入到新請求隊列(New Call Queue)中,此時前端主線程的任務完成,只須要等待結果返回便可。
後端I/O線程使用3個隊列來承載不一樣時期的請求對象,分別是新請求隊列、待發送請求隊列和處理中請求隊列。新請求隊列的線程安全由Java的monitor鎖來保證。爲了確保前端主線程不會由於monitor鎖被阻塞,後端I/O線程會按期地將新請求隊列中的全部Call實例所有搬移到待發送請求隊列中進行處理。待發送請求隊列和處理中請求隊列只由後端I/O線程處理,所以無需任何鎖機制來保證線程安全。
當I/O線程在處理某個請求時,會顯式地將請求保存在處理中請求隊列。一旦處理完成,I/O線程會自動地調用Call對象中的回調邏輯完成最後的處理。最後,I/O線程會通知前端主線程結果已經準備完畢,前端主線程可以及時獲取到執行操做的結果。AdminClient使用Java Object對象的wait和notify實現的通知機制。
後端I/O線程名字的前綴是kafka-admin-client-thread。若是AdminClient程序貌似在正常工做,但執行的操做沒有返回結果或者hang住,多是由於I/O線程出現問題致使的。

四、構造和銷燬AdminClient實例

若是正確地引入kafka-clients依賴,那麼應該能夠在編寫Java程序時看到AdminClient對象,完整類路徑是org.apache.kafka.clients.admin.AdminClient。建立AdminClient實例須要手動構造一個Properties對象或Map對象,而後傳給對應的方法。最多見並且必需要指定的參數是bootstrap.servers參數。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);
AdminClient client = AdminClient.create(props);

AdminClient實例銷燬須要顯式調用close方法。

五、AdminClient實例

(1)建立Topic

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
         NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
         CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
         result.all().get(10, TimeUnit.SECONDS);
}

(2)消費者組位移查詢

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
         ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
         Map<TopicPartition, OffsetAndMetadata> offsets = 
                  result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
         System.out.println(offsets);
}

(3)Broker磁盤佔用查詢

try (AdminClient client = AdminClient.create(props)) {
     DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
     long size = 0L;
     for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
              size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
                       topicPartitionReplicaInfoMap ->
                       topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
                       .mapToLong(Long::longValue).sum();
     }
     System.out.println(size);
}

AdminClient的describeLogDirs方法獲取指定Broker上全部分區主題的日誌路徑信息。

6、Kafka認證機制

一、Kafka認證機制簡介

Kafka 0.9.0.0版本開始,Kafka正式引入認證機制,實現基礎的安全用戶認證,用於將Kafka上雲或進行多租戶管理。Kafka 2.3版本支持基於SSL和基於SASL的安全認證機制。
SSL認證主要是指Broker和客戶端的雙路認證(2-way authentication)。一般,SSL加密(Encryption)已經啓用了單向認證,即客戶端認證Broker的證書(Certificate)。若是要作SSL認證,須要啓用雙路認證,即Broker也要認證客戶端的證書。SSL一般用於通訊加密,使用SASL來作Kafka認證明現。
Kafka支持經過SASL作客戶端認證,SASL是提供認證和數據安全服務的框架。Kafka支持的SASL機制有5種,在不一樣版本中被引入,所以須要根據Kafka版本選擇其所支持的認證機制。
(1)GSSAPI:基於Kerberos認證機制使用的安全接口,在0.9版本中被引入,用於已經實現Kerberos認證機制的場景。
(2)PLAIN:基於簡單的用戶名/密碼認證的機制,在0.10版本中被引入,與SSL加密搭配使用。PLAIN是一種認證機制,而PLAINTEXT是未使用SSL時的明文傳輸。PLAIN認證機制的配置和運維成本相對較小,適合於小型公司Kafka集羣。但PLAIN認證機制不能動態地增減認證用戶,必須重啓Kafka集羣才能令變動生效。因爲全部認證用戶信息所有保存在靜態文件中,因此只能重啓Broker,才能從新加載變動後的靜態文件。
(3)SCRAM:主要用於解決PLAIN機制安全問題的新機制,在0.10.2版本中被引入。經過將認證用戶信息保存在ZooKeeper的方式,避免了動態修改須要重啓Broker的弊端。可使用Kafka提供的命令動態地建立和刪除用戶,無需重啓整個集羣。
(4)OAUTHBEARER:基於OAuth 2認證框架的新機制,在2.0版本中被引進。OAuth是一個開發標準,容許用戶受權第三方應用訪問用戶在某網站上的資源,而無需將用戶名和密碼提供給第三方應用。Kafka不提倡單純使用OAUTHBEARER,由於其生成的JSON Web Token不安全,必須配以SSL加密才能用在生產環境中。
(5)Delegation Token:用於補充SASL認證機制的輕量級認證機制,在1.1.0版本被引入。若是要使用Delegation Token,須要先配置好SASL 認證,而後再利用Kafka提供的API獲取對應的Delegation Token。Broker和客戶端在作認證時能夠直接使用token,不用每次都去KDC 獲取對應的ticket(Kerberos 認證)或傳輸Keystore文件(SSL認證)。

二、Kafka認證機制比較

Kafka快速入門(五)——Kafka管理

三、SASL/SCRAM-SHA-256 配置實例

(1)建立用戶
配置SASL/SCRAM首先須要建立可否鏈接Kafka集羣的用戶。建立3個用戶,分別是admin用戶、producer用戶和consumer用戶。admin用戶用於實現Broker間通訊,producer用於生產消息,consumer用於消費消息。
建立admin:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name admin
建立producer:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name producer
建立consumer:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name consumer
用戶查看:
kafka-configs.sh --zookeeper zookeeper-test:2181 --describe --entity-type users --entity-name consumer
(2)建立JAAS文件
配置用戶後,須要爲每一個Broker建立一個對應的JAAS文件。

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
};

配置Broker的server.properties文件:

sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9092

(3)啓動Broker
KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka-broker.jaas kafka-server-start.sh config/server.properties
(4)生產消息
建立Topic
kafka-topics.sh --bootstrap-server kafka-test:9092 --create --topic test --partitions 1 --replication-factor 1
建立Producer客戶端配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="123456";

生產消息:
kafka-console-producer.sh --broker-list kafka-test:9092 --topic test --producer.config producer.conf
(5)消費消息
建立Consumer客戶端配置consumer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="123456";

(6)動態增減用戶
刪除用戶:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name producer
建立用戶:
kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456]' --entity-type users --entity-name test
修改Producer客戶端配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="123456";

建立每個客戶端,Producer或Consumer都須要指定相應的配置文件。

7、Kafka受權機制

一、Kafka受權機制簡介

Kafka 受權機制(Authorization)採用ACL(Access-Control List,訪問控制列表)權限模型。Kafka提供了一個可插拔的受權實現機制,會將配置的全部ACL項保存在ZooKeeper下的/kafka-acl節點中。能夠經過Kafka自帶的kafka-acls.sh腳本動態地對ACL項進行增刪改查,並讓其當即生效。

二、ACL開啓

Kafka開啓ACL的方法只須要在Broker端的配置文件中增長一行設置便可,即在server.properties文件中配置authorizer.class.name參數。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name參數指定了ACL受權機制的實現類。Kafka提供了Authorizer接口,容許開發者實現本身的受權機制,但一般是直接使用Kafka自帶的SimpleAclAuthorizer實現類。設置authorizer.class.name參數,而且啓動Broker後,Broker默認會開啓ACL受權驗證。

三、超級用戶設置

開啓ACL受權後,必須顯式地爲不一樣用戶設置訪問某項資源的權限,不然,在默認狀況下,沒有配置任何ACL的資源是不能被訪問的。但超級用戶可以訪問全部的資源,即便沒有爲超級用戶設置任何ACL項。超級用戶的設置只須要在Broker端的配置文件server.properties中設置super.users參數。
super.users=User:superuser1;User:superuser2
若是須要一次性指定多個超級用戶,使用分號做爲分隔符。
Kafka支持將全部用戶都配置成超級用戶的用法。若是在server.properties文件中設置allow.everyone.if.no.acl.found=true,那麼全部用戶均可以訪問沒有設置任何ACL的資源。但在生產環境中,特別是對安全有較高要求的環境中,採用白名單機制要比黑名單機制更安全。

四、ACL受權配置

在Kafka中,配置受權的方法是經過kafka-acls.sh腳本。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:test --operation All --topic '*' --cluster
All表示全部操做,topic中的星號則表示全部主題,指定--cluster則說明要爲用戶test設置的是集羣權限。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 192.168.1.120 --operation Read --topic test
User的星號表示全部用戶,allow-host的星號則表示全部IP地址。容許全部的用戶使用任意的IP地址讀取名爲test的TOPIC數據,禁止BadUser用戶和192.168.1.120的IP地址訪問test下的消息。

五、Kafka ACL權限列表

Kafka提供了細粒度的受權機制,kafka-acls.sh直接指定--producer能同時得到Producer所需權限,指定--consumer能夠得到 Consumer所需權限。Kafka受權機制能夠不配置認證機制而單獨使用,如只爲IP地址設置權限。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --deny-principal User:* --deny-host 192.168.1.120 --operation Write --topic test
禁止Producer在192.168.1.120 IP上向test發送數據。

六、雲環境多租戶權限設置

若是Kafka集羣部署在雲上,對於多租戶須要設置合理的認證機制,併爲每一個鏈接Kafka集羣的客戶端授予合適權限。
(1)開啓ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
(2)開啓白名單機制
在Kafka的server.properties文件中,不要設置allow.everyone.if.no.acl.found=true。
(3)爲SSL用戶受權
使用kafka-acls.sh爲SSL用戶授予集羣的權限。咱們之前面的例子來進行一下說明。在配置 SSL 時,指定用戶的 Distinguished Name 爲「CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN」。在設置 Broker 端參數時,指定 security.inter.broker.protocol=SSL,即強制指定 Broker 間的通信也採用 SSL 加密。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --operation All --cluster
(4)爲客戶端受權
爲生產者授予producer權限:
kafka-acls.sh --authorizer-properties zookeeper.connect==zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --producer --topic 'test'
爲消費者授予consumer權限:
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --consumer --topic 'test' --group '*'

8、MirrorMaker

一、MirrorMaker簡介

一般,大多數業務需求使用一個Kafka集羣便可知足,但有些場景確實會須要多個Kafka集羣同時工做,好比爲了便於實現災難恢復,能夠在兩個機房分別部署單獨的Kafka集羣。若是其中一個機房出現故障,能夠很容易地把流量切換到另外一個正常運轉的機房。若是要爲地理相近的客戶提供低延時的消息服務,而主機房離客戶很遠時,能夠在靠近客戶的機房部署一套Kafka集羣,服務於客戶,從而提供低延時的服務。
爲了在多個Kafka集羣間實現數據同步,Kafka提供了跨集羣數據鏡像工具MirrorMaker。一般,數據在單個集羣下不一樣節點之間的拷貝稱爲備份,而數據在集羣間的拷貝稱爲鏡像(Mirroring)。
MirrorMaker本質是一個消費者+生產者的程序。消費者負責從源集羣(Source Cluster)消費數據,生產者負責向目標集羣(Target Cluster)發送消息。整個鏡像流程以下:
Kafka快速入門(五)——Kafka管理
MirrorMaker鏈接的源集羣和目標集羣,會實時同步消息。實際場景中,用戶會部署多套Kafka集羣,用於實現不一樣的目的。
Kafka快速入門(五)——Kafka管理
源集羣負責主要的業務處理,目標集羣1能夠用於執行數據分析,目標集羣2則充當源集羣的熱備份。

二、MirrorMaker腳本工具

Kafka默認提供MirrorMaker命令行工具kafka-mirror-maker腳本,其常見用法是指定生產者配置文件、消費者配置文件、線程數以及要執行數據鏡像的TOPIC正則表達式。
kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --num.streams 8 --whitelist ".*"
--consumer.config:指定MirrorMaker中消費者的配置文件地址,最主要的配置項是bootstrap.servers,即MirrorMaker從哪一個Kafka集羣讀取消息。因爲MirrorMaker有可能在內部建立多個消費者實例並使用消費者組機制,所以還須要設置group.id參數。另外,須要配置auto.offset.reset=earliest,不然MirrorMaker只會拷貝在其啓動後到達源集羣的消息。
--producer.config:指定MirrorMaker內部生產者組件的配置文件地址。必須顯式地指定參數bootstrap.servers,配置拷貝的消息要發送到的目標集羣。
--num.streams:MirrorMaker要建立多少個KafkaConsumer實例,會在後臺建立並啓動多個線程,每一個線程維護專屬的消費者實例。
--whitelist:接收一個正則表達式。全部匹配該正則表達式的TOPIC都會被自動地執行鏡像。「.*」表示要同步源集羣上的全部TOPIC。

三、跨集羣數據鏡像實例

(1)配置生產者和消費者
生產者配置文件producer.config:

producer.properties:
bootstrap.servers=kafka-test1:9092

消費者配置文件consumer.config:

consumer.properties:
bootstrap.servers=kafka-test2:9092
group.id=mirrormaker
auto.offset.reset=earliest

(2)MirrorMaker工具啓動
kafka-mirror-maker.sh --producer.config producer.config --consumer.config consumer.config --num.streams 4 --whitelist ".*"
若是MirrorMaker內部消費者會使用輪詢策略(Round-robin)來爲消費者實例分配分區,須要consumer.properties文件中增長配置:
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
(3)結果驗證
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-test:9092 --topic test --time -2
-1和-2分別表示獲取某分區最新的位移和最先的位移,兩個位移值的差值就是分區當前的消息數。
MirrorMaker在執行消息鏡像的過程當中,若是發現要同步的Topic在目標集羣上不存在,會根據目標集羣的Kafka Broker端參數num.partitions和default.replication.factor默認值,自動將Topic建立出來。在生產環境中,推薦提早把要同步的全部主題按照源集羣上的規格在目標集羣上地建立出來,避免在源集羣某個分區的消息同步到目標集羣后位於其它分區中。MirrorMaker默認會同步內置Topic,在鏡像位移主題時,若是發現目標集羣還沒有建立位移主題,會根據Broker端參數offsets.topic.num.partitions和offsets.topic.replication.factor來建立位移Topic,默認配置是50個分區,每一個分區3個副本。在Kafka 0.11.0.0版本前,Kafka不會嚴格依照offsets.topic.replication.factor參數值,即若是設置offsets.topic.replication.factor參數值爲3,而當前存活的Broker數量少於3,位移主題依然能被成功建立,副本數會取offsets.topic.replication.factor參數值和存活Broker數之間的較小值。從Kafka 0.11.0.0版本開始,Kafka會嚴格遵照設定offsets.topic.replication.factor參數值,若是發現存活Broker數量小於參數值,就會直接拋出異常,通知主題建立失敗。

四、其它跨集羣數據鏡像工具

(1)uReplicatorUber公司在使用MirrorMaker過程當中發現了一些缺陷,好比MirrorMaker中的消費者使用的是消費者組機制,會不可避免地會碰到不少Rebalance問題。所以,Uber研發了uReplicator,使用Apache Helix做爲集中式的TOPIC分區管理組件來管理分區的分配,而且重寫消費者程序,替代MirrorMaker下的消費者,從而避免Rebalance各類問題。(2)Brooklin Mirror Maker針對MirrorMaker工具不易實現管道化的缺陷,LinkedIn進行鍼對性的改進並對性能進行優化研發了Brooklin Mirror Maker。(3)Confluent Replicator Replicator是Confluent提供的企業級的跨集羣鏡像方案,能夠便捷地提供Kafka TOPIC在不一樣集羣間的遷移,同時還能自動在目標集羣上建立與源集羣上配置如出一轍的TOPIC,極大地方便運維管理。

相關文章
相關標籤/搜索