kafka shutdown中止很慢問題 java
在數據量大的時候,consumer一次抓取數據的數據不少,進入到業務處理的數據可能有不少, apache
假設一次poll有1萬條數據進入業務程序,並且業務程序是和poll綁定在一塊兒線程同步執行的,假設平均每條數據,執行業務程序花費100ms, 服務器
那麼poll一次的數據,至少要執行 1w*0.1s = 1000s = 16.67分鐘。 網絡
因此,在數據量大的時候,中止一個線程(須要先等待業務程序處理完數據),可能要十幾分鍾。 異步
shutdown問題解決方案 this
一、改爲異步處理數據,consumer取出來的數據,放到BlockQueue中,由異步線程去處理,當異步線程處理不過來時,阻塞consumer,調用consumer.pause()方法avoid group management rebalance,代碼以下(來源於Spring-Kafka): spa
// avoid group management rebalance due to a slow consumer this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()])); public void onPartitionsAssigned(Collection<TopicPartition> partitions) { this.assignedPartitions = partitions; }
二、若是是同步執行數據處理,考慮提升業務程序 處理數據的速度。 線程
三、同步處理數據,可是改爲手動提交offset,當shutdown的時候,poll的數據不須要所有處理,只須要記錄處理的位置便可。代碼示例以下: component
list data = consumer.poll(); for(record: data) { if(shutdown) { // 收到shutdown命令後當即中止,未處理的數據將丟棄 break; } deal(record); saveTopicOffset(record); } submitDealtDataOffset();
另外, kafka
Kafka停不掉shutdown關閉不了問題
緣由是卡在了consumer.close()方法裏面,它會提交offset信息,若是網絡中斷或者kafka服務器有問題致使提交不了offset,則consumer.close方法會一直卡住(不停的循環嘗試提交offset,永不中斷)。
參見:Kafka poll一直等待的bug:
https://issues.apache.org/jira/browse/KAFKA-4189?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
https://issues.apache.org/jira/browse/KAFKA-3172?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
解決方法:目前尚未好的辦法,只能將offset的自動提交改爲手動提交offset。可是,我寫了一個程序能夠在調用consumer.close後將線程強行殺死,做爲臨時解決方案。