Zookeeper connection loss leads to Flink job restart

Flink可使用zookeeper來進行ha,而通常咱們都會使用zookeeper的高級api架構curator來對zk進行通信。在curator中引入了狀態的概念,包括connected,reconnected,suspeneded,lost與read_only,其中suspended是個有意思的狀態,當由於網絡抖動、機器繁忙、zk集羣短暫無響應,都會致使curator將狀態置爲suspended.java

而Flink對suspended採起了很是謹慎的處理,就是發現是suspended,則取消全部做業,進行restart,顯得未免有些太敏感了,其實這個時候每每zk也是ok的,相應的jm也是leader都沒有問題。api

好,咱們再順一下:網絡

在發生zk connection loss的狀況下,curator會設置suspended狀態,在此狀態下,curator會釋放leader,flink在發現notleader以後則會revokeLeadership,進而致使dispatcher會cancel掉全部的job,cancel的過程當中flink會主動拋出異常。session

 

雖然這樣作沒什麼大的影響,由於其實若是connection很快恢復,做業也會很快被拉起,沒有大礙,但看起來老是很差,zk鏈接隨便的一個擾動,均可能致使job重啓,因此就想把它改動。架構

 

方案一:測試

在flink的ZooKeeperUtils.java經過CuratorFrameworkFactory來構造CuratorFramework時,經過connectionStateErrorPolicy將ConnectionStateErrorPolicy從StandardConnectionStateErrorPolicy更新爲SessionConnectionStateErrorPolicy,前者將suspended和lost都做爲error,後者只是將lost做爲error,而只有發生error的時候纔會取消leadership,因此如此設置以後,在進入suspended狀態時,不在發生leadership的取消和從新選舉。spa

優勢:從總體的狀態轉換上進行了控制,優雅。rest

缺點:目前flink所引用的curator的版本爲2.12.0,不支持設置policy,須要更新curator版本號,是否會帶來其餘問題,不可知。ip

測試:成功。io

更改curator的版本爲4.2.0,提交做業,restart zk,job沒有重啓,checkpoint正常進行。

 

 

方案二:

在flink內部,在代碼ZooKeeperLeaderElectionService.java中的notLeader方法中,在收到notleader的通知的時候,根據當前的狀態是不是suspended進行相應的處理。

優勢:不對flink的總體形成影響,更改在局部範圍內可控。

缺點:因爲curator對suspended的處理依舊,因此從curator的層面仍是會發生取消leadership而後從新進行選舉的狀況,雖然這一切都沒必要要。

測試:失敗

1.原先預計的是在notleader方法中,若是發現當前狀態是suspended,就不去執行revokeLeadership方法,但notleader方法和suspended狀態的獲取分別是在兩個回調方法中觸發的,通過測試,沒法保證兩個回調的執行順序,即有可能notleader方法已經觸發,可是suspended狀態尚未觸發。

2.若是隻是修改notleader方法,即便修改爲功,仍是會觸發isleader方法,在isleader方法中,若是不修改,仍是會觸發原有做業的取消和從新提交,因此這裏也要改,改爲從新連接以後這裏即便被通知isleader也不會去給dispatcher進行grantLeadership,但又不能直接這麼操做,還須要判斷是否本身已是leader,但惋惜的是,在發生suspended的時候,curator裏面已經將leadership取消掉了,因此若是在這裏加上判斷是connected狀態而且不是leader而後不去grantleadership,會看起來很奇怪。

總而言之,若是不動curator的邏輯,只是在flink裏改,這裏的邏輯就會被改的難以理解,而且還沒法成功。

 

目前的方案應對的場景是zk connection的短期抖動,若是發生zk connection的長時間不可用,則tm和jm都會失敗,這個也是應有之義。

 

另,

在flink中對curator的suspended狀態起做用的還有一個地方,在ZooKeeperCheckpointIDCounter.java中有對suspended的判斷,若是以前是suspended或者Lost,則flink就不會去zk上存取checkpoint的信息了。這裏感受是個坑,也須要改對suspended的策略。

 

外一篇,

zookeeper能夠設置session timeout時間,可是不是你隨便設置就會起做用,會有一個判斷的過程。

SessionTimeOut的協商以下:

  • 狀況1: 配置文件配置了maxSessionTimeOut和minSessionTimeOut

最終SessionTimeOut,必須在minSessionTimeOut和maxSessionTimeOut區間裏,若是跨越上下界,則以跨越的上屆或下界爲準。

  • 狀況2:配置文件沒有配置maxSessionTimeOut和minSessionTimeOut

maxSessionTimeout沒配置則 maxSessionTimeOut設置爲 20 * tickTime

minSessionTimeOut沒配置則 minSessionTimeOut設置爲 2 * tickTime

也就是默認狀況下, SessionTimeOut的合法範圍爲 4秒~40秒,默認配置中tickTime爲2秒。

相關文章
相關標籤/搜索