在使用Kafka過程當中,有時常常須要查看一些消費者的狀況、Kafka健康情況、臨時查看、同步一些數據,又因爲Kafka只是用來作流式存儲,又沒有像Mysql或者Redis提供方便的查詢方法查看數據。只能經過在命令行執行Kafka 腳本方式操做kafka(固然也有一些第三方的kafka監控工具),這裏就主要收集一些經常使用的Kafka命令。html
在看到 kafka ISR 副本時,實在忍不住就多扯了一點背後的原理,將Kafka、Redis、ElasticSearch三者對比起來看各自的存儲模型,好比說Redis主要用來作緩存,那採用異步複製可以減小Client的時延,Redis的P2P結構註定了它採用Gossip協議傳播集羣狀態。另外,將Redis裏面的基於Raft的選舉算法與ES裏面的master選舉對比,也有助於理解分佈式系統的選舉理論。固然了,各個原理介紹都淺嘗輒止,僅是本身的一些淺見,裏面的每一點都值得仔細深究。node
./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --listgit
找到consumer group後,接下來可查看這個 consumer group的消費狀況,好比:好比是否有消費延時(LAG)、一共有多少個consumer(OWNER)、還能看到這個consumer 所消費的TOPIC 各個 分區 的消費狀況:github
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group GROUP_IDredis
在Kafka中數據採用多個副本存儲。主副本接收生產者、消費者的請求,從副本(replica)只從主副本那裏同步數據。算法
./bin/kafka-topics.sh --zookeeper ZK_IP:2181 --topic user_update_info_topic --describesql
Topic:user_update_info_topic PartitionCount:6 ReplicationFactor:2 Configs:retention.ms=259200000 Topic: user_update_info_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 Topic: user_update_info_topic Partition: 1 Leader: 4 Replicas: 4,3 Isr: 4,3 Topic: user_update_info_topic Partition: 2 Leader: 0 Replicas: 0,4 Isr: 0,4 Topic: user_update_info_topic Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1 Topic: user_update_info_topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: user_update_info_topic Partition: 5 Leader: 3 Replicas: 3,4 Isr: 3,4
在這裏根據Leader、Replicas、Isr 能發現一個Kafka Broker可能存在的一些故障(這些數字是 server.properties配置項 broker.id)。好比某個broker.id沒有出如今Isr中,可能這臺節點上的副本同步出現了問題。bootstrap
這裏介紹一個Kafka的ISR機制:緩存
要保證數據的可靠性,數據會存儲多份,即多個副本。引入多個副本會帶來2個難題:一是各個副本之間的數據如何保證一致?二是對Client寫性能(寫操做)的影響?由於數據有多份時,Client寫入一條消息,何時給Client返回ACK成功確認呢?是將寫入的消息成功"同步"給了全部的副本才返回ACK,仍是隻要主副本"寫入"了就返回ACK?網絡
注意:這裏的"寫入"、"同步"都加了引號,是由於:它們只是一個抽象的描述。就拿"寫入"來講:是寫入內存便可、仍是寫入到磁盤?這裏牽涉到一系列的過程,可參考Redis persistent這篇文章。
對於Kafka而言,生產者發送消息時有一個ack參數(ack=0、ack=一、ack=all),默認狀況下ack=1意味着:發送的消息寫入主副本後,返回ACK給生產者。ack=all 意味着:發送的消息寫入全部的ISR集合中的副本後返回ACK給生產者。
某個Topic 的 ISR集合中的副本和它全部的副本是有區別的:ISR集合中的副本是"最新的",這也說明:ISR集合中的副本是動態變化的,好比當發生網絡分區時,某個節點上的從副本與主副本斷開了鏈接,這些從副本就會從ISR集合中移除。
生產者發送一條消息給Kafka主副本而且收到了ACK返回確認 和 這條消息已提交是兩回事。返回ACK確認的過程上面已經分析了,消息已提交跟參數"min.insync.replicas"息息相關。默認狀況下,min.insync.replicas=1,意味着只要主副本寫入了消息,就認爲這條消息已經提交了。
下面舉個例子說明生產者ack參數和服務端min.insync.replicas參數分別所起的做用:假設三個副本、ack=一、min.insync.replicas=2,生產者發送的消息只要寫入了主副本就會返回ACK確認給生產者,可是這條消息要成功同步到2個副本以後纔算是已提交。若此時ISR集合裏面只有主副本(一個),意味着生產者雖然收到了寫入消息的ACK確認,可是這些消息都不能被消費者消費到,由於只有已提交的消息才能被消費者消費到。
min.insync.replicas可提升消息存儲的可靠性:若是ack=一、min.insync.replicas=1,生產者將消息寫入主副本,收到返回ACK確認,但主副本還將來得及將消息同步給其餘副本時,主副本所在節點宕機了,那麼:就會丟失已經返回了ACK確認的消息。(在《Kafka實戰》一書提到:ack=all時,min.insync.replicas纔有意義,這一點須要讀源碼才能驗證了)
所以,ISR機制加上min.insync.replicas參數就能提升數據的可靠性。當ISR集合裏面副本個數大於等於min.insync.replicas時,一切正常。爲何要用ISR機制呢?
ISR機制是基於同步複製和異步複製之間的一種中間狀態,Redis Cluster 採用了異步複製策略,其Redis Sentinel 官方文檔提到,Redis 裏面沒法保證已經返回給Client ACK確認的消息不丟失,並推薦了兩種補救方案:
- Use synchronous replication (and a proper consensus algorithm to run a replicated state machine).
- Use an eventually consistent system where different versions of the same object can be merged.
而ISR機制較好地兼容了同步複製帶來的寫性能消耗和異步複製致使的數據可靠性問題。ISR集合裏面實時維護着一組副本,即同步副本,這組副本的數據與主副本數據是一致的,在Kafka中:同步副本是指:過去6s內向zookeeper發送過心跳或者過去10s內從首領副本那裏同步過消息的副本。
其實,ElasticSearch的數據副本模型也是採用的ISR機制,ES中的primary shard負責接收index操做,而後將index的數據同步給各個replica,ES裏面提供了參數:wait_for_actives_shards參數來保證當index操做寫入多少個replica後才返回ACK給Client。
Kafka和ElasticSearch的數據副本模型是相似的,只是在Kafka中,生產者寫入和消費者讀取都是由主副本完成,而ES中數據寫入primary shard,可是讀取能夠讀各個replica,ES中讀操做存在 read unacknowledged 和 dirty reads。因爲ES是個搜索引擎,文檔索引到ES後,用戶關注的問題是:何時可以被搜索到?ES提供的是近實時搜索,由於文檔須要刷新成Segment才能被搜索,同時ES又提供了real time GET API,能實時獲取剛索引的文檔。
此外,ES與Kafka都有一個master節點(這裏的master是針對節點而言,不是數據副本的主從),master節點負責集羣狀態變動(待完善)。而Redis Cluster採用的是P2P結構,經過Gossip協議來傳播集羣狀態,並無master節點這樣的角色。在ES中經過Hash文檔ID(murmur3)將文檔hash到各個primary shard,而在Redis Cluster中則是 CRC 再 mod 16384 進行數據分片,以固定的"槽數目"爲中間橋樑,將鍵映射到Redis節點上,當動態增刪節點時只需遷移部分槽上的鍵。從數據分佈這一點看,ES和Redis是相似的。哈希方式進行數據分佈的優點是:不須要存儲數據分佈的元信息,hash結果均勻的話,也能保證較均勻的數據分佈。
從集羣(節點的角度)結構上看,ES和Kafka都有一個主節點(無論叫master,仍是叫controller),而Redis是去中心化P2P結構,因此在傳播集羣狀態結構時,ES採用的是兩階段提交協議,Kafka的我沒有去研究,而Redis採用的是Gossip協議。在討論"主從"時,要明確指出是基於節點、仍是基於數據副本?在ES中,一個節點上既可存儲主副本(primary shard),也可存儲從副本(replica),ES的 shard allocation 策略是不會把同一個索引的主副本、從副本放在一個節點上,而是說:索引A的主副本在節點1上,索引B的從副本也在節點1上。而對於Redis集羣而言,一個主節點存儲一部分鍵空間上的數據,這個主節點能夠有若干個從節點,這些從節點從主節點上異步複製數據作副本備份。
ES的master節點(master-eligible)既可用來存儲數據,也可只專心做爲master(在配置文件中將node.data配置爲false)。所以,ES集羣中的節點角色比Redis Cluster要多得多(master-eligible node、data node、coordinating only node)。
因爲Kafka的流式特徵,在寫代碼的時候須要先知道一下消息的格式。用下面的命令就能夠查看一條Kafka Topic上的消息。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC--from-beginning --max-messages 1
有個時候,生產環境上的Kafka集羣在本地開發環境下不能直接鏈接致使測試Debug不方便,可採用mirror-make將一部分生產環境上的數據同步到測試環境作測試,~^~
./bin/kafka-mirror-maker.sh --consumer.config ./config/mirror-maker-consumer.properties --producer.config ./config/mirror-maker-producer.properties --whitelist "TOPIC_NAME"
基本的mirror-maker-producer.properties配置參數以下:
bootstrap.servers=測試環境KafkaIP:9092 acks=1 linger.ms=100 batch.size=16384 retries=3
基本的mirror-maker-consumer.properties配置參數以下:
bootstrap.servers=生產環境上的kafka集羣BOOT_STRAP_SERVERS group.id=mirror-maker auto.offset.reset=earliest enable.auto.commit=true auto.commit.interval.ms=1000