歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~javascript
本文由 特魯門發表於 雲+社區專欄
導讀:html
遇到Keepper通知更新沒法收到的問題,思考節點變動通知的可靠性,經過閱讀源碼解析瞭解到zk Watch的註冊以及觸發的機制,本地調試運行模擬zk更新的不可靠的場景以及得出相應的解決方案。java
過程很曲折,但問題的根本緣由也水落石出了,本文最後陳述了更新沒法收到的根本緣由,但願對其餘人有所幫助。-----------------------------------------node
一般Zookeeper是做爲配置存儲、分佈式鎖等功能被使用,配置讀取若是每一次都是去Zookeeper server讀取效率是很是低的,幸虧Zookeeper提供節點更新的通知機制,只須要對節點設置Watch監聽,節點的任何更新都會以通知的方式發送到Client端。git
如上圖所示:應用Client一般會鏈接上某個ZkServer,forPath不只僅會讀取Zk 節點zkNode的數據(一般存儲讀取到的數據會存儲在應用內存中,例如圖中Value),並且會設置一個Watch,當zkNode節點有任何更新時,ZkServer會發送notify,Client運行Watch來才走出相應的事件相應。這裏假設操做爲更新Client本地的數據。這樣的模型使得配置異步更新到Client中,而無需Client每次都遠程讀取,大大提升了讀的性能,(圖中的re-regist從新註冊是由於對節點的監聽是一次性的,每一次通知完後,須要從新註冊)。但這個Notify是可靠的嗎?若是通知失敗,那豈不是Client永遠都讀取的本地的未更新的值?github
因爲現網環境定位此類問題比較困難,所以本地下載源碼並模擬運行ZkServer & ZkClient來看通知的發送狀況。apache
一、git 下載源碼 https://github.com/apache/zookeeperapi
二、cd 到路徑下,運行ant eclipse 加載工程的依賴。數組
三、導入Idea中。promise
https://stackoverflow.com/que...
查看相關問題和步驟。
首先運行ZkServer。QuorumPeerMain是Server的啓動類。這個能夠根據bin下ZkServer.sh找到入口。注意啓動參數配置參數文件,指定例如啓動端口等相關參數。
在此以前,須要設置相關的斷點。
ZkClient 是使用Nio的方式與ZkServer進行通訊的,Zookeeper的線程模型中使用兩個線程:
SendThread專門成立的請求的發送,請求會被封裝爲Packet(包含節點名稱、Watch描述等信息)類發送給Sever。
EventThread則專門處理SendThread接收後解析出的Event。
ZkClient 的主要有兩個Processor,一個是SycProcessor負責Cluster之間的數據同步(包括集羣leader選取)。另外一個是叫FinalRuestProcessor,專門處理對接受到的請求(Packet)進行處理。
//ZookeeperServer 的processPacket方法專門對收到的請求進行處理。 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); // Through the magic of byte buffers, txn will not be // pointing // to the start of the txn incomingBuffer = incomingBuffer.slice(); //鑑權請求處理 if (h.getType() == OpCode.auth) { LOG.info("got auth packet " + cnxn.getRemoteSocketAddress()); AuthPacket authPacket = new AuthPacket(); ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; if(ap != null) { try { authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth()); } catch(RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e); authReturn = KeeperException.Code.AUTHFAILED; } } if (authReturn == KeeperException.Code.OK) { if (LOG.isDebugEnabled()) { LOG.debug("Authentication succeeded for scheme: " + scheme); } LOG.info("auth success " + cnxn.getRemoteSocketAddress()); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { LOG.warn("No authentication provider for scheme: " + scheme + " has " + ProviderRegistry.listProviders()); } else { LOG.warn("Authentication failed for scheme: " + scheme); } // send a response... ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); cnxn.disableRecv(); } return; } else { if (h.getType() == OpCode.sasl) { Record rsp = processSasl(incomingBuffer,cnxn); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? return; } else { Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // Always treat packet from the client as a possible // local request. setLocalSessionFlag(si); //交給finalRequestProcessor處理 submitRequest(si); } } cnxn.incrOutstandingRequests(h); }
FinalRequestProcessor 對請求進行解析,Client鏈接成功後,發送的exist命令會落在這部分處理邏輯。
zkDataBase 由zkServer從disk持久化的數據創建而來,上圖能夠看到這裏就是添加監聽Watch的地方。
首先了解兩個概念,FinalRequestProcessor處理的請求分爲兩種,一種是事務型的,一種非事務型,exist 的event-type是一個非事物型的操做,上面代碼中是對其處理邏輯,對於事物的操做,例如SetData的操做。則在下面代碼中處理。
private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = request != null ? request.type : hdr.getType(); long sessionId = request != null ? request.sessionId : hdr.getClientId(); if (hdr != null) { //hdr 爲事物頭描述,例如SetData的操做就會被ZkDataBase接管操做, //由於是對Zk的數據存儲機型修改 rc = getZKDatabase().processTxn(hdr, txn); } else { rc = new ProcessTxnResult(); } if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.addGlobalSession(sessionId, cst.getTimeOut()); } else if (request != null && request.isLocalSession()) { request.request.rewind(); int timeout = request.request.getInt(); request.request.rewind(); sessionTracker.addSession(request.sessionId, timeout); } else { LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } return rc; }
這裏設置了斷點,就能夠攔截對節點的更新操做。
這兩個設置了斷點,就能夠了解到Watch的設置過程。
接下來看如何啓動Zookeeper的Client。ZookeeperMain爲Client的入口,一樣在bin/zkCli.sh中能夠找到。注意設置參數,設置Server的鏈接地址。
修改ZookeeperMain方法,設置對節點的Watch監聽。
public ZooKeeperMain(String args[]) throws IOException, InterruptedException, KeeperException { cl.parseOptions(args); System.out.println("Connecting to " + cl.getOption("server")); connectToZK(cl.getOption("server")); while (true) { // 模擬註冊對/zookeeper節點的watch監聽 zk.exists("/zookeeper", true); System.out.println("wait"); } }
啓動Client。
因爲咱們要觀察節點變動的過程,上面這個Client設置了對節點的監聽,那麼咱們須要另一個cleint對節點進行更改,這個咱們只須要在命令上進行就能夠了。
此時命令行的zkClient更新了/zookeeper節點,Server此時會停在setData事件的處理代碼段。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } //觸發watch監聽 dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
此時,咱們重點關注的類出現了。WatchManager
package org.apache.zookeeper.server; import java.io.PrintWriter; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class manages watches. It allows watches to be associated with a string * and removes watchers and their watches in addition to managing triggers. */ class WatchManager { private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class); //存儲path對watch的關係 private final Map<String, Set<Watcher>> watchTable = new HashMap<String, Set<Watcher>>(); //存儲watch監聽了哪些path節點 private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>(); synchronized int size(){ int result = 0; for(Set<Watcher> watches : watchTable.values()) { result += watches.size(); } return result; } //添加監聽 synchronized void addWatch(String path, Watcher watcher) { Set<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); Set<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); } //移除 synchronized void removeWatcher(Watcher watcher) { Set<String> paths = watch2Paths.remove(watcher); if (paths == null) { return; } for (String p : paths) { Set<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); if (list.size() == 0) { watchTable.remove(p); } } } } Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } //觸發watch Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); Set<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { Set<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } //通知發送 w.process(e); } return watchers; } }
重點關注triggerWatch的方法,能夠發現watch被移除後,即往watch中存儲的client信息進行通知發送。
@Override public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
沒有任何確認機制,不會因爲發送失敗,而回寫watch。
到這裏,能夠知道watch的通知機制是不可靠的,zkServer不會保證通知的可靠抵達。雖然zkclient與zkServer端是會有心跳機制保持連接,可是若是通知過程當中斷開,即時從新創建鏈接後,watch的狀態是不會恢復。
如今已經知道了通知是不可靠的,會有丟失的狀況,那ZkClient的使用須要進行修正。
本地的存儲再也不是一個靜態的等待watch更新的狀態,而是引入緩存機制,按期的去從Zk主動拉取並註冊Watch(ZkServer會進行去重,對同一個Node節點的相同時間類型的Watch不會重複)。
另一種方式是,Client端收到斷開鏈接的通知,從新註冊全部關注節點的Watch。但做者遇到的現網狀況是client沒有收到更新通知的同時,也沒有查看到鏈接斷開的錯誤信息。這塊仍需進一步確認。水平有限,歡迎指正 :D
在StackOverFlow上的提問有了新進展:
https://stackoverflow.com/que...
原來官方文檔已經解釋了在鏈接斷開的時候,client對watch的一些恢復操作,ps:原來上面我提到的客戶端的策略已經官方實現。。。
客戶端會經過心跳保活,若是發現斷開了鏈接,會從新創建鏈接,併發送以前對節點設置的watch以及節點zxid,若是zxid與服務端的小則說明斷開期間有更改,那麼server會觸發通知。
這麼來看,Zookeeper的通知機制至少在官方的文檔說明上是可靠的,至少是有相應機制去保證。ps:除Exist watch外。可是本人遇到的問題仍未解開。。後悔當初沒有保留現場,深刻發掘。計劃先把實現改回原來的,後續進一步驗證。找到緣由再更新這裏。
最終結論更新!
經過深刻閱讀apache的zk論壇以及源碼,有一個重要的信息。
上面提到的鏈接斷開分爲recoverble以及unrecoverble兩種場景,這兩種的區別主要是基於Session的有效期,全部的client操做包括watch都是和Session關聯的,當Session在超時過時時間內,從新成功創建鏈接,則watch會在鏈接創建後從新設置。可是當Session Timeout後仍然沒有成功從新創建鏈接,那麼Session則處於Expire的狀態。下面鏈接講述了這個過程
How should I handle SESSION_EXPIRED?
這種狀況下,ZookeeperClient會從新鏈接,可是Session將會是全新的一個。同時以前的狀態是不會保存的。
private void conLossPacket(Packet p) { if (p.replyHeader == null) { return; } switch (state) { case AUTH_FAILED: p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); break; case CLOSED: // session關閉狀態,直接返回。 p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); break; default: p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); } // 若是session未過時,這裏進行session的狀態(watches)會從新註冊。 finishPacket(p); }
*一、什麼是zookeeper的會話過時?*
通常來講,咱們使用zookeeper是集羣形式,以下圖,client和zookeeper集羣(3個實例)創建一個會話session。
在這個會話session當中,client實際上是隨機與其中一個zk provider創建的連接,而且互發心跳heartbeat。zk集羣負責管理這個session,而且在全部的provider上維護這個session的信息,包括這個session中定義的臨時數據和監視點watcher。
若是再網絡不佳或者zk集羣中某一臺provider掛掉的狀況下,有可能出現connection loss的狀況,例如client和zk provider1鏈接斷開,這時候client不須要任何的操做(zookeeper api已經給咱們作好了),只須要等待client與其餘provider從新鏈接便可。這個過程可能致使兩個結果:
1)在session timeout以內鏈接成功
這個時候client成功切換到鏈接另外一個provider例如是provider2,因爲zk在全部的provider上同步了session相關的數據,此時能夠認爲無縫遷移了。
2)在session timeout以內沒有從新鏈接
這就是session expire的狀況,這時候zookeeper集羣會任務會話已經結束,並清除和這個session有關的全部數據,包括臨時節點和註冊的監視點Watcher。
在session超時以後,若是client從新鏈接上了zookeeper集羣,很不幸,zookeeper會發出session expired異常,且不會重建session,也就是不會重建臨時數據和watcher。
咱們實現的ZookeeperProcessor是基於Apache Curator的Client封裝實現的。
它對於Session Expire的處理是提供了處理的監聽註冊ConnectionStateListner,當遇到Session Expire時,執行使用者要作的邏輯。(例如:從新設置Watch)遺憾的是,咱們沒有對這個事件進行處理,所以鏈接是一致斷開的,可是!咱們應用仍然會讀到老的數據!
在這裏,咱們又犯了另一個錯誤,本地緩存了zookeeper的節點數據。。其實zookeeperClient已經作了本地緩存的機制,可是咱們有加了一層(注:這裏也有一個緣由,是由於zk節點的數據時二進制的數組,業務要使用一般要反序列化,咱們這裏的緩存是爲了減小反序列化帶來的開銷!),正式因爲咱們本地緩存了,所以即便zk斷開了,仍然讀取了老的值!
至此,謎團已經所有解開,看來以前的實現有許多姿式是錯誤的,致使後續出現了各類奇怪的BUG 。如今處理的方案,是監聽Reconnect的通知,當收到這個通知後,主動讓本地緩存失效(這裏仍然作了緩存,是由於減小反序列化的開銷,zkClient的緩存只是緩存了二進制,每次拿出來仍然須要反序列化)。代碼:
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { case CONNECTED: break; case RECONNECTED: LOG.error("zookeeper connection reconnected"); System.out.println("zookeeper connection reconnected"); //原本使用invalidateAll,可是這個會使得cache全部緩存值同時失效 //若是關注節點比較多,致使同時請求zk讀值,可能服務會瞬時阻塞在這一步 //所以使用guava cache refresh方法,異步更新,更新過程當中, //老值返回,知道更新完成 for (String key : classInfoMap.keySet()) { zkDataCache.refresh(key); } break; case LOST: // session 超時,斷開鏈接,這裏不要作任何操做,緩存保持使用 LOG.error("zookeeper connection lost"); System.out.println("zookeeper connection lost"); break; case SUSPENDED: break; default: break; } } });
問答
如何閱讀Zookeeper事務日誌?
相關閱讀
Zookeeper總覽
ZooKeeper入門
zookeeper原理
【每日課程推薦】機器學習實戰!快速入門在線廣告業務及CTR相應知識
此文已由做者受權騰訊雲+社區發佈,更多原文請點擊
搜索關注公衆號「雲加社區」,第一時間獲取技術乾貨,關注後回覆1024 送你一份技術課程大禮包!
海量技術實踐經驗,盡在雲加社區!