初學 Zookeeper 會發現客戶端有兩種回調方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是離不開這兩種方式的,搞清楚它們之間的區別與實現顯得尤其重要。本文將圍繞下面幾個方面展開服務器
咱們先經過一個例子來感覺一下:session
zooKeeper.getData(root, new Watcher() { public void process(WatchedEvent event) { } }, new AsyncCallback.DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }, null);異步
1函數 2spa 3線程 4code 5接口 6隊列 7事件 8 9 |
zooKeeper.getData(root, new Watcher() { public void process(WatchedEvent event) {
} }, new AsyncCallback.DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
} }, null); |
能夠看到,getData
方法能夠同時設置兩個回調:Watcher 和 AsyncCallback,一樣是回調,它們的區別是什麼呢?要解決這個問題,咱們就得從這兩個接口的功能入手。
Watcher
:Watcher
是用於監聽節點,session 狀態的,好比getData
對數據節點a
設置了watcher
,那麼當a
的數據內容發生改變時,客戶端會收到NodeDataChanged
通知,而後進行watcher
的回調。AsyncCallback
:AsyncCallback
是在以異步方式使用 ZooKeeper API 時,用於處理返回結果的。例如:getData
同步調用的版本是:byte[] getData(String path, boolean watch,Stat stat)
,異步調用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx)
,能夠看到,前者是直接返回獲取的結果,後者是經過AsyncCallback
回調處理結果的。Watcher 主要是經過ClientWatchManager
進行管理的。
ClientWatchManager
中有四種Watcher
defaultWatcher
:建立Zookeeper
鏈接時傳入的Watcher
,用於監聽 session 狀態dataWatches
:存放getData
傳入的Watcher
existWatches
:存放exists
傳入的Watcher
,若是節點已存在,則Watcher
會被添加到dataWatches
childWatches
:存放getChildren
傳入的Watcher
從代碼上能夠發現,監聽器是存在HashMap
中的,key
是節點名稱path
,value是
Set<Watcher>
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher;
1 2 3 4 5 6 7 8 |
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher; |
在Watcher
接口中,已經定義了全部的狀態類型和事件類型
號dataVersion
。所以,即便使用相同的數據內容來更新,仍是會收到這個事件通知的。不管如何,調用了更新接口,就必定會更新dataVersion
的。ClientWatchManager
只有一個方法,那就是materialize
,它根據事件類型type
和path
返回監聽該節點的特定類型的Watcher
。
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
1 2 |
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path); |
核心邏輯以下:
type == None
:返回全部Watcher
,也就是說全部的Watcher
都會被觸發。若是disableAutoWatchReset == true
且當前state != SyncConnected
,那麼還會清空Watcher
,意味着移除全部在節點上的Watcher
。type == NodeDataChanged | NodeCreated
:返回監聽path
節點的dataWatches & existWatches
type == NodeChildrenChanged
:返回監聽path
節點的childWatches
type == NodeDeleted
:返回監聽path
節點的dataWatches | childWatches
每次返回都會從HashMap
中移除節點對應的Watcher
,例如:addTo(dataWatches.remove(clientPath), result);
,這就是爲何Watcher
是一次性的緣由(defaultWatcher
除外)。值得注意的是,因爲使用的是HashSet
存儲Watcher
,重複添加同一個實例的Watcher
也只會被觸發一次。
Zookeeper 的exists
,getData
,getChildren
方法都有異步的版本,它們與同步方法的區別僅僅在因而否等待響應,底層發送都是經過sendThread
異步發送的。下面咱們用一幅圖來講明:
上面的圖展現了同步/異步調用getData
的流程,其餘方法也是相似的。
Zookeeper 客戶端會啓動兩個常駐線程
SendThread
:負責 IO 操做,包括髮送,接受響應,發送 ping 等。EventThread
:負責處理事件,執行回調函數。readResponse
是SendThread
處理響應的核心函數,核心邏輯以下:
ReplyHeader
: 有一個單獨的線程SendThread
,負責接收服務器端的響應。假設接受到的服務器傳遞過來的字節流是incomingBuffer
,那麼就將這個incomingBuffer
反序列化爲ReplyHeader
。ReplyHeader
是Watcher
響應仍是AsyncCallback
響應:ReplyHeader.getXid()
存儲了響應類型。
Watcher
類型響應:從ReplyHeader
中建立WatchedEvent
,WatchedEvent
裏面存儲了節點的路徑,而後去WatcherManager
中找到和這個節點相關聯的全部Watcher
,將他們寫入到EventThread
的waitingEvents
中。AsyncCallback
類型響應:從ReplyHeader
中讀取response
,這個response
描述了是Exists,setData,getData,getChildren,create.....
中的哪個異步回調。從pendingQueue
中拿到Packet
,Packet
中的cb
存儲了AsyncCallback
,也就是異步 API 的結果回調。最後將Packet
寫入到EventThread
的waitingEvents
中。processEvent
是EventThread
處理事件核心函數,核心邏輯以下:
event instanceof WatcherSetEventPair
,取出pair
中的Watchers
,逐個調用watcher.process(pair.event)
event
爲AsyncCallback
,根據p.response
判斷爲哪一種響應類型,執行響應的回調processResult
。可見,Watcher
和AsyncCallback
都是由EventThread
處理的,經過processEvent
進行區分處理。
Zookeeper 客戶端中Watcher
和AsyncCallback
都是異步回調的方式,但它們回調的時機是不同的,前者是由服務器發送事件觸發客戶端回調,後者是在執行了請求後獲得響應後客戶端主動觸發的。它們的共同點在於都須要在獲取了服務器響應以後,由SendThread
寫入EventThread
的waitingEvents
中,而後由EventThread
逐個從事件隊列中獲取並處理。