目標:介紹基於zookeeper的來實現的遠程通訊、介紹dubbo-remoting-zookeeper內的源碼解析。
對於zookeeper我相信確定不陌生,在以前的文章裏面也有講到zookeeper來做爲註冊中心。在這裏,基於zookeeper來實現遠程通信,duubo封裝了zookeeper client,來和zookeeper server通信。java
下面是類圖:git
public interface ZookeeperClient { /** * 建立client * @param path * @param ephemeral */ void create(String path, boolean ephemeral); /** * 刪除client * @param path */ void delete(String path); /** * 得到子節點集合 * @param path * @return */ List<String> getChildren(String path); /** * 向zookeeper的該節點發起訂閱,得到該節點全部 * @param path * @param listener * @return */ List<String> addChildListener(String path, ChildListener listener); /** * 移除該節點的子節點監聽器 * @param path * @param listener */ void removeChildListener(String path, ChildListener listener); /** * 新增狀態監聽器 * @param listener */ void addStateListener(StateListener listener); /** * 移除狀態監聽 * @param listener */ void removeStateListener(StateListener listener); /** * 判斷是否鏈接 * @return */ boolean isConnected(); /** * 關閉客戶端 */ void close(); /** * 得到url * @return */ URL getUrl(); }
該接口是基於zookeeper的客戶端接口,其中封裝了客戶端的一些方法。github
該類實現了ZookeeperClient接口,是客戶端的抽象類,它實現了一些公共邏輯,把具體的doClose、createPersistent等方法抽象出來,留給子類來實現。api
/** * url對象 */ private final URL url; /** * 狀態監聽器集合 */ private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>(); /** * 客戶端監聽器集合 */ private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>(); /** * 是否關閉 */ private volatile boolean closed = false;
@Override public void create(String path, boolean ephemeral) { // 若是不是臨時節點 if (!ephemeral) { // 判斷該客戶端是否存在 if (checkExists(path)) { return; } } // 得到/的位置 int i = path.lastIndexOf('/'); if (i > 0) { // 建立客戶端 create(path.substring(0, i), false); } // 若是是臨時節點 if (ephemeral) { // 建立臨時節點 createEphemeral(path); } else { // 遞歸建立節點 createPersistent(path); } }
該方法是建立客戶端的方法,其中createEphemeral和createPersistent方法都被抽象出來。具體看下面的類的介紹。服務器
@Override public void addStateListener(StateListener listener) { // 狀態監聽器加入集合 stateListeners.add(listener); }
該方法就是增長狀態監聽器。app
@Override public void close() { if (closed) { return; } closed = true; try { // 關閉 doClose(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
該方法是關閉客戶端,其中doClose方法也被抽象出。框架
/** * 關閉客戶端 */ protected abstract void doClose(); /** * 遞歸建立節點 * @param path */ protected abstract void createPersistent(String path); /** * 建立臨時節點 * @param path */ protected abstract void createEphemeral(String path); /** * 檢測該節點是否存在 * @param path * @return */ protected abstract boolean checkExists(String path); /** * 建立子節點監聽器 * @param path * @param listener * @return */ protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener); /** * 爲子節點添加監聽器 * @param path * @param listener * @return */ protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener); /** * 移除子節點監聽器 * @param path * @param listener */ protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
上述的方法都是被抽象的,又它的兩個子類來實現。ide
該類繼承了AbstractZookeeperClient,是zk客戶端的實現類。函數
/** * zk客戶端包裝類 */ private final ZkClientWrapper client; /** * 鏈接狀態 */ private volatile KeeperState state = KeeperState.SyncConnected;
該類有兩個屬性,其中client就是核心所在,幾乎全部方法都調用了client的方法。源碼分析
public ZkclientZookeeperClient(URL url) { super(url); // 新建一個zkclient包裝類 client = new ZkClientWrapper(url.getBackupAddress(), 30000); // 增長狀態監聽 client.addListener(new IZkStateListener() { /** * 若是狀態改變 * @param state * @throws Exception */ @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; // 若是狀態變爲了斷開鏈接 if (state == KeeperState.Disconnected) { // 則修改狀態 stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { // 狀態變爲重連 stateChanged(StateListener.RECONNECTED); } }); // 啓動客戶端 client.start(); }
該方法是構造方法,同時在裏面也作了建立客戶端和啓動客戶端的操做。其餘方法都是實現了父類抽象的方法,而且調用的是client方法,爲舉個例子:
@Override public void createPersistent(String path) { try { // 遞歸建立節點 client.createPersistent(path); } catch (ZkNodeExistsException e) { } }
該方法是遞歸場景節點,調用的就是client.createPersistent(path)。
該類是Curator框架提供的一套高級API,簡化了ZooKeeper的操做,從而對客戶端的實現。
/** * 框架式客戶端 */ private final CuratorFramework client;
public CuratorZookeeperClient(URL url) { super(url); try { // 工廠建立者 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 建立客戶端 client = builder.build(); // 添加監聽器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { // 若是爲狀態爲lost,則改變爲未鏈接 if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { // 改變狀態爲鏈接 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { // 改變狀態爲未鏈接 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 啓動客戶端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
該方法是構造方法,一樣裏面也包含了客戶端建立和啓動的邏輯。
其餘的方法也同樣是實現了父類的抽象方法,舉個列子:
@Override public void createPersistent(String path) { try { client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
@SPI("curator") public interface ZookeeperTransporter { /** * 鏈接服務器 * @param url * @return */ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url); }
該方法是zookeeper的信息交換接口。一樣也是一個可擴展接口,默認實現CuratorZookeeperTransporter類。
public class ZkclientZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 新建ZkclientZookeeperClient實例 return new ZkclientZookeeperClient(url); } }
該類實現了ZookeeperTransporter,其中就是建立了ZkclientZookeeperClient實例。
public class CuratorZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 建立CuratorZookeeperClient實例 return new CuratorZookeeperClient(url); } }
該接口實現了ZookeeperTransporter,是ZookeeperTransporter默認的實現類,一樣也是建立了;對應的CuratorZookeeperClient實例。
該類是zk客戶端的包裝類。
/** * 超時事件 */ private long timeout; /** * zk客戶端 */ private ZkClient client; /** * 客戶端狀態 */ private volatile KeeperState state; /** * 客戶端線程 */ private ListenableFutureTask<ZkClient> listenableFutureTask; /** * 是否開始 */ private volatile boolean started = false;
public ZkClientWrapper(final String serverAddr, long timeout) { this.timeout = timeout; listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() { @Override public ZkClient call() throws Exception { // 建立zk客戶端 return new ZkClient(serverAddr, Integer.MAX_VALUE); } }); }
設置了超時時間和客戶端線程。
public void start() { // 若是客戶端沒有開啓 if (!started) { // 建立鏈接線程 Thread connectThread = new Thread(listenableFutureTask); connectThread.setName("DubboZkclientConnector"); connectThread.setDaemon(true); // 開啓線程 connectThread.start(); try { // 得到zk客戶端 client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (Throwable t) { logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); } started = true; } else { logger.warn("Zkclient has already been started!"); } }
該方法是客戶端啓動方法。
public void addListener(final IZkStateListener listener) { // 增長監聽器 listenableFutureTask.addListener(new Runnable() { @Override public void run() { try { client = listenableFutureTask.get(); // 增長監聽器 client.subscribeStateChanges(listener); } catch (InterruptedException e) { logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!"); } catch (ExecutionException e) { logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); } } }); }
該方法是爲客戶端添加監聽器。
其餘方法都是對於 客戶端是否還鏈接的檢測,可自行查看代碼。
public interface ChildListener { /** * 子節點修改 * @param path * @param children */ void childChanged(String path, List<String> children); }
該接口是子節點的監聽器,當子節點變化的時候會用到。
public interface StateListener { int DISCONNECTED = 0; int CONNECTED = 1; int RECONNECTED = 2; /** * 狀態修改 * @param connected */ void stateChanged(int connected); }
該接口是狀態監聽器,其中定義了一個狀態更改的方法以及三種狀態。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了基於zookeeper的來實現的遠程通訊、介紹dubbo-remoting-zookeeper內的源碼解析,關鍵須要對zookeeper有所瞭解。該篇以後,遠程通信的源碼解析就先到這裏了,其實你們會發現,若是可以對講解api系列的文章瞭解透了,那麼後面的文章九很簡單,就好像軌道鋪好,能夠直接順着軌道日後,根本沒有阻礙。接下來我將開始對rpc模塊進行講解。