ZooKeeper學習第五期---構建ZooKeeper應用

1、配置服務

配置服務是分佈式應用所須要的基本服務之一,它使集羣中的機器能夠共享配置信息中那些公共的部分。簡單地說,ZooKeeper能夠做爲一個具備高可用性的配置存儲器,容許分佈式應用的參與者檢索和更新配置文件。使用ZooKeeper中的觀察機制,能夠創建一個活躍的配置服務,使那些感興趣的客戶端可以得到配置信息修改的通知。java

下面來編寫一個這樣的服務。咱們經過兩個假設來簡化所需實現的服務(稍加修改就能夠取消這兩個假設)。node

第一,咱們惟一須要存儲的配置數據是字符串,關鍵字是znode的路徑,所以咱們在每一個znode上存儲了一個鍵/值對。
第二,在任什麼時候候只有一個客戶端會執行更新操做。程序員

除此以外,這個模型看起來就像是有一個主人(相似於HDFS中的namenode)在更新信息,而他的工人則須要遵循這些信息。算法

在名爲ActiveKeyValueStore的類中編寫了以下代碼:apache

package org.zk;

import java.nio.charset.Charset;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class ActiveKeyValueStore extends ConnectionWatcher {
    private static final Charset CHARSET=Charset.forName("UTF-8");
    public void write(String path,String value) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(path, false);
        if(stat==null){
            zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }else{
            zk.setData(path, value.getBytes(CHARSET),-1);
        }
    }
    public String read(String path,Watcher watch) throws KeeperException, InterruptedException{
        byte[] data = zk.getData(path, watch, null);
        return new String(data,CHARSET);
        
    }
    
}

write()方法的任務是將一個關鍵字及其值寫到ZooKeeper。它隱藏了建立一個新的znode和用一個新值更新現有znode之間的區 別,而是使用exists操做來檢測znode是否存在,而後再執行相應的操做。其餘值得一提的細節是須要將字符串值轉換爲字節數組,由於咱們只用了 UTF-8編碼的getBytes()方法。☆☆☆數組

read()方法的任務是讀取一個節點的配置屬性。ZooKeeper的getData()方法有三個參數:安全

(1)路徑
(2)一個觀察對象
(3)一個Stat對象服務器

Stat對象由getData()方法返回的值填充,用來將信息回傳給調用者。經過這個方法,調用者能夠得到一個znode的數據和元數據,但在這個例子中,因爲咱們對元數據不感興趣,所以將Stat參數設爲null。網絡

爲了說明ActiveKeyValueStore的用法,咱們編寫了一個用來更新配置屬性值的類ConfigUpdater,如代碼1.1所示。session

代碼1.1 用於隨機更新ZooKeeper中的屬性

package org.zk;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.KeeperException;

public class ConfigUpdater {
    
    public static final String  PATH="/config";
    
    private ActiveKeyValueStore store;
    private Random random=new Random();
    
    public ConfigUpdater(String hosts) throws IOException, InterruptedException {
        store = new ActiveKeyValueStore();
        store.connect(hosts);
    }
    public void run() throws InterruptedException, KeeperException{
        while(true){
            String value=random.nextInt(100)+"";
            store.write(PATH, value);
            System.out.printf("Set %s to %s\n",PATH,value);
            TimeUnit.SECONDS.sleep(random.nextInt(100));
            
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
        configUpdater.run();
    }
}

這個程序很簡單,ConfigUpdater中定義了一個ActiveKeyValueStore,它在ConfigUpdater的構造函數中鏈接到ZooKeeper。run()方法永遠在循環,在隨機時間以隨機值更新/config znode。

  做爲配置服務的用戶,ConfigWatcher建立了一個ActiveKeyValueStore對象store,而且在啓動以後經過 displayConfig()調用了store的read()方法,顯示它所讀到的配置信息的初始值,並將自身做爲觀察傳遞給store。當節點狀態發 生變化時,再次經過displayConfig()顯示配置信息,並再次將自身做爲觀察傳遞給store,參見代碼1.2:

例1.2 該用應觀察ZooKeeper中屬性的更新狀況,並將其打印到控制檯

package org.zk;

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;

public class ConfigWatcher implements Watcher{
    private ActiveKeyValueStore store;

    @Override
    public void process(WatchedEvent event) {
        if(event.getType()==EventType.NodeDataChanged){
            try{
                dispalyConfig();
            }catch(InterruptedException e){
                System.err.println("Interrupted. exiting. ");
                Thread.currentThread().interrupt();
            }catch(KeeperException e){
                System.out.printf("KeeperException錛?s. Exiting.\n", e);
            }
            
        }
        
    }
    public ConfigWatcher(String hosts) throws IOException, InterruptedException {
        store=new ActiveKeyValueStore();
        store.connect(hosts);
    }
    public void dispalyConfig() throws KeeperException, InterruptedException{
        String value=store.read(ConfigUpdater.PATH, this);
        System.out.printf("Read %s as %s\n",ConfigUpdater.PATH,value);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
        configWatcher.dispalyConfig();
        //stay alive until process is killed or Thread is interrupted
        Thread.sleep(Long.MAX_VALUE);
    }
}

當ConfigUpdater更新znode時,ZooKeeper產生一個類型爲EventType.NodeDataChanged的 事件,從而觸發觀察。ConfigWatcher在它的process()方法中對這個事件作出反應,讀取並顯示配置的最新版本。因爲觀察僅發送單次信 號,所以每次咱們調用ActiveKeyValueStore的read()方法時,都將一個新的觀察告知ZooKeeper來確保咱們能夠看到未來的更 新。可是,咱們仍是不能保證接收到每個更新,由於在收到觀察事件通知與下一次讀之間,znode可能已經被更新過,並且多是不少次,因爲客戶端在這段 時間沒有註冊任何觀察,所以不會收到通知。對於示例中的配置服務,這不是問題,由於客戶端只關心屬性的最新值,最新值優先於以前的值。可是,通常狀況下, 這個潛在的問題是不容忽視的。

讓咱們看看如何使用這個程序。在一個終端窗口中運行ConfigUpdater,而後在另外一個客戶端運行ConfigWatcher,咱們能夠預先 分別在兩個客戶端輸入命令,先不按回車,等兩個客戶端的命令輸入好後,先在運行ConfigUpdater的客戶端按回車,再在另外一個客戶端按回車,運行 結果以下:

2、可恢復的ZooKeeper應用

關於分佈式計算的第一個誤區是「網絡是可靠的」。按照他們的觀點,程序老是有一個可靠的網絡,所以當程序運行在真正的網絡中時,每每會出現各類備樣的故障。讓咱們看看各類可能的故障模式,以及可以解決故障的措施,使咱們的程序在面對故障時可以及時復原。

2.1 ZooKeeper異常

在Java API中的每個ZooKeeper操做都在其throws子句中聲明瞭兩種類型的異常,分別是InterruptedException和KeeperException。

(一)InterruptedException異常

若是操做被中斷,則會有一個InterruptedException異常被拋出。在Java語言中有一個取消阻塞方法的標準機制,即針對存在阻塞方法的線程調用interrupt()。一個成功的取消操做將產生一個InterruptedException異常。

ZooKeeper也遵循這一機制,所以你可使用這種方法來取消一個ZooKeeper操做。使用了ZooKeeper的類或庫一般會傳播 InterruptedException異常,使客戶端可以取消它們的操做。InterruptedException異常並不意味着有故障,而是代表相應的操做已經被取消,因此在配置服務的示例中,能夠經過傳播異常來停止應用程序的運行。

(二)KeeperException異常

(1) 若是ZooKeeper服務器發出一個錯誤信號或與服務器存在通訊問題,拋出的則是KeeperException異常。

針對不一樣的錯誤狀況,KeeperException異常存在不一樣的子類。

例如: KeeperException.NoNodeException是KeeperException的一個子類,若是你試圖針對一個不存在的znode執行操做,拋出的則是該異常。

每個KeeperException異常的子類都對應一個關於錯誤類型信息的代碼。

例如: KeeperException.NoNodeException異常的代碼是KeeperException.Code.NONODE

(2) 有兩種方法被用來處理KeeperException異常:

①捕捉KeeperException異常,而且經過檢測它的代碼來決定採起何種補救措施;

另外一種是捕捉等價的KeeperException子類,而且在每段捕捉代碼中執行相應的操做。

(3) KeeperException異常分爲三大類

① 狀態異常 

當一個操做因不能被應用於znode樹而致使失敗時,就會出現狀態異常。狀態異常產生的緣由一般是在同一時間有另一個進程正在修改znode。例如,若是一個znode先被另一個進程更新了,根據版本號執行setData操做的進程就會失敗,並收到一個KeeperException.BadVersionException異常,這是由於版本號不匹配。程序員一般都知道這種衝突老是存在的,也都會編寫代碼來進行處理。

一些狀態異常會指出程序中的錯誤,例如KeeperException.NoChildrenForEphemeralsException異常,試圖在短暫znode下建立子節點時就會拋出該異常。

② 可恢復異常

可恢復的異常是指那些應用程序可以在同一個ZooKeeper會話中恢復的異常。一個可恢復的異常是經過KeeperException.ConnectionLossException來表示的,它意味着已經丟失了與ZooKeeper的鏈接。ZooKeeper會嘗試從新鏈接,而且在大多數狀況下從新鏈接會成功,並確保會話是完整的。

可是ZooKeeper不能判斷與KeeperException.ConnectionLossException異常相關的操做是否成功執行。這種狀況就是部分失敗的一個例子。這時程序員有責任來解決這種不肯定性,而且根據應用的狀況來採起適當的操做。在這一點上,就須要對「冪等」(idempotent)操做和「非冪等」(Nonidempotent)操做進行區分。冪等操做是指那些一次或屢次執行都會產生相同結果的操做,例如讀請求或無條件執行的setData操做。對於冪等操做,只須要簡單地進行重試便可。對於非冪等操做,就不能盲目地進行重試,由於它們屢次執行的結果與一次執行是徹底不一樣的。程序能夠經過在znode的路徑它的數據中編碼信息來檢測是否非冪等操怍的更新已經完成。

③不可恢復的異常 

在某些狀況下,ZooKeeper會話會失效——也許由於超時或由於會話被關閉,兩種狀況下都會收到KeeperException.SessionExpiredException異常,或由於身份驗證失敗,KeeperException.AuthFailedException異常。不管上述哪一種狀況,全部與會話相關聯的短暫znode都將丟失,所以應用程序須要在從新鏈接到ZooKeeper以前重建它的狀態。

2.2 可靠地服務配置

首先咱們先回顧一下ActivityKeyValueStore的write()的方法,他由一個exists操做緊跟着一個create操做或setData操做組成:

public class ActiveKeyValueStore extends ConnectionWatcher {
    private static final Charset CHARSET=Charset.forName("UTF-8");
    public void write(String path,String value) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(path, false);
        if(stat==null){
            zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }else{
            zk.setData(path, value.getBytes(CHARSET),-1);
        }
    }
    public String read(String path,Watcher watch) throws KeeperException, InterruptedException{
        byte[] data = zk.getData(path, watch, null);
        return new String(data,CHARSET);
        
    }
    
}

做爲一個總體,write()方法是一個「冪等」操做,因此咱們能夠對他進行無條件重試。咱們新建一個類ChangedActiveKeyValueStore,代碼以下:

package org.zk;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class ChangedActiveKeyValueStore extends ConnectionWatcher{
    private static final Charset CHARSET=Charset.forName("UTF-8");
    private static final int MAX_RETRIES = 5; 
    private static final long RETRY_PERIOD_SECONDS = 5;
    
    public void write(String path,String value) throws InterruptedException, KeeperException{
        int retries=0;
        while(true){
            try {
                Stat stat = zk.exists(path, false);
                if(stat==null){
                    zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }else{
                    zk.setData(path, value.getBytes(CHARSET),stat.getVersion());
                }
            } catch (KeeperException.SessionExpiredException e) {
                throw e;
            } catch (KeeperException e) {
                if(retries++==MAX_RETRIES){
                    throw e;
                }
                //sleep then retry
                TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
            }
        }
    }
    public String read(String path,Watcher watch) throws KeeperException, InterruptedException{
        byte[] data = zk.getData(path, watch, null);
        return new String(data,CHARSET);
    }
}

在該類中,對前面的write()進行了修改,該版本的wirte()可以循環執行重試。其中設置了重試的最大次數MAX_RETRIES和兩次重試之間的間隔RETRY_PERIOD_SECONDS.

咱們再新建一個類ResilientConfigUpdater,該類對前面的ConfigUpdater進行了修改,代碼以下:

package org.zk;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class ResilientConfigUpdater extends ConnectionWatcher{
    public static final String PATH="/config";
    private ChangedActiveKeyValueStore store;
    private Random random=new Random();
    
    public ResilientConfigUpdater(String hosts) throws IOException, InterruptedException {
        store=new ChangedActiveKeyValueStore();
        store.connect(hosts);
    }
    public void run() throws InterruptedException, KeeperException{
        while(true){
            String value=random.nextInt(100)+"";
            store.write(PATH,value);
            System.out.printf("Set %s to %s\n",PATH,value);
            TimeUnit.SECONDS.sleep(random.nextInt(10));
        }
    }

    public static void main(String[] args) throws Exception {
        while(true){
            try {
                ResilientConfigUpdater configUpdater = new ResilientConfigUpdater(args[0]);
                configUpdater.run();
            }catch (KeeperException.SessionExpiredException e) {
                // start a new session
            }catch (KeeperException e) {
                // already retried ,so exit
                e.printStackTrace();
                break;
            }
        }
    }
}

在這段代碼中沒有對KeepException.SeeionExpiredException異常進行重試,由於一個會話過時 時,ZooKeeper對象會進入CLOSED狀態,此狀態下它不能進行重試鏈接。咱們只能將這個異常簡單拋出並讓擁有着建立一個新實例,以重試整個 write()方法。一個簡單的建立新實例的方法是建立一個新的ResilientConfigUpdater用於恢復過時會話。

處理會話過時的另外一種方法是在觀察中(在這個例子中應該是ConnectionWatcher)尋找類型爲ExpiredKeepState,而後 再找到的時候建立一個新鏈接。即便咱們收到KeeperException.SessionExpiredEception異常,這種方法仍是可讓咱們 在write()方法內不斷重試,由於鏈接最終是可以從新創建的。無論咱們採用何種機制從過時會話中恢復,重要的是,這種不一樣於鏈接丟失的故障類型,須要 進行不一樣的處理。

注意:實際上,這裏忽略了另外一種故障模式。當ZooKeeper對象被建立時,他會嘗試鏈接另外一個ZooKeeper服務器。若是鏈接失敗或超時, 那麼他會嘗試鏈接集合體中的另外一臺服務器。若是在嘗試集合體中的全部服務器以後仍然沒法創建鏈接,它會拋出一個IOException異常。因爲全部的 ZooKeeper服務器都不可用的可能性很小,因此某些應用程序選擇循環重試操做,直到ZooKeeper服務爲止。

這僅僅是一種重試處理策略,還有許多其餘處理策略,例如使用「指數返回」,每次將重試的間隔乘以一個常數。Hadoop內核中 org.apache.hadoop.io.retry包是一組工具,用於能夠重用的方式將重試邏輯加入代碼,所以他對於構建ZooKeeper應用很是 有用。

3、鎖服務

3.1分佈式鎖概述

分佈式鎖在一組進程之間提供了一種互斥機制。在任什麼時候刻,在任什麼時候刻只有一個進程能夠持有鎖。分佈式鎖能夠在大型分佈式系統中實現領導者選舉,在任什麼時候間點,持有鎖的那個進程就是系統的領導者。

注意:不要將ZooKeeper本身的領導者選舉和使用了ZooKeeper基本操做實現的通常領導者選混爲一談。ZooKeeper本身的領導者選舉機制是對外不公開的,咱們這裏所描述的通常領導者選舉服務則不一樣,他是對那些須要與主進程保持一致的分佈式系統所設計的。

(1) 爲了使用ZooKeeper來實現分佈式鎖服務,咱們使用順序znode來爲那些競爭鎖的進程強制排序。

思路很簡單:

 首先指定一個做爲鎖的znode,一般用它來描述被鎖定的實體,稱爲/leader;
② 而後但願得到鎖的客戶端建立一些短暫順序znode,做爲鎖znode的子節點。
③ 在任什麼時候間點,順序號最小的客戶端將持有鎖。 

例如,有兩個客戶端差很少同時建立znode,分別爲/leader/lock-1和/leader/lock-2,那麼建立/leader/lock-1的客戶端將會持有鎖,由於它的znode順序號最小。ZooKeeper服務是順序的仲裁者,由於它負責分配順序號。

④ 經過刪除znode /leader/lock-l便可簡單地將鎖釋放;
⑤ 另外,若是客戶端進程死亡,對應的短暫znode也會被刪除。
⑥ 接下來,建立/leader/lock-2的客戶端將持有鎖,由於它順序號緊跟前一個。
⑦ 經過建立一個關於znode刪除的觀察,可使客戶端在得到鎖時獲得通知。

(2) 以下是申請獲取鎖的僞代碼。

在鎖znode下建立一個名爲lock-的短暫順序znode,而且記住它的實際路徑名(create操做的返回值)。
查詢鎖znode的子節點而且設置一個觀察。
若是步驟l中所建立的znode在步驟2中所返回的全部子節點中具備最小的順序號,則獲取到鎖。退出。
等待步驟2中所設觀察的通知而且轉到步驟2。

3.2 當前問題與方案

3.2.1 羊羣效應

(1) 問題

雖然這個算法是正確的,但仍是存在一些問題。第一個問題是這種實現會受到「羊羣效應」(herd effect)的影響。考慮有成百上千客戶端的狀況,全部的客戶端都在嘗試得到鎖,每一個客戶端都會在鎖znode上設置一個觀察,用於捕捉子節點的變化。 每次鎖被釋放或另一個進程開始申請獲取鎖的時候,觀察都會被觸發而且每一個客戶端都會收到一個通知。  「羊羣效應「就是指大量客戶端收到同一事件的通知,但實際上只有不多一部分須要處理這一事件。在這種狀況下,只有一個客戶端會成功地獲取鎖,可是維護過程及向全部客戶端發送觀察事件會產生峯值流量,這會對ZooKeeper服務器形成壓力。

(2) 方案解決方案

爲了不出現羊羣效應,咱們須要優化通知的條件。關鍵在於只有在前一個順序號的子節點消失時才須要通知下一個客戶端,而不是刪除(或建立)任何子節點時都須要通知。在咱們的例子中,若是客戶端建立了znode /leader/lock-一、/leader/lock-2和/leader/lock-3,那麼只有當/leader/lock-2消失時才須要通知/leader/lock-3對照的客戶端;/leader/lock-1消失或有新的znode /leader/lock-4加入時,不須要通知該客戶端。

3.2.2 可恢復的異常

(1) 問題

這個申請鎖的算法目前還存在另外一個問題,就是不能處理因鏈接丟失而致使的create操做失敗。如前所述,在這種狀況下,咱們不知道操做是成功仍是失敗。因爲建立一個順序znode是非冪等操做,因此咱們不能簡單地重試,由於若是第一次建立已經成功,重試會使咱們多出一個永遠刪不掉的孤兒zriode(至少到客戶端會話結束前)。不幸的結果是將會出現死鎖。

(2) 解決方案

問題在於,在從新鏈接以後客戶端不可以判斷它是否已經建立過子節點。解決方案是在znode的名稱中嵌入一個ID,若是客戶端出現鏈接丟失的狀況, 從新鏈接以後它即可以對鎖節點的全部於節點進行檢查,看看是否有子節點的名稱中包含其ID。若是有一個子節點的名稱包含其ID,它便知道建立操做已經成 功,不須要再建立子節點。若是沒有子節點的名稱中包含其ID,則客戶端能夠安全地建立一個新的順序子節點。
客戶端會話的ID是一個長整數,而且在ZooKeeper服務中是惟一的,所以很是適合在鏈接丟失後用於識別客戶端。能夠經過調用Java ZooKeeper類的getSessionld()方法來得到會話的ID。

在建立短暫順序znode時應當採用lock-<sessionld>-這樣的命名方式,ZooKeeper在其尾部添加順序號以後,znode的名稱會形如lock-<sessionld>-<sequenceNumber>。因爲順序號對於父節點來講是惟一的,但對於子節點名並不惟一,所以採用這樣的命名方式能夠詿子節點在保持建立順序的同時可以肯定本身的建立者。

3.2.3 不可恢復的異常

若是一個客戶端的ZooKeeper會話過時,那麼它所建立的短暫znode將會被刪除,已持有的鎖會被釋放,或是放棄了申請鎖的位置。使用鎖的應 用程序應當意識到它已經再也不持有鎖,應當清理它的狀態,而後經過建立並嘗試申請一個新的鎖對象來從新啓動。注意,這個過程是由應用程序控制的,而不是鎖, 由於鎖是不能預知應用程序須要如何清理本身的狀態。

4、ZooKeeper實現共享鎖

實現正確地實現一個分佈式鎖是一件棘手的事,由於很難對全部類型的故障都進行正確的解釋處理。ZooKeeper帶有一個 JavaWriteLock,客戶端能夠很方便地使用它。更多分佈式數據結構和協議例如「屏障」(bafrier)、隊列和兩階段提交協議。有趣的是它們 都是同步協議,即便咱們使用異步ZooKeeper基本操做(如通知)來實現它們。使用ZooKeeper能夠實現不少不一樣的分佈式數據結構和協 議,ZooKeeper網站(http://hadoop.apache.org/zookeeper/)提供了一些用於實現分佈式數據結構和協議的僞代碼。ZooKeeper自己也帶有一些棕準方法的實現,放在安裝位置下的recipes目錄中。

4.1 場景描述

你們也許都很熟悉了多個線程或者多個進程間的共享鎖的實現方式了,可是在分佈式場景中咱們會面臨多個Server之間的鎖的問題。

假設有這樣一個場景:兩臺server :serverA,serverB須要在C機器上的/usr/local/a.txt文 件上進行寫操做,若是兩臺機器同時寫該文件,那麼該文件的最終結果可能會產生亂序等問題。最早能想到的是serverA在寫文件前告訴ServerB 「我要開始寫文件了,你先別寫」,等待收到ServerB的確認回覆後ServerA開始寫文件,寫完文件後再通知ServerB「我已經寫完了」。假設 在咱們場景中有100臺機器呢,中間任意一臺機器通訊中斷了又該如何處理?容錯和性能問題呢?要能健壯,穩定,高可用並保持高性能,系統實現的複雜度比較 高,從頭開發這樣的系統代價也很大。幸運的是,咱們有了基於googlechubby原理開發的開源的ZooKeeper系統。接下來本文將介紹兩種 ZooKeeper實現分佈式共享鎖的方法。

4.2 利用節點名稱的惟一性來實現共享鎖

ZooKeeper表面上的節點結構是一個和unix文件系統相似的小型的樹狀的目錄結構,ZooKeeper機制規定:同一個目錄下只能有一個惟一的文件名。

例如:咱們在Zookeeper目錄/test目錄下建立,兩個客戶端建立一個名爲lock節點,只有一個可以成功。

(1) 算法思路:利用名稱惟一性,加鎖操做時,只須要全部客戶端一塊兒建立/Leader/lock節點,只有一個建立成功,成功者得到鎖。解鎖時,只需刪除/test/Lock節點,其他客戶端再次進入競爭建立節點,直到全部客戶端都得到鎖。

基於以上機制,利用節點名稱惟一性機制的共享鎖算法流程如圖所示:

4.3 利用順序節點實現共享鎖

首先介紹一下,Zookeeper中有一種節點叫作順序節點,故名思議,假如咱們在/lock/目錄下建立節3個點,ZooKeeper集羣會按照提起建立的順序來建立節點,節點分別爲/lock/000000000一、/lock/000000000二、/lock/0000000003。

ZooKeeper中還有一種名爲臨時節點的節點,臨時節點由某個客戶端建立,當客戶端與ZooKeeper集羣斷開鏈接,。則該節點自動被刪除。

算法思路:對於加鎖操做,可讓全部客戶端都去/lock目錄下建立臨時、順序節點,若是建立的客戶端發現自身建立節點序列號是/lock/目錄下最小的節點,則得到鎖。不然,監視比本身建立節點的序列號小的節點(當前序列在本身前面一個的節點),進入等待。解鎖操做,只須要將自身建立的節點刪除便可。具體算法流程以下圖所示:

4.4 ZooKeeper提供的一個寫鎖實現

 按照ZooKeeper提供的分佈式鎖的僞代碼,實現了一個分佈式鎖的簡單測試代碼以下:

(1)分佈式鎖,實現了Lock接口 DistributedLock.java

package com.concurrent;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
 
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
 
/**
    DistributedLock lock = null;
    try {
        lock = new DistributedLock("127.0.0.1:2182","test");
        lock.lock();
        //do something...
    } catch (Exception e) {
        e.printStackTrace();
    }
    finally {
        if(lock != null)
            lock.unlock();
    }
 * @author xueliang
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//根
    private String lockName;//競爭資源的標誌
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();
     
    /**
     * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用
     * @param config 127.0.0.1:2181
     * @param lockName 競爭資源標誌,lockName中不能包含單詞lock
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 建立一個與服務器的鏈接
         try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }
 
    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) { 
            this.latch.countDown(); 
        }
    }
     
    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
 
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //建立臨時子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出全部子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出全部lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //若是是最小的節點,則表示取得鎖
                return true;
            }
            //若是不是最小的節點,找到比本身小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
 
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
 
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }
 
    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
 
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
 
    public Condition newCondition() {
        return null;
    }
     
    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
 
}

(2)併發測試工具 ConcurrentTest.java

package com.concurrent;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
  ConcurrentTask[] task = new ConcurrentTask[5];
  for(int i=0;i<task.length;i++){
       task[i] = new ConcurrentTask(){
            public void run() {
                System.out.println("==============");
                 
            }};
  }
  new ConcurrentTest(task);
 * @author xueliang
 *
 */
public class ConcurrentTest {
    private CountDownLatch startSignal = new CountDownLatch(1);//開始閥門
    private CountDownLatch doneSignal = null;//結束閥門
    private CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>();
    private AtomicInteger err = new AtomicInteger();//原子遞增
    private ConcurrentTask[] task = null;
     
    public ConcurrentTest(ConcurrentTask... task){
        this.task = task;
        if(task == null){
            System.out.println("task can not null");
            System.exit(1);
        }
        doneSignal = new CountDownLatch(task.length);
        start();
    }
    /**
     * @param args
     * @throws ClassNotFoundException
     */
    private void start(){
        //建立線程,並將全部線程等待在閥門處
        createThread();
        //打開閥門
        startSignal.countDown();//遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程
        try {
            doneSignal.await();//等待全部線程都執行完畢
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //計算執行時間
        getExeTime();
    }
    /**
     * 初始化全部線程,並在閥門處等待
     */
    private void createThread() {
        long len = doneSignal.getCount();
        for (int i = 0; i < len; i++) {
            final int j = i;
            new Thread(new Runnable(){
                public void run() {
                    try {
                        startSignal.await();//使當前線程在鎖存器倒計數至零以前一直等待
                        long start = System.currentTimeMillis();
                        task[j].run();
                        long end = (System.currentTimeMillis() - start);
                        list.add(end);
                    } catch (Exception e) {
                        err.getAndIncrement();//至關於err++
                    }
                    doneSignal.countDown();
                }
            }).start();
        }
    }
    /**
     * 計算平均響應時間
     */
    private void getExeTime() {
        int size = list.size();
        List<Long> _list = new ArrayList<Long>(size);
        _list.addAll(list);
        Collections.sort(_list);
        long min = _list.get(0);
        long max = _list.get(size-1);
        long sum = 0L;
        for (Long t : _list) {
            sum += t;
        }
        long avg = sum/size;
        System.out.println("min: " + min);
        System.out.println("max: " + max);
        System.out.println("avg: " + avg);
        System.out.println("err: " + err.get());
    }
     
    public interface ConcurrentTask {
        void run();
    }
 
}

(3)測試  ZkTest.java

package com.concurrent;

import com.concurrent.ConcurrentTest.ConcurrentTask;
 
public class ZkTest {
    public static void main(String[] args) {
        Runnable task1 = new Runnable(){
            public void run() {
                DistributedLock lock = null;
                try {
                    lock = new DistributedLock("127.0.0.1:2182","test1");
                    //lock = new DistributedLock("127.0.0.1:2182","test2");
                    lock.lock();
                    Thread.sleep(3000);
                    System.out.println("===Thread " + Thread.currentThread().getId() + " running");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    if(lock != null)
                        lock.unlock();
                }
                 
            }
             
        };
        new Thread(task1).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        ConcurrentTask[] tasks = new ConcurrentTask[60];
        for(int i=0;i<tasks.length;i++){
            ConcurrentTask task3 = new ConcurrentTask(){
                public void run() {
                    DistributedLock lock = null;
                    try {
                        lock = new DistributedLock("127.0.0.1:2183","test2");
                        lock.lock();
                        System.out.println("Thread " + Thread.currentThread().getId() + " running");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    finally {
                        lock.unlock();
                    }
                     
                }
            };
            tasks[i] = task3;
        }
        new ConcurrentTest(tasks);
    }
}

4.5 更多分佈式數據結構和協議

使用ZooKeeper能夠實現不少不一樣的分佈式數據結構和協議,例如「屏障」(bafrier)、隊列和兩階段提交協議。有趣的是它們都是同步協議,即便咱們使用異步ZooKeeper基本操做(如通知)來實現它們。

ZooKeeper網站(http://hadoop.apache.org/zookeeper)提供了一些用於實現分佈式數據結構和協議的僞代碼。ZooKeeper自己也帶有一些棕準方法的實現,放在安裝位置下的recipes目錄中。

5、BooKeeper

5.1 BooKeeper概述

BooKeeper具備副本功能,目的是提供可靠的日誌記錄。在BooKeeper中,服務器被稱爲帳本(Bookies),在帳本之中有不一樣的帳戶(Ledgers),每個帳戶由一條條記錄(Entry)組成。若是使用普通的磁盤存儲日誌數據,那麼日誌數據可能遭到破壞,當磁盤發生故障的時候,日誌也可能被丟失。BooKeeper爲每一份日誌提供了分佈式的存儲,並採用了大多數(quorum,相對於全體)的概念。也就是說,只要集羣中的大多數機器可用,那麼該日誌一直有效。

BooKeeper經過客戶端進行操做,客戶端能夠對BooKeeper進行添加帳戶、打開帳戶、添加帳戶記錄、讀取帳戶記錄等操做。另外,BooKeeper的服務依賴於ZooKeeper,能夠說BooKeeper依賴於ZooKeeper的一致性及其分佈式特色,在其之上提供另一種可靠性服務。BooKeeper的架構以下圖所示:☆

 

5.2 BooKeeper角色

從上圖中能夠看出,BooKeeper中總共包含四類角色:

① 帳本:Bookies
② 帳戶:Ledger
③ 客戶端:Client
④ 元數據及存儲服務:Metadata Storage Service

下面簡單介紹這四類角色的功能:

(1) 帳本 BooKies

帳本是BooKeeper的存儲服務器,他存儲的是一個個的帳本,能夠將帳本理解爲一個個節點。在一個BooKeeper系統中存在多個帳本(節點),每一個帳戶被不一樣的帳本所存儲。若要寫一條記錄到指定的帳戶中,該記錄將被寫到維護該帳戶全部賬本節點中。爲了提升系統的性能,這條記錄並非真正的被寫入到全部的節點中,而是選擇集羣的一個大多數集進行存儲。該系統獨有的特性,使得BooKeeper系統有良好的擴展性。即,咱們能夠經過簡單的添加機器節點的方法提升系統容量。☆☆

(2) 帳戶 Ledger

帳戶中存儲的是一系列記錄,每一條記錄包含必定的字段。記錄經過寫操做一次性寫入,只能進行附加操做不能進行修改。每條記錄包含以下字段:


當知足下列兩個條件時,某條記錄才被認爲是存儲成功:

 以前所記錄的數據被帳本節點的大多數集所存儲。
② 該記錄被帳本節點的大多數集所存儲。

(3) 客戶端 BooKeeper Client

客戶端一般與BooKeeper應用程序進行交互,它容許應用程序在系統上進行操做,包括建立帳戶,寫帳戶等。

(4) 元數據存儲服務 Metadata Storage Service

元數據信息存儲在ZooKeeper集羣當中,它存儲關於帳戶和帳本的信息。例如,帳本由集羣中的哪些節點進行維護,帳戶由哪一個帳本進行維護。應用 程序在使用帳本的時候,首先要建立一個帳戶。在建立帳戶時,系統首先將該帳本的Metadata信息寫入到ZooKeeper中。每個帳戶在某一時刻只 能有一個寫實例(分佈式鎖)。在其餘實例進行讀操做以前首先須要將寫實例關閉。若是寫操做實例因爲故障未能正常關閉,那麼下一個嘗試打開帳戶的實例將須要 首先對其進行恢復,並正確關閉寫操做。在進行寫操做的同時須要將最後一次的寫記錄存儲到ZooKeeper中,所以恢復程序僅須要在ZooKeeper中查看該帳戶所對應的最後一條寫記錄,而後將其正確的寫入到帳戶中,再在正確關閉寫操做。在BooKeeper中該恢復程序有系統自動執行不須要用戶參與。

相關文章
相關標籤/搜索