ZooKeeper Watcher 和 AsyncCallback 的區別與實現

前言

初學 Zookeeper 會發現客戶端有兩種回調方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是離不開這兩種方式的,搞清楚它們之間的區別與實現顯得尤其重要。本文將圍繞下面幾個方面展開服務器

  • Watcher 和 AsyncCallback 的區別
  • Watcher 的回調實現
  • AsyncCallback 的回調實現
  • IO 與事件處理

Watcher 和 AsyncCallback 的區別

咱們先經過一個例子來感覺一下:session

zooKeeper.getData(root, new Watcher() { public void process(WatchedEvent event) { } }, new AsyncCallback.DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }, null);異步

1函數

2spa

3線程

4code

5接口

6隊列

7事件

8

9

zooKeeper.getData(root, new Watcher() {

            public void process(WatchedEvent event) {

 

            }

        }, new AsyncCallback.DataCallback() {

            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {

 

            }

        }, null);

能夠看到,getData方法能夠同時設置兩個回調:Watcher 和 AsyncCallback,一樣是回調,它們的區別是什麼呢?要解決這個問題,咱們就得從這兩個接口的功能入手。

  • WatcherWatcher是用於監聽節點,session 狀態的,好比getData對數據節點a設置了watcher,那麼當a的數據內容發生改變時,客戶端會收到NodeDataChanged通知,而後進行watcher的回調。
  • AsyncCallback:AsyncCallback是在以異步方式使用 ZooKeeper API 時,用於處理返回結果的。例如:getData同步調用的版本是:byte[] getData(String path, boolean watch,Stat stat),異步調用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx),能夠看到,前者是直接返回獲取的結果,後者是經過AsyncCallback回調處理結果的。

Watcher

Watcher 主要是經過ClientWatchManager進行管理的。

Watcher 的類型

ClientWatchManager中有四種Watcher

  • defaultWatcher:建立Zookeeper鏈接時傳入的Watcher,用於監聽 session 狀態
  • dataWatches:存放getData傳入的Watcher
  • existWatches:存放exists傳入的Watcher,若是節點已存在,則Watcher會被添加到dataWatches
  • childWatches:存放getChildren傳入的Watcher

從代碼上能夠發現,監聽器是存在HashMap中的,key是節點名稱pathvalue是Set<Watcher>

private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher;

1

2

3

4

5

6

7

8

private final Map<String, Set<Watcher>> dataWatches =

        new HashMap<String, Set<Watcher>>();

private final Map<String, Set<Watcher>> existWatches =

        new HashMap<String, Set<Watcher>>();

private final Map<String, Set<Watcher>> childWatches =

        new HashMap<String, Set<Watcher>>();

 

private volatile Watcher defaultWatcher;

 

通知的狀態類型與事件類型

Watcher接口中,已經定義了全部的狀態類型和事件類型

  • KeeperState.Disconnected(0)此時客戶端處於斷開鏈接狀態,和ZK集羣都沒有創建鏈接。
    • EventType.None(-1)觸發條件:通常是在與服務器斷開鏈接的時候,客戶端會收到這個事件。
  • KeeperState. SyncConnected(3)此時客戶端處於鏈接狀態
    • EventType.None(-1)觸發條件:客戶端與服務器成功創建會話以後,會收到這個通知。
    • EventType. NodeCreated (1)觸發條件:所關注的節點被建立。
    • EventType. NodeDeleted (2)觸發條件:所關注的節點被刪除。
    • EventType. NodeDataChanged (3)觸發條件:所關注的節點的內容有更新。注意,這個地方說的內容是指數據的版本號dataVersion。所以,即便使用相同的數據內容來更新,仍是會收到這個事件通知的。不管如何,調用了更新接口,就必定會更新dataVersion的。
    • EventType. NodeChildrenChanged (4)觸發條件:所關注的節點的子節點有變化。這裏說的變化是指子節點的個數和組成,具體到子節點內容的變化是不會通知的。
  • KeeperState. AuthFailed(4)認證失敗
    • EventType.None(-1)
  • KeeperState. Expired(-112)session 超時
    • EventType.None(-1)

materialize 方法

ClientWatchManager只有一個方法,那就是materialize,它根據事件類型typepath返回監聽該節點的特定類型的Watcher

public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);

1

2

public Set<Watcher> materialize(Watcher.Event.KeeperState state,

    Watcher.Event.EventType type, String path);

核心邏輯以下:

  1. type == None:返回全部Watcher,也就是說全部的Watcher都會被觸發。若是disableAutoWatchReset == true且當前state != SyncConnected,那麼還會清空Watcher,意味着移除全部在節點上的Watcher
  2. type == NodeDataChanged | NodeCreated:返回監聽path節點的dataWatches & existWatches
  3. type == NodeChildrenChanged:返回監聽path節點的childWatches
  4. type == NodeDeleted:返回監聽path節點的dataWatches | childWatches

每次返回都會從HashMap中移除節點對應的Watcher,例如:addTo(dataWatches.remove(clientPath), result);,這就是爲何Watcher是一次性的緣由(defaultWatcher除外)。值得注意的是,因爲使用的是HashSet存儲Watcher,重複添加同一個實例的Watcher也只會被觸發一次。

AsyncCallback

Zookeeper 的exists,getData,getChildren方法都有異步的版本,它們與同步方法的區別僅僅在因而否等待響應,底層發送都是經過sendThread異步發送的。下面咱們用一幅圖來講明:


上面的圖展現了同步/異步調用getData的流程,其餘方法也是相似的。

IO 與事件處理

Zookeeper 客戶端會啓動兩個常駐線程

  • SendThread:負責 IO 操做,包括髮送,接受響應,發送 ping 等。
  • EventThread:負責處理事件,執行回調函數。

readResponse

readResponseSendThread處理響應的核心函數,核心邏輯以下:

  1. 接受服務器的響應,並反序列化出ReplyHeader: 有一個單獨的線程SendThread,負責接收服務器端的響應。假設接受到的服務器傳遞過來的字節流是incomingBuffer,那麼就將這個incomingBuffer反序列化爲ReplyHeader
  2. 判斷響應類型:判斷ReplyHeaderWatcher響應仍是AsyncCallback響應:ReplyHeader.getXid()存儲了響應類型。
    1. 若是是Watcher類型響應:從ReplyHeader中建立WatchedEventWatchedEvent裏面存儲了節點的路徑,而後去WatcherManager中找到和這個節點相關聯的全部Watcher,將他們寫入到EventThreadwaitingEvents中。
    2. 若是是AsyncCallback類型響應:從ReplyHeader中讀取response,這個response描述了是Exists,setData,getData,getChildren,create.....中的哪個異步回調。從pendingQueue中拿到PacketPacket中的cb存儲了AsyncCallback,也就是異步 API 的結果回調。最後將Packet寫入到EventThreadwaitingEvents中。

processEvent

processEventEventThread處理事件核心函數,核心邏輯以下:

  1. 若是event instanceof WatcherSetEventPair,取出pair中的Watchers,逐個調用watcher.process(pair.event)
  2. 不然eventAsyncCallback,根據p.response判斷爲哪一種響應類型,執行響應的回調processResult

可見,WatcherAsyncCallback都是由EventThread處理的,經過processEvent進行區分處理。

總結

Zookeeper 客戶端中WatcherAsyncCallback都是異步回調的方式,但它們回調的時機是不同的,前者是由服務器發送事件觸發客戶端回調,後者是在執行了請求後獲得響應後客戶端主動觸發的。它們的共同點在於都須要在獲取了服務器響應以後,由SendThread寫入EventThreadwaitingEvents中,而後由EventThread逐個從事件隊列中獲取並處理。

相關文章
相關標籤/搜索