Kafka Learning Guide (Summary)

Versions

The latest version at the time of writing is 2.3 (updated 2019-08-08).

From an API standpoint, 0.9 is the dividing line: starting with 0.9 there is no longer a distinction between the high-level consumer API (comparable to MySQL binlog's GTID: you only deal with topics, and the server manages offsets and load balancing automatically) and the low-level consumer API (comparable to MySQL binlog's file + position: you work directly with partitions and offsets).

From a compatibility standpoint, 0.8.x is the dividing line: 0.8.x is not compatible with earlier versions.

Overall topology

From the diagram above we can see:

1. Producers do not need to access ZooKeeper (the 0.8.x Kafka consumer connected directly to ZK for offset information; later versions fetch it from the cluster instead, which is why the two generations of API are incompatible. The diagram above shows the 0.8.x layout and is slightly inaccurate for 0.9.x and later).

2. Consumers fetch messages from, and producers publish to, a partition's leader, never a follower (replication is coordinated between brokers, not between brokers and clients).

3. As in RocketMQ, each topic is divided into partitions to scale performance linearly (the same idea as database sharding: transparent to the business, a technical rather than a business strategy). Each partition can be consumed by only one member of a given consumer group, so if the messages in a topic do not require ordered consumption, partitioning is the key mechanism for throughput under heavy load. The number of partitions per topic (default 1) can be changed with ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 5 --topic userService; for guidance on choosing a reasonable value see https://blog.csdn.net/kwengelie/article/details/51150114.

4. Kafka 0.8.x stored each consumer group's offset for every topic partition in ZK; starting with 0.9 they are stored in a dedicated topic named "__consumer_offsets", which uses log compaction, i.e. only the latest value of each key is kept rather than the full history.

5. Each topic keeps a local log per partition, which the broker continuously writes to with sequential appends.

6. Each message may optionally carry a key. If present, the key determines which partition the message goes to (in the Java client via a hash of the key); otherwise partitions are chosen round-robin. (In practice a key is usually set, to cope with duplicate consumption.) Records within a partition are guaranteed to be ordered, so choosing a suitable key can turn serial processing into parallel processing. This requires a solid understanding of the business requirements: strict global ordering is often unnecessary (especially in OLTP, where you can key by product, customer, merchant, or even a single campaign); it is merely the simplest option to implement. Remember: it is the producer, not the broker, that decides which partition a message goes to.

7. In replicated mode, consistency follows an all-replicas model (every in-sync replica must have the record), not a quorum/majority model, as shown below:

ISRs are described further down. Different partitions of the same topic can have their leaders on different brokers, which improves performance because the leader handles all reads and writes. The position up to which records are committed is also called the high watermark. Although not directly visible from the API, partitions are where HA and scalability actually happen.
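The key-to-partition routing described in point 6 can be sketched as follows. This is only an illustration: PartitionRouter is a made-up class, and it hashes with String.hashCode, whereas the real Java client's default partitioner applies murmur2 to the serialized key bytes.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of producer-side partition selection (hypothetical class;
// the real client hashes the serialized key with murmur2, not hashCode).
public class PartitionRouter {
    private final int numPartitions;
    private final AtomicInteger roundRobin = new AtomicInteger(0);

    public PartitionRouter(int numPartitions) { this.numPartitions = numPartitions; }

    public int partitionFor(Optional<String> key) {
        if (key.isPresent()) {
            // Keyed messages: same key -> same partition, preserving per-key order.
            return (key.get().hashCode() & 0x7fffffff) % numPartitions;
        }
        // Unkeyed messages: spread across partitions round-robin.
        return roundRobin.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        PartitionRouter router = new PartitionRouter(5);
        System.out.println(router.partitionFor(Optional.of("customer-42")));
        System.out.println(router.partitionFor(Optional.of("customer-42"))); // same partition again
        System.out.println(router.partitionFor(Optional.empty()));           // round-robin
    }
}
```

This also makes the ordering trade-off concrete: all records for one key land in one partition (ordered), while different keys spread out (parallel).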

Kafka's internal architecture

Worth mentioning here is the controller: one of the brokers is elected controller and handles cluster-wide events, including leader election, topic changes, tracking partition replicas, and broker changes; it communicates mainly with ZK.

The Kafka controller architecture is as follows:

Its responsibilities are documented at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals. For an analysis of how the controller works, see https://www.cnblogs.com/huxi2b/p/6980045.html, which sums it up quite well.

Putting it together, Kafka's core concepts are: brokers, topics, logs, partitions, controller, messages, cluster.

Broker internals

Message production

How does the client know which broker is the leader? Every broker caches the cluster metadata, so when a connection is first established the client can fetch every topic's metadata from any broker, as shown below:

 

Message consumption

 

Core high-level API (gives the user no control over the consumer-broker interaction):

ConsumerConnector

KafkaStream

ConsumerConfig

The low-level API, by contrast, lets you control each step of the interaction, e.g. where to start reading, with offsets maintained in the client. RocketMQ's implementation effectively combines the high-level and low-level styles, which is essentially what Kafka's merged API (0.9 and later) offers.

The main interface of the low-level API is SimpleConsumer.

Consumer groups and partitions

Each consumer group records its own consumption progress in every partition (stored in the dedicated offsets topic, see above). A partition can be consumed by only one member of each consumer group at a time. Since microservices are usually deployed as clusters, this means the topic listeners in N-1 of the service nodes sit idle, which is worth being aware of. If the service hosting the active consumer dies, Kafka automatically fails over to one of the remaining consumers; but if a message was already processed and the ack never reached Kafka, the consumer taking over will consume it again, so processing must be idempotent. For a topic to be consumed in parallel by a consumer group, configure at least as many partitions as there are group members. In short: the management granularity is consumer group (called a subscriber in other MQs) plus topic; the underlying delivery granularity is partition plus consumer.
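The group-to-partition mapping can be sketched as follows. GroupAssignment is a hypothetical round-robin-style assignor, a simplified stand-in for Kafka's real assignors, but it shows the point: with more group members than partitions, the extra members get nothing to do.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of partition assignment within one consumer group.
// With more consumers than partitions, the extra consumers stay idle.
public class GroupAssignment {
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> out = new ArrayList<>();
        for (int c = 0; c < consumers; c++) out.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) out.get(p % consumers).add(p);
        return out;
    }

    public static void main(String[] args) {
        // 1 partition, 3 service nodes: two listeners idle
        System.out.println(assign(1, 3)); // [[0], [], []]
        // 4 partitions, 2 nodes: both busy
        System.out.println(assign(4, 2)); // [[0, 2], [1, 3]]
    }
}
```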

Multiple partitions benefit not only clustered microservices but also a single JVM: start several independent threads, each acting as a consumer of the topic, and they will process concurrently. This is mainly useful on SMP servers, so whenever message processing takes non-trivial time or message TPS is high, use multiple partitions.

ISRs

An ISR is an in-sync replica. If a leader fails, an ISR is picked to be a new leader. 
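The commit rule tied to ISRs (the high watermark mentioned earlier) can be illustrated with a tiny sketch. The names are invented and real brokers track replica progress through fetch requests, but the rule itself is simple: the leader only advances the high watermark to the smallest log-end offset across the ISR, and consumers only see records below it.

```java
import java.util.Collections;
import java.util.List;

// Sketch: the high watermark is the smallest log-end offset among in-sync
// replicas, so a record counts as committed only once every ISR member has it.
public class HighWatermark {
    public static long compute(List<Long> isrLogEndOffsets) {
        return Collections.min(isrLogEndOffsets);
    }

    public static void main(String[] args) {
        // Leader has 10 records, one follower has 10, a slower follower has 7:
        // consumers can only read offsets below 7.
        System.out.println(compute(List.of(10L, 10L, 7L))); // 7
    }
}
```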

Topic Log Compaction

A Kafka topic log grows indefinitely, so to stay stable it must be reclaimed periodically. There are two cases with different strategies, depending on whether message keys repeat. Log compaction applies when keys do repeat, i.e. the message key is not something like a UUID (otherwise compaction is pointless). The mechanism deletes the historical values of each key based on retention time or file size, as shown below:

As you can see, the historical versions are cleaned up. With compaction enabled, the topic log is divided into a head and a tail; only the tail is compacted, and actual deletion further depends on other settings, as follows.

 

The parameter min.compaction.lag.ms controls how long a message must exist before it is eligible for compaction, delete.retention.ms controls when deleted entries are purged, and log.cleanup.policy=compact enables compaction. A consumer that reads within this window is therefore guaranteed to see at least the latest record per key (possibly more, since the producer may have written again in the meantime).
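The keep-only-the-latest-value semantics of compaction can be modeled in a few lines. This is a toy replay of the log tail, not the broker's actual cleaner:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction: replay the log, keep only the latest value per key.
public class CompactionModel {
    public static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            latest.put(r.getKey(), r.getValue()); // later values overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
            Map.entry("user-1", "addr=A"),
            Map.entry("user-2", "addr=B"),
            Map.entry("user-1", "addr=C")); // supersedes addr=A
        System.out.println(compact(log)); // {user-1=addr=C, user-2=addr=B}
    }
}
```

This is exactly how the "__consumer_offsets" topic stays small: only each group's latest offset per partition survives.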

Message retention

Each topic can declare message retention based on time or topic log size, governed by the following parameters:

 

Property | Meaning | Default
log.cleanup.policy | cleanup strategy; only delete and compact are supported | delete
log.retention.hours | how long logs are kept (hours; minutes and ms variants also exist) | 168 (7 days)
log.retention.bytes | maximum log size before deletion (whichever of time/size is reached first triggers deletion) | -1
log.segment.delete.delay.ms | how long a log file is kept before it is actually deleted | 60000
log.cleanup.interval.mins | interval between cleanup runs (older versions) | 10
log.retention.check.interval.ms | interval for checking whether any log qualifies for deletion (newer versions) | 300000

ACK consistency levels

Producers (a standard interview question nowadays: how do you guarantee a sent message is not lost?) can state their data consistency requirement via the acks setting (similar in spirit to MySQL's replication mechanism): acks=0 (no ack, at-most-once), acks=all (leader and all in-sync followers must have written the record; the default from Kafka 3.0 onward), acks=1 (success on the leader is enough; the default in the versions discussed here).

This is set in the producer properties, e.g.:

props.put("acks", "0");
props.put("acks", "1");
props.put("acks", "all");

Early producer versions had no notion of exactly-once. Kafka 0.11.0 introduced exactly-once delivery, implemented through producer idempotence plus atomic transactions; see https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1.
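Assuming the standard Java client property names, the exactly-once producer settings from 0.11 onward look roughly like this (the broker address and transactional id below are placeholders):

```java
import java.util.Properties;

// Producer properties for exactly-once delivery (Kafka >= 0.11).
// With these set you would create a KafkaProducer, call initTransactions(),
// and wrap sends in beginTransaction()/commitTransaction().
public class ExactlyOnceProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.30.11:9092"); // placeholder address
        props.put("enable.idempotence", "true");  // broker dedupes producer retries
        props.put("acks", "all");                 // required by idempotence
        props.put("transactional.id", "my-tx-1"); // placeholder; enables atomic multi-partition writes
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("enable.idempotence")); // true
    }
}
```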

On the consumer side, Kafka supports two modes: at-most-once and at-least-once.

To implement at-most-once, the consumer reads a message, saves its offset in the partition by sending it to the broker, and only then processes the message. The issue with at-most-once is that the consumer could die after saving its position but before processing the message; the consumer that takes over or gets restarted would resume from the last saved position, and the message in question is never processed.

To implement at-least-once, the consumer reads a message, processes it, and finally saves the offset to the broker. The issue with at-least-once is that the consumer could crash after processing a message but before saving the last offset position; if the consumer is restarted or another consumer takes over, it could receive a message that was already processed. At-least-once is the most common setup for messaging, and it is your responsibility to make processing idempotent, meaning that getting the same message twice will not cause a problem (e.g. two debits).

To implement exactly-once on the consumer side, the consumer would need a two-phase commit spanning the store for the consumer position and the store for the consumer's processing output. Alternatively, the consumer can store its processing output in the same location as the last offset.

Kafka itself only provides the first two consumer ack modes; the third you must build yourself. In practice most people use the second mode plus idempotency, i.e. the consumer's own consistency is guaranteed by idempotent processing plus acks, so no more needs to be said here.
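The at-least-once-plus-idempotency pattern boils down to remembering which message ids have already been applied. Below is a minimal in-memory sketch (hypothetical class; a real system would persist the seen-set, e.g. via a database unique key):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: idempotent message handling for at-least-once delivery.
// A redelivered message with a known id is skipped instead of re-applied.
public class IdempotentHandler {
    private final Set<String> seen = new HashSet<>();
    private int applied = 0;

    // Returns true if the message was applied, false if it was a duplicate.
    public boolean handle(String messageId) {
        if (!seen.add(messageId)) return false; // duplicate delivery: no second debit
        applied++;
        return true;
    }

    public int appliedCount() { return applied; }

    public static void main(String[] args) {
        IdempotentHandler h = new IdempotentHandler();
        h.handle("order-1");
        h.handle("order-2");
        h.handle("order-1"); // redelivered after a crash before the offset commit
        System.out.println(h.appliedCount()); // 2
    }
}
```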

Manual ack (offset) commit can be ensured as follows:

props.put("enable.auto.commit", "false");
try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

In auto-commit mode, the commit interval is controlled by auto.commit.interval.ms. For the various commit modes, see https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/.

The Kafka ecosystem

MirrorMaker is the component for mirroring data between Kafka clusters; it is essentially a consumer plus a producer, as shown below:

Kafka REST Proxy and Confluent Schema Registry

 

Kafka's layout in ZK

For readability I set the chroot to localKafka, as shown below:

The meaning of each ZK node is shown in the diagram below, where kafka01 is the chroot; it is configured through zookeeper.connect in Kafka's server.properties, e.g. zookeeper.connect=localhost:2181/localKafka.

 

Kafka monitoring and management

The three mainstream Kafka monitoring tools are:

  • 1. Kafka Web Console: build from the source at https://github.com/claudemamo/kafka-web-console, or download a prebuilt package from https://pan.baidu.com/s/1cJ2OefPqlxfWzTHElG6AjA (extraction code: f8xz)
  • 2. Kafka Manager
  • 3. KafkaOffsetMonitor

Take KafkaOffsetMonitor, the one we use, as an example. It is a client-side monitoring tool that tracks in real time the consumers of a Kafka service and their offsets in each partition: you can browse the current consumer groups and see the consumption status of every partition of every topic at a glance. It is hosted on GitHub and can be downloaded from https://github.com/quantifind/KafkaOffsetMonitor/releases, or from the Baidu netdisk mirror (use the latter on an intranet, otherwise the JS loaded from a CDN will be missing).

Start it with java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.20.30.10:2181 --port 8088 --refresh 10.seconds --retain 2.days; see the GitHub page for what each option means.

Common questions

How do I get all topics through the Java API?

How does a consumer subscribe to several topics at once?

How do I list all topics?

[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper 10.20.30.10:2181 --list
global
test

How do I view a specific topic's configuration?

[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper 10.20.30.10:2181 --topic global --describe
Topic:global PartitionCount:1 ReplicationFactor:1 Configs:
Topic: global Partition: 0 Leader: 0 Replicas: 0 Isr: 0

The producer reports the following error when connecting:

WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

There are two possible causes: 1. Kafka is not running; 2. the connection string does not use the value of the listeners parameter in conf/server.properties.

How do I list all consumers?

The new way, i.e. for consumers that are not ZK-based (not kafka.consumer.Consumer.createJavaConsumerConnector; the new consumer bootstraps from the brokers internally):

[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.20.30.11:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

The old way, for ZK-based clients (kafka.javaapi.consumer.ZookeeperConsumerConnector, now deprecated):

[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --zookeeper 10.20.30.10:2181 --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).

AAA
TA50-Aggr-Logger-ConsumerGroup
console-consumer-23104
console-consumer-37858

log4j-kafka configuration

Add the jar dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.3</version>
        </dependency>

Configure log4j2.xml as follows.

Add the kafka appender reference to the logger:

        <Root level="INFO" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="KAFKA"/>
            <AppenderRef ref="app_error" />
        </Root>

Add the kafka appender:

    <Appenders>
        <!-- send error logs to Kafka -->
        <Kafka name="KAFKA" topic="bomp">
            <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
            <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n" />
            <Property name="bootstrap.servers">10.20.30.11:9092</Property>
        </Kafka>
    </Appenders>

That completes the log4j-to-Kafka setup. For C++, the librdkafka library can be used: https://docs.confluent.io/2.0.0/clients/librdkafka/index.html; a dedicated article will follow.

Related issues

The consumer reports:

2018-09-17 14:10:07.768 WARN 130400 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,10.20.30.11,9092)] failed

java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.doSend(SyncProducer.scala:79) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61) [kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96) [kafka_2.12-0.11.0.3.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:72) [kafka_2.12-0.11.0.3.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) [kafka_2.12-0.11.0.3.jar:na]

Solution: set advertised.host.name in server.properties and restart. See https://stackoverflow.com/questions/30606447/kafka-consumer-fetching-metadata-for-topics-failed

The ZK log reports:

2018-10-08 14:13:28,297 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:setData cxid:0xc8 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/uft_trade Error:KeeperErrorCode = NoNode for /config/topics/uft_trade
2018-10-08 14:13:28,302 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:create cxid:0xc9 zxid:0x54 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics

Solution: still under investigation.

A Spring Boot Kafka client on one VM server (this never happened on physical machines) would, after running for a while, suddenly hit 80-90% CPU system time with a flood of the following logs:

2018-10-09 13:54:57,713  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002
2018-10-09 13:54:57,904  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004
2018-10-09 13:54:58,621  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003
2018-10-09 13:54:57,232  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007
2018-10-09 13:55:09,812  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004, closing socket connection and attempting reconn
ect
2018-10-09 13:55:02,942  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008
2018-10-09 13:55:09,755  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003, closing socket connection and attempting reconn
ect
2018-10-09 13:55:09,789  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002, closing socket connection and attempting reconn
ect
2018-10-09 13:55:18,677  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005
2018-10-09 13:55:11,752  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001
2018-10-09 13:55:17,709  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006
2018-10-09 13:55:12,779  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007, closing socket connection and attempting reconn
ect
2018-10-09 13:55:20,634  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008, closing socket connection and attempting reconn
ect
2018-10-09 13:55:22,178  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001, closing socket connection and attempting recon
nect
2018-10-09 13:58:10,244  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,240  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,241  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,240  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005, closing socket connection and attempting reconn
ect
2018-10-09 13:58:10,243  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006, closing socket connection and attempting reconn
ect
2018-10-09 13:58:11,107  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,384  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,383  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,379  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,378  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,378  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,377  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:22,082  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,084  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,099  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,108  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,130  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:59:23,382  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,412  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:23,412  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:23,443  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@8646db9
2018-10-09 13:59:23,411  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired
2018-10-09 13:59:32,474  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:59:23,404  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired
2018-10-09 13:59:23,390  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:32,477  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4671e53b
2018-10-09 13:59:23,390  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired
2018-10-09 13:59:23,390  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:32,477  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@6a1aab78
2018-10-09 13:59:23,389  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired
2018-10-09 13:59:32,417  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,380  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,446  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@dc24521
2018-10-09 13:59:41,829  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired, closing socket connection
2018-10-09 13:59:41,832  INFO ZkClient:936 - Waiting for keeper state SyncConnected
2018-10-09 13:59:41,829  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired, closing socket connection
2018-10-09 13:59:41,831  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:41,830  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired, closing socket connection
2018-10-09 13:59:41,830  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired, closing socket connection
2018-10-09 13:59:41,860  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:42,585  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:42,810  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:42,835  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,813  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002
2018-10-09 14:00:31,825  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005
2018-10-09 14:00:31,825  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,827  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006
2018-10-09 14:00:31,827  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,842  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003
2018-10-09 14:00:31,868  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,853  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,885  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:31,886  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,887  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:31,887  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,907  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960001
2018-10-09 14:00:31,907  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960008
2018-10-09 14:00:31,908  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960004
2018-10-09 14:00:31,944  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960007
2018-10-09 14:00:33,391  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:33,396  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:33,424  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0
2018-10-09 14:00:33,430  INFO ClientCnxn:1299 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10017568796000b, negotiated timeout = 30000
2018-10-09 14:00:33,517  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:33,516  INFO ZkClient:713 - zookeeper state changed (SyncConnected)
2018-10-09 14:00:34,399  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:34,354  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:34,433  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:34,475  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:34,476  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:34,485  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 968ms for sessionid 0x0
2018-10-09 14:00:34,488  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 968ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:37,472  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:37,484  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,487  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,488  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:37,489  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,479  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired
2018-10-09 14:00:37,495  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired, closing socket connection
2018-10-09 14:00:37,447  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,479  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,519  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@69b0fd6f
2018-10-09 14:00:37,519  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4a87761d
2018-10-09 14:00:37,446  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired
2018-10-09 14:00:37,519  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired, closing socket connection
2018-10-09 14:00:37,765  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,780  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,780  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired
2018-10-09 14:00:37,791  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired, closing socket connection
2018-10-09 14:00:38,194  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@3aeaafa6
2018-10-09 14:00:37,995  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 507ms for sessionid 0x0
2018-10-09 14:00:52,148  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 507ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:38,198  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@491cc5c9
2018-10-09 14:00:52,141  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960006
2018-10-09 14:00:52,128  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:52,154  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:52,126  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:52,179  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:38,010  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired
2018-10-09 14:00:52,231  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired, closing socket connection
2018-10-09 14:00:52,683  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 504ms for sessionid 0x0
2018-10-09 14:05:12,238  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:05:12,176  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:08:21,078  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960002
2018-10-09 14:05:12,113  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b
2018-10-09 14:08:21,107  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b, closing socket connection and attempting reco
nnect
2018-10-09 14:05:12,098  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960003
2018-10-09 14:00:52,677  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 501ms for sessionid 0x0
2018-10-09 14:08:21,107  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 501ms for sessionid 0x0, closing socket connection and attempting reconnect

After a quick read of the post at https://blog.csdn.net/xjping0794/article/details/77784171, I checked system IO for that window and it was indeed very high, up to 50%, as follows:

14時00分28秒       sda   3062.38 922268.58    670.77    301.38      5.17      1.71      0.16     49.44
14時00分28秒   ol-root   3111.77 922266.41    495.79    296.54      5.29      1.70      0.16     49.43
14時00分28秒   ol-swap     22.04      2.09    174.24      8.00      0.13      5.80      0.15      0.33
14時11分16秒       sda   5432.75 1537105.34    768.61    283.07     19.06      3.53      0.17     91.53
14時11分16秒   ol-root   5513.26 1537106.56    731.82    278.93     19.55      3.54      0.17     91.52
14時11分16秒   ol-swap      5.07      4.68     35.87      8.00      0.01      2.27      0.19      0.10

14時11分16秒       DEV       tps  rd_sec/s  wr_sec/s  avgrq-sz  avgqu-sz     await     svctm     %util
14時20分01秒       sda   2784.00 795332.59    462.60    285.85     10.89      3.93      0.18     50.09
14時20分01秒   ol-root   2827.44 795311.85    414.30    281.43     11.18      3.95      0.18     50.07
14時20分01秒   ol-swap      6.96     12.98     42.72      8.00      0.05      7.80      0.18      0.12
14時30分01秒       sda      3.13     12.42     59.59     23.04      0.00      0.57      0.44      0.14

But nothing in particular was running during that period, which is odd; unfortunately I also forgot to run iotop at the time to see which process was responsible. The points made in the post above are:

On where to put the ZK logs, the official docs advise:

Having a dedicated log device has a large impact on throughput and stable latencies. It is highly recommended to dedicate a log device and set dataLogDir to point to a directory on that device, and then make sure to point dataDir to a directory not residing on that device.

Add to zoo.cfg:

      forceSync=no

forceSync is on by default: to avoid sync latency issues, ZK syncs its state to the on-disk log file as soon as it receives data, and only responds after the sync completes. With it turned off, client connections get fast responses (on servers with a BMU this is less of a problem).

Looking at the ZK server's log, a flood of CancelledKeyException starts at around the same time:

2018-10-09 13:56:36,712 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14926 which had sessionid 0x100175687960008
2018-10-09 13:56:43,857 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14924 which had sessionid 0x100175687960006
2018-10-09 13:56:49,783 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14919 which had sessionid 0x100175687960001
2018-10-09 13:56:49,816 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:58:54,331 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23459
2018-10-09 13:58:54,377 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23459, probably expired
2018-10-09 13:58:54,401 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23485
2018-10-09 13:58:54,441 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23494
2018-10-09 13:58:56,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23459 which had sessionid 0x100175687960000
2018-10-09 13:58:56,336 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23485
2018-10-09 13:58:56,392 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23485, probably expired
2018-10-09 13:58:57,890 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23497
2018-10-09 13:58:59,480 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23485 which had sessionid 0x100175687960000
2018-10-09 13:59:00,383 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23494
2018-10-09 13:59:00,910 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23494, probably expired
2018-10-09 13:59:02,140 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23507
2018-10-09 13:59:03,286 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23497
2018-10-09 13:59:03,671 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23494 which had sessionid 0x100175687960000
2018-10-09 13:59:03,905 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23497, probably expired
2018-10-09 13:59:05,341 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:59:06,862 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23511
2018-10-09 13:59:10,044 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23507
2018-10-09 13:59:10,267 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23497 which had sessionid 0x100175687960000
2018-10-09 13:59:10,285 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23507, probably expired
2018-10-09 13:59:10,286 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23507 which had sessionid 0x100175687960000
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23511
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23511, probably expired
2018-10-09 13:59:10,313 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23519
2018-10-09 13:59:10,313 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23511 which had sessionid 0x100175687960000
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23524
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23519
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23519, probably expired
2018-10-09 13:59:10,315 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23524
2018-10-09 13:59:10,315 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23519 which had sessionid 0x100175687960000
2018-10-09 13:59:10,316 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23524, probably expired
2018-10-09 13:59:10,321 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)

The thread above says this was fixed in 3.4.8, yet we are running 3.4.12. Digging further, some reports mention very slow log writes, e.g. "fsync-ing the write ahead log in SyncThread:0 took 8001ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide", but that warning never appears in our log. We decided to try adding forceSync=no; see https://www.jianshu.com/p/73eec030db86.

As for why the timeouts in the log vary in length: this is governed by tickTime (session timeouts are negotiated between bounds derived from it), so it is explainable and not covered in detail here.
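For reference, a minimal zoo.cfg sketch (values illustrative): ZooKeeper negotiates each client's session timeout between minSessionTimeout (default 2*tickTime) and maxSessionTimeout (default 20*tickTime), which is why the expiry intervals in the log vary:

```properties
# zoo.cfg (illustrative values)
# Session timeouts are negotiated per client between
# 2*tickTime and 20*tickTime by default.
tickTime=2000
# Do not fsync the transaction log before acknowledging writes
# (trades durability for latency; see discussion above).
forceSync=no
```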

The ZooKeeper log also contains large numbers of messages like the following:

id:0x9d zxid:0x42 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2018-10-09 12:01:07,918 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cxid:0xa5 zxid:0x45 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions/0
2018-10-09 12:01:07,921 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cxid:0xa6 zxid:0x46 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions
2018-10-09 12:01:17,740 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData cxid:0xaf zxid:0x4a txntype:-1 reqpath:n/a Error Path:/config/topics/uft_splitter Error:KeeperErrorCode = NoNode for /config/topics/uft_splitter
2018-10-09 12:01:17,741 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cxid:0xb0 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2018-10-09 12:01:17,753 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cxid:0xb8 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions/0
2018-10-09 12:01:17,754 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cxid:0xb9 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions
2018-10-09 12:01:35,671 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData cxid:0xc2 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/cres_global Error:KeeperErrorCode = NoNode for /config/topics/cres_global

See https://github.com/mesos/kafka/issues/136, yet our Kafka service started and stayed up the whole time (the startup log confirms it). https://stackoverflow.com/questions/34393837/zookeeper-kafka-error-keepererrorcode-nodeexists suggests another cause, stale ZooKeeper data that was never cleaned up, but this was a fresh install and the problem still appeared. Finally, https://stackoverflow.com/questions/43559328/got-user-level-keeperexception-when-processing explains it:

The message you see is not an error yet. It is a potential exception raised by Zookeeper that original object making a request has to handle.

When you start a fresh Kafka, it gets a bunch of NoNode messages. It's normal because some paths don't exist yet. At the same time, you get also NodeExists messages as the path exists already.

Example: Error:KeeperErrorCode = NoNode for /config/topics/test It's because Kafka sends a request to Zookeeper for this path. But it doesn't exist. That's OK, because you are trying to create it. So, you see "INFO" from Zookeeper but no error from Kafka. Once Kafka gets this message, it tries to create your topic. To do so, it needs to access a path in Zookeeper for topics. So, it sends a request and gets an error NodeExists for /config/topics. Again, it's normal and Kafka ignores the message.

Long story short, these are all non-issue messages and you should skip them. If it bothers you, change logging configuration of Zookeeper (it's not recommended though).

In short, these are informational messages; Kafka simply ignores them, and so can you.

Compilation succeeds, but the following error is thrown at startup:

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe

Cause: the kafka-clients version used at compile time differs from the one on the runtime classpath (e.g. 0.9.x versus 0.11.0), typically pulled in as a transitive dependency. Compare the version you compile against with the version printed in the runtime logs.
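Assuming a Maven build, one way to rule out a transitive-dependency conflict is to pin kafka-clients explicitly (the version below is an example); `mvn dependency:tree -Dincludes=org.apache.kafka` shows which versions are actually on the path:

```xml
<!-- Pin kafka-clients explicitly so an older transitive version
     (e.g. brought in by another library) cannot win the conflict. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.0</version>
</dependency>
```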

Starting Kafka in the background

By default, running

./kafka-server-start.sh ../config/server.properties

starts the broker in the foreground, which means Kafka stops when the console is closed. Add the -daemon option to start it in background mode:
./kafka-server-start.sh -daemon ../config/server.properties

Commit failures

// Auto-commit failure; manual commits can fail the same way.
Auto-commit of offsets {wmBiz_topic_india_people_articletype_total_dev2-2=OffsetAndMetadata{offset=191084, metadata=''}, wmBiz_topic_india_people_articletype_total_dev2-3=OffsetAndMetadata{offset=196003, metadata=''}} failed for group group_db_2_es_people_articletype_total_matrix_dev: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
For a Kafka application, if the consumer is too slow to commit within the allowed interval and a rebalance is triggered, the partitions are reassigned and the same records are redelivered to another consumer, producing duplicate consumption; if processing never completes within the interval, this becomes a loop of duplicate consumption in which the offset never advances.
Therefore we need to:
1. Keep consumer processing fast enough.
2. Make consumer logic idempotent wherever possible.
3. Tune configuration to avoid duplicate consumption.
4. Monitor not only message backlog but also consumption rate (compare two successive offsets).
Concretely, increase max.poll.interval.ms or reduce max.poll.records. (Note: in the official client max.poll.interval.ms defaults to 300000 ms, i.e. 5 minutes; check server.properties and your consumer config for overrides that make it too small.)
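As a sketch, the two knobs can be set on the consumer configuration like this (the values are illustrative, not recommendations):

```java
import java.util.Properties;

// Tuning for a slow consumer: give the poll loop more headroom and
// make each poll() return a batch the handler can finish in time.
Properties consumerProps = new Properties();
consumerProps.put("max.poll.interval.ms", "600000"); // default 300000 (5 min)
consumerProps.put("max.poll.records", "100");        // default 500; 0.10+ only
```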

In the Kafka Java API, KafkaConsumer.poll() calls pollOnce(), which in turn calls ConsumerCoordinator.poll(); at the end of that method the automatic offset commit is performed. The key point is that this commit only runs inside poll(): if processing a batch takes longer than the poll interval allows, the consumer is considered dead, the group rebalances, and the commit fails with the error shown at the start of this section.

After raising max.poll.interval.ms (to 300 seconds in our case), the error no longer appeared.
Note: Kafka 0.9 has no max.poll.records parameter (each poll returns whatever was fetched); the parameter was introduced in 0.10 (its default is now 500), so configuring it on 0.9 has no effect.
When processing cost is unpredictable, prefer a single thread that polls Kafka and hands the records to other threads for processing.
The interactions between fetch time, maximum fetch size, processing time, and the various timeouts still need to be mapped out carefully.
For the underlying mechanics see https://juejin.im/post/5b4454a4e51d451908693763
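The single-poll-thread hand-off pattern above can be sketched with plain JDK classes (Strings stand in for Kafka records here; the structure, not the API, is the point):

```java
import java.util.concurrent.*;

// One thread polls and hands records to a bounded queue; a worker
// pool does the slow processing, so the poll loop itself never
// exceeds max.poll.interval.ms.
BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
ExecutorService workers = Executors.newFixedThreadPool(4);
CountDownLatch done = new CountDownLatch(10);

for (int i = 0; i < 4; i++) {
    workers.submit(() -> {
        try {
            while (true) {
                String record = queue.take(); // blocks until work arrives
                // ... slow per-record processing would go here ...
                done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // pool is shutting down
        }
    });
}

// The "poll loop": enqueue each batch and immediately poll again.
for (int i = 0; i < 10; i++) queue.put("record-" + i);

done.await();          // all 10 simulated records processed
workers.shutdownNow(); // interrupt the blocked take() calls
```

The bounded queue provides back-pressure: if workers fall behind, put() blocks the poll thread instead of letting memory grow without limit.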

Kafka: why can using the disk beat memory?

Kafka's core design choice is to rely on the disk rather than on in-process memory, which at first sounds counterintuitive: everyone, myself included, assumes memory must be faster than disk.

After reading Kafka's design documents, consulting the literature, and testing, it turns out that sequential disk read/write throughput (Cassandra, LevelDB and RocksDB use the same strategy) can be on par with memory.

Linux also optimizes disk access heavily, including read-ahead, write-behind, and the page cache.

Doing the same work in JVM heap memory has two costs: Java object overhead is large, and GC pauses grow as heap data accumulates. Using the OS page cache instead avoids most of the GC cost (can JNI achieve a similar effect? At least a large part of Netty's ByteBuffer and Unsafe usage does exactly this).

Operating on disk has the following advantages:

  • The page cache is maintained by the kernel, saving the Java programmer a lot of work.
  • Sequential disk reads/writes can outperform random memory access.
  • Large heaps make JVM GC inefficient and memory-hungry.
  • After a broker restart the page cache is still warm, whereas an in-heap cache would be lost.

Optimizations

For deletion, records are tombstoned (marked in real time) rather than physically removed right away.

For sending, batch records instead of sending each one immediately.

On the JVM side, Kafka historically defaulted to the CMS collector; consider switching to G1: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
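A way to apply these flags without editing the start scripts (assuming the stock distribution, whose kafka-run-class.sh honors this environment variable):

```shell
# Override Kafka's JVM performance flags before starting the broker;
# kafka-server-start.sh -> kafka-run-class.sh picks this up.
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
echo "$KAFKA_JVM_PERFORMANCE_OPTS"
```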

Other key practices

❖ Kafka uses tombstones instead of deleting records right away.
❖ Kafka producers support record batching: batches are bounded by size and auto-flushed based on time.
❖ Batching is good for network IO throughput and speeds up throughput drastically.
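The producer batching described above is controlled by two settings; an illustrative (not prescriptive) configuration:

```java
import java.util.Properties;

// A batch is flushed when it reaches batch.size bytes or has waited
// linger.ms milliseconds, whichever comes first. Larger values trade
// a little latency for throughput.
Properties producerProps = new Properties();
producerProps.put("batch.size", "65536"); // bytes per partition batch (default 16384)
producerProps.put("linger.ms", "10");     // wait up to 10 ms to fill a batch (default 0)
```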

With Kafka, consumers pull data from brokers.
When a follower has lagged behind the leader for longer than replica.lag.time.max.ms, the leader removes it from the ISR.

❖ If all replicas are down for a partition and unclean.leader.election.enable=true, Kafka chooses the first replica that comes alive (not necessarily an ISR member) as the leader.
❖ If unclean.leader.election.enable=false (the default since Kafka 0.11.0.0; older releases defaulted to true), Kafka waits for an ISR member to come alive as the new leader.

Outside of using a single ensemble (i.e. one ZooKeeper ensemble) for multiple Kafka clusters, it is not recommended
to share the ensemble with other applications, if it can be avoided. Kafka is sensitive
to Zookeeper latency and timeouts, and an interruption in communications with the
ensemble will cause the brokers to behave unpredictably. This can easily cause multiple
brokers to go offline at the same time, should they lose Zookeeper connections,
which will result in offline partitions. It also puts stress on the cluster controller,
which can show up as subtle errors long after the interruption has passed, such as
when trying to perform a controlled shutdown of a broker. Other applications that
can put stress on the Zookeeper ensemble, either through heavy usage or improper
operations, should be segregated to their own ensemble.

Kafka and MQTT

Unlike RabbitMQ and ActiveMQ, Kafka does not support the MQTT protocol out of the box. To let applications that currently talk to RabbitMQ over MQTT switch over seamlessly, either write your own gateway or use a third-party bridge; the mainstream option is the Confluent MQTT connector: https://www.confluent.io/connector/kafka-connect-mqtt/, https://www.infoq.cn/article/fdbcrh6I*9ajCWLvippC

References

https://kafka.apache.org/documentation (single-page version)

http://cloudurable.com/blog/kafka-architecture/index.html

https://cwiki.apache.org/confluence/display/KAFKA/

http://cloudurable.com/ppt/4-kafka-detailed-architecture.pdf 

Learning Apache Kafka, Second Edition (covers 0.8.x)

Kafka: The Definitive Guide (covers 0.9.x)

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations
