Zookeeper簡述

提到Zookeeper,不得不先聊聊分佈式協調技術html

1、什麼是分佈式協調技術java

  分佈式協調技術 主要用來解決分佈式環境當中多個進程之間的同步控制,讓他們有序的去訪問某種臨界資源,防止形成"髒數據"的後果。node

  那麼怎麼對這些進程進行調度呢?算法

  這時候咱們就須要一個協調器,來讓他們有序的來訪問這個資源。這個協調器就是咱們常常提到的那個鎖。經過這個鎖機制咱們就能保證了分佈式系統中多個進程可以有序的訪問該臨界資源。那麼咱們把這個分佈式環境下的這個鎖叫做分佈式鎖可是由於其運行所在的環境存在網絡延遲等不可靠因素的,致使對數據的處理存在許多困難。目前處理分佈式協調技術比較好的有Chubby(Google產品,收費)和Zookeeper(Apache產品,免費)。數據庫

2、什麼Zookeeperapache

  ZooKeeper是一種爲分佈式應用所設計的高可用、高性能且一致的開源協調服務,它提供了一項基本服務:分佈式鎖服務。因爲ZooKeeper的開源特性,後來咱們的開發者在分佈式鎖的基礎上,摸索了出了其餘的使用方法:配置維護、組服務、分佈式消息隊列、分佈式通知/協調等。編程

  ZooKeeper性能上的特色決定了它可以用在大型的、分佈式的系統當中。從可靠性方面來講,它並不會由於一個節點的錯誤而崩潰。除此以外,它嚴格的序列訪問控制意味着複雜的控制原語能夠應用在客戶端上。ZooKeeper在一致性、可用性、容錯性的保證,也是ZooKeeper的成功之處。vim

3、Zookeeper特性緩存

  一、全局數據一致:每一個server保存一份相同的數據副本,client不管鏈接到哪一個server,展現的數據都是一致性的,當客戶端操做一個節點的文件時,其餘兩個節點會隨之更新,這樣保證了全局數據的一致性安全

  二、可靠性:若是消息被其中一臺服務接受,那麼將被全部的服務器接受。

  三、順序性:包括全局有序和偏序兩種:全局有序是指若是在一臺服務器上消息a在消息b前發佈,則在全部Server上消息a都將在消息b前被髮布;偏序是指若是一個消息b在消息a後被同一個發送者發佈,a必將排在b前面。

  四、數據更新原子性:一次數據更新要麼成功,要麼失敗。

4、Zookeeper部署

  一、下載相對應的jar包

wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

  二、安裝到指定目錄

tail -zxvf zookeeper-3.4.12.tar.gz

  三、重命名配置文件

[oracle@bogon java]$ cd zookeeper-3.4.12/
[oracle@bogon zookeeper-3.4.12]$ cd conf
[oracle@bogon conf]$ ll
總用量 16
-rw-rw-r--. 1 oracle oracle  535 3月  27 12:32 configuration.xsl
-rw-rw-r--. 1 oracle oracle 2161 3月  27 12:32 log4j.properties
-rw-rw-r--. 1 oracle oracle  922 3月  27 12:32 zoo_sample.cfg
[oracle@bogon conf]$cp zoo_sample.cfg zoo.cfg  

  zookeeper默認讀取zoo.cfg配置文件

  其實到這單機版的zookeeper就算安裝完畢了,若是爲了方便管理,能夠爲其配置環境變量,這裏不作演示

  四、啓動

[root@bogon bin]# ./zkServer.sh start    #啓動服務端
[root@bogon bin]# ./zkServer.sh status   #查看狀態
[root@bogon bin]# ./zkCli.sh             #啓動客戶端

5、關於zookeeper數據模型

  在zookeeper下,其文件存儲相似於樹同樣具備層次結構,每一個文件節點被稱之爲Znode

  一、每一個Znode具備原子性

  二、每一個Znode存儲數據大小具備限制

  三、Znode經過路徑引用,可是其路徑必須是絕對的,所以他們必須由斜槓字符來開頭

  四、節點類型包含臨時節點和永久節點。臨時節點會話結束就自動刪除,臨時節點不容許擁有子節點。永久節點的生命週期不依賴與會話。

  五、Znode具備序列化特性。隨着其建立,其附帶一個序列號。此序列號對於此及節點的父節點是惟一的,這樣便記錄了每一個子節點建立的前後順序。

6、關於Zookeeper相關命令

 若是直接輸入zkCil.sh 他會自動匹配本機的zookeeper客戶端,Zookeeper本質就是一個小型的文件存儲系統。

 啓動通常語法:./zkCli.sh -timeout 0 -r -server ip:port

  如:./zkCli.sh -timeout 3000 -server h1:2181,表示鏈接到主機h1 超時時間爲3

 一、查詢  

  ls   語法:ls path [watch] 列出指定節點

[zk: localhost:2181(CONNECTED) 4] ls /      #遍歷根目錄
[zookeeper, QQQs, test]
[zk: localhost:2181(CONNECTED) 6] ls /test  #遍歷根目錄下子節點test
[test1]

  stat  語法:stat path [watch]

 列出指定節點的狀態信息,或者說是元數據

[zk: localhost:2181(CONNECTED) 8] stat /
cZxid = 0x0                      #節點被建立時的事務ID
ctime = Thu Jan 01 08:00:00 CST 1970    #節點被建立的時間
mZxid = 0x0                             #最近一次更新的時的事務ID
mtime = Thu Jan 01 08:00:00 CST 1970    #最近一次更新的時間
pZxid = 0x1a5                           #該節點的子節點列表最近一次被修改的事務ID
cversion = 99                   #子節點的版本號
dataVersion = 0                 #數據版本
aclVersion = 0                   #ACL版本號
ephemeralOwner = 0x0               #建立臨時節點的事務ID,若是是持久節點,則該節點爲0x0
dataLength = 0                   #當前節點的數據長度
numChildren = 3                     #當前節點的子節點數目

  get  語法:get path [watch]

  列出指定節點的數據

[zk: localhost:2181(CONNECTED) 10] get /test
demo

  ls2   語法:ls2 path [watch]

  是ls的升級版,列出子節點的同時列出節點的狀態信息

[zk: localhost:2181(CONNECTED) 11] ls2 /test
[test1]
cZxid = 0x1a5
ctime = Thu Jun 07 08:26:10 CST 2018
mZxid = 0x1a5
mtime = Thu Jun 07 08:26:10 CST 2018
pZxid = 0x1a6
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1

  history  查看歷史

  History命令能夠查看先前執行過的全部的命令,它通常與redo配合使用

[zk: localhost:2181(CONNECTED) 5] history
0 - ls -l
1 - ls /
2 - delquota -n /testquota
3 - ls /
4 - listquota /testquota
5 - history

  redo  從新執行先前命令,根據行數執行

[zk: localhost:2181(CONNECTED) 6] redo 1  #redo從新執行先前命令,根據行數執行
[testquota, zookeeper, hellozk]

  二、建立

  create  語法create [-s] [-e] path data acl

  其中:括號中時可選的,s表示建立永久節點,e表示建立臨時節點,acl表示訪問控制列表

[zk: localhost:2181(CONNECTED) 2] create /test demo
Created /test
[zk: localhost:2181(CONNECTED) 5] create /test/test1 demo1
Created /test/test1

  三、修改

  set  語法:set path data [version]

[zk: localhost:2181(CONNECTED) 12] ls /
[zookeeper, QQQs, test]
[zk: localhost:2181(CONNECTED) 13] set /test demo2
cZxid = 0x1a5
ctime = Thu Jun 07 08:26:10 CST 2018
mZxid = 0x1a7
mtime = Thu Jun 07 08:45:54 CST 2018  #修改時間與修改時間不一致
pZxid = 0x1a6  
cversion = 1  
dataVersion = 1                #數據版本號爲1,表示被更改一次
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 1

  四、刪除

  delete  語法:delete path [version]  

  只能刪除不含子節點的節點

[zk: localhost:2181(CONNECTED) 12] ls /
[zookeeper, QQQs, test]
[zk: localhost:2181(CONNECTED) 14] delete /QQQs
[zk: localhost:2181(CONNECTED) 15] ls /
[zookeeper, test]

  rmr  語法:rmr path

  能遞歸刪除節點

[zk: localhost:2181(CONNECTED) 15] ls /
[zookeeper, test]
[zk: localhost:2181(CONNECTED) 16] rmr /test
[zk: localhost:2181(CONNECTED) 17] ls / 
[zookeeper]

  五、增長限制

  setquota  語法:setquota -n | -b val path

  其中:n表示子節點的最大個數;b表示數據值的最大長度;val表示子節點最大個數或者數據值的最大長度;path表示節點路徑

[zk: localhost:2181(CONNECTED) 30] create /testquota 123    #建立具備限制屬                                                              性節點
Created /testquota
[zk: localhost:2181(CONNECTED) 35] setquota -n 3 /testquota #設置最大子節點爲3                                                               數爲3
Comment: the parts are option -n val 3 path /testquota   
[zk: localhost:2181(CONNECTED) 36] listquota /testquota
absolute path is /zookeeper/quota/testquota/zookeeper_limits
Output quota for /testquota count=3,bytes=-1           #count:子節點最大數  byte:數據長度(-1表示沒有限制)
Output stat for /testquota count=1,bytes=3             #count:當前子節點數(包含本身,1表示當前他沒有子節點)

  注意:若是建立的節點數超過了限制數,建立過程當中不會報警告,可是日誌中會顯示超額

2018-05-05 17:03:56,025 [myid:3] - WARN  [CommitProcessor:3:DataTree@302] - Quota exceeded: /testquota count=5 limit=3
Delquota

  刪除節點限制

[zk: localhost:2181(CONNECTED) 2] delquota -n /testquota  #刪除限制
[zk: localhost:2181(CONNECTED) 4] listquota /testquota
absolute path is /zookeeper/quota/testquota/zookeeper_limits
Output quota for /testquota count=-1,bytes=-1         #此時顯示-1表示沒有限制條件
Output stat for /testquota count=5,bytes=7

7、關於Zookeeper集羣

  Zookeeper集羣搭建指的是Zookeeper分佈式模擬安裝。一般由2n+1臺服務器組成。這是由於爲了保證leader選舉(基於Paxos算法的實現)可以獲得多數的支持,因此Zookeeper集羣的數量通常爲奇數。

  一、編輯相對應的配置文件

[root@bogon conf]# pwd
/usr/java/zookeeper-3.4.12/conf
[root@bogon conf]# vim zoo.cfg 

  二、配置相關數據

  1) 指定數據存儲目錄

dataDir=/usr/data/zkdata        #指定數據存儲目錄

  2) 添加配置Zookeeper服務器的地址即編號

##(心跳端口、選舉端口)
server.1=jiaxianseng.host:2888:3888 
server.2=jiaxianseng.host1:2888:3888
server.3=jiaxianseng.host2:2888:3888

  三、建立數據存儲目錄,在虛擬機中建立相對應的編號

[root@localhost conf]# mkdir -p /usr/data/zkdata
[root@localhost conf]#cd /usr/data/zkdata
[root@localhost zkdata]#touch myid
[root@localhost zkdata]# echo 1 >myid      #此表示選舉當前第一臺主機爲leader

  四、分發到另外兩臺機器

$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host1:/usr/java/
$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host2:/usr/java/

  五、分別開啓zookeeper,此時第一臺服務器當選爲leader,其建立的節點另外兩臺能夠收到

  擴展:若是在本機上玩僞集羣,須要注意:

  1) 在當前文件夾下須要進行分包配置,設定爲三臺zookeeper機器

[oracle@localhost zookeeper-3.4.12]$ ll
總用量 12
drwxr-xr-x 10 root root 4096 5月   5 14:40 zk1
drwxr-xr-x 10 root root 4096 5月   5 14:42 zk2
drwxr-xr-x 10 root root 4096 5月   5 12:46 zk3

  2) 配置文件設置

  由於是在本機上玩,三臺端口號應該不一樣

clientPort=2181

8、Zookeeper監聽機制 

  Zookeeper提供了分佈式數據發佈/訂閱功能。它容許客戶端向服務端註冊一個Watcher監聽。當服務端一些時間觸發了這個Watcher,那麼就會向指定客戶端發送一個時間通知實現分佈式的通知功能,其中節點的增刪改均可以觸發事件。

  值得注意的是:Zookeeper監聽機制嚴格按照先註冊再監聽順序,且觸發事件監聽是一次性的。觸發一次發送回調事件狀況後,下次觸發不會進行回調,須要從新註冊監聽。

  具體實現以下:

    1) 玩監聽:前提是先得支持監聽機制;其次是要註冊監聽,使用help命令進行查看

    [zk: localhost:2181(CONNECTED) 7] help   #使用help命令查看哪行命令支                                                持監聽
   ZooKeeper -server host:port cmd args
    connect host:port
    get path [watch]
    ls path [watch]
    stat path [watch]

  2) 先註冊監聽

[zk: localhost:2181(CONNECTED) 8] create /watchtest 123    #先註冊監聽
Created /watchtest

  3) 查看是否被監聽

[zk: localhost:2181(CONNECTED) 10] get /watchtest watch  #此節點被監聽

  4) 利用第二臺機器改變節點數據

[zk: localhost:2181(CONNECTED) 0] set /watchtest 456789

  5) 查看第一臺機器返回的信息

WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest  

  此時第一臺會收到節點被改變的提示,可是當節點再次被其餘機器改變時

  第一臺機器不會收到任何信息,說明監聽只會被觸發一次。若是想再次收到監聽信息,只能從新註冊監聽

9、關於Zookeeper Java API

  1) 建立maven工程,並導入約束

  <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>

  2) 編寫測試類,鏈接zookeeper,並建立節點

public class TestZKClient {
    public static void main(String[] args) throws Exception{  //main快捷:psvm
         //構造JAVA zookeeper客戶端
        //參數:1.鏈接ip+端口(可配置多個,用分號隔開)  2.time鏈接超時時間:默認值爲30000
        ZooKeeper zk=new ZooKeeper("192.168.174.133:2181,192.168.174.133:2182",30000,new Watcher(){

            //這裏就是事件通知的回調方法
            public void process(WatchedEvent event) {
                System.out.println("事件通知類型"+event.getState());
                System.out.println("事件發生類型"+event.getType());
                System.out.println("事件發生路徑"+event.getPath());
            }
        } );
        /**
         * 參數1:節點名 ;參數2:數據 ; 參數3:acl權限控制,這裏採用默認值 ;參數4:建立節點類型:這裏是持久序列化節點
         */
        zk.create("/myCirls","性感的".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                zk.close();

    }
}

10、關於Zookeper分佈式鎖應用

  分佈式鎖,這個主要得益於Zookeeper保證了數據的強一致性。鎖服務能夠分爲兩類,一個保持獨佔,另外一個是控制時序。

  應用:在分佈式環境高併發場景下,生產有必定業務含義的惟一的訂單編號

  一、編寫服務類:

/**
 * 訂單編號服務
 * @author Administrator
 *
 */
public class OrderCodeGenerator {
    //自增加序列
    private static int i=0;
    //按照「年-月-日-小時-分鐘-秒-自增加序列」的規則生成訂單編號
    public String getOrderCode(){
        Date date=new Date();
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
        return sdf.format(date)+ ++i;
    }
    public static void main(String[] args) {
        OrderCodeGenerator ong=new OrderCodeGenerator();
        for(int i=0;i<10;i++){
            System.out.println(ong.getOrderCode());
        }
    }
}

  二、編寫接口

/**
 * 定義訂單服務接口
 * @author Administrator
 *
 */
public interface OrderService {
    void createOrder();
}

  三、定義實現類

public class OrderServiceImpl implements OrderService{
    //定義爲靜態變量,保證訂單號惟一
    private static OrderCodeGenerator ocg=new OrderCodeGenerator();
    //建立訂單接口
    @Override
    public void createOrder() {
        // TODO Auto-generated method stub
        String orderCode=null;
        //獲取訂單號
        orderCode=ocg.getOrderCode();
        System.out.println(Thread.currentThread().getName()+"========"+orderCode);
    }

}

  四、定義線程模擬多線程建立訂單

public class DistrbutDemo {
    public static void main(String[] args) {
        //模擬多個併發建立訂單
        //併發數
        int currs=10;
        //方式一:定義一個循環屏障:用於祖塞當前線程的,設置的參數爲參與線程數,能保證設置每一個線程統一完成每段步驟,
        //如線程一完成後進入等待,接着線程二完成後進入,等待其他線程完成,才能進入下一步
        final CyclicBarrier cb=new CyclicBarrier(currs);
        /*方式二:定義一個倒計數儲存器:用於等待n個條件到達,每一個線程完成數值減一
        CountDownLatch cdl=new CountDownLatch(currs);
        cdl.countDown();
        cdl.await();*/
        for(int i=0;i<currs;i++){
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    //模擬建立訂單
                    OrderService os=new OrderServiceImpl();
                    //穿插線程阻塞
                    try {
                        cb.await();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    os.createOrder();
                }    
            }).start();
        }
    }
} 

  此時運行代碼,可能會出現沒有10這個訂單號(建立了相同的訂單,說明多線程編程下出現了不安全的狀況,此時應該加鎖)

打印結果以下:

Thread-5========2018-06-06-10-30-25-1
Thread-4========2018-06-06-10-30-25-6
Thread-3========2018-06-06-10-30-25-2
Thread-6========2018-06-06-10-30-25-8
Thread-0========2018-06-06-10-30-25-5
Thread-7========2018-06-06-10-30-25-4
Thread-8========2018-06-06-10-30-25-3
Thread-9========2018-06-06-10-30-25-9
Thread-1========2018-06-06-10-30-25-2
Thread-2========2018-06-06-10-30-25-7

  五、方式一,加同步鎖

public class OrderServiceImpl implements OrderService{
    //定義爲靜態變量,保證訂單號惟一
    private static OrderCodeGenerator ocg=new OrderCodeGenerator();
    //建立訂單接口
    @Override
    public void createOrder() {
        // TODO Auto-generated method stub
        String orderCode=null;
        //加同步鎖:保證線程安全
        synchronized (ocg) {
            //獲取訂單號
            orderCode=ocg.getOrderCode();
        }
        System.out.println(Thread.currentThread().getName()+"========"+orderCode);
    }

}

  注意:這裏同步鎖內參數不能使用this,由於在DistrbutDemo類中每次調用線程都從新new了一次,鎖的不是同一個對象,其不帶加鎖的目的(獲取的不是同一個對象的鎖),而用ocg是由於它是靜態變量,獲取的是同一個對象的鎖

  方式二:加鎖lock

public class OrderServiceImpl implements OrderService{
    //定義爲靜態變量,保證訂單號惟一
    private static OrderCodeGenerator ocg=new OrderCodeGenerator();
    //使用lock必定設置爲靜態變量,保證每一個線程競爭的是同一把鎖
    private static Lock lock=new ReentrantLock();
    //建立訂單接口
    @Override
    public void createOrder() {
        // TODO Auto-generated method stub
        String orderCode=null;
        try{//防止出現異常後鎖不能釋放,因此加try/catch
        lock.lock();
        orderCode=ocg.getOrderCode();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
        //記得釋放鎖
        lock.unlock();
        }
        System.out.println(Thread.currentThread().getName()+"========"+orderCode);
    }
}

  此時運行代碼,不會出現重複的訂單,是個完整的建立了10個訂單

  分析:以上代碼確實可以保證線程安全,可是此只能在單機下玩,若是將服務放在多臺機器上調用,這裏須要引入分佈式鎖。

  六、分佈式鎖實現的技術

  基於數據庫實現分佈式鎖:

    性能較差,容易出現單點故障

    鎖沒有失效時間,容易死鎖

    非阻塞式的

    不可重入

  基於緩存實現分佈式鎖

    鎖沒有失效時間,容易死鎖

    非阻塞式的

    不可重入

  基於Zookeeper實現分佈式鎖

    實現相對簡單

    可靠性高

    性能較差

  具體實現以下:

  七、引入zk客戶端依賴

<!--對zookeeper客戶端進行了封裝 -->
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
</dependency>

  八、編寫序列化類

public class MyZkSerializer implements ZkSerializer{
    String charset="UTF-8";
    //反序列化
    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        // TODO Auto-generated method stub
        try {
            return new String(bytes,charset);
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            throw new ZkMarshallingError(e);
        }
    }
    //序列化
    @Override
    public byte[] serialize(Object obj) throws ZkMarshallingError {
        // TODO Auto-generated method stub
        try {
            return String.valueOf(obj).getBytes(charset);
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            throw new ZkMarshallingError(e);
        }
    }

}

  九、編寫zk鏈接監聽程序

public class ZkWatcherDemo {
    public static void main(String[] args) {
        ZkClient client=new ZkClient("192.168.174.128:2181");
        client.setZkSerializer(new MyZkSerializer());
        client.subscribeDataChanges("/testWatch", new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----監聽到節點被刪除..");
                
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("----監聽到數據變爲:"+data);
            }
        });
        try {
            //爲了方便查看,等待2分鐘
            Thread.sleep(2 * 60 * 1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
}

  此時若是更改testWatch值,控制檯會打印監聽的值
  ----監聽到數據變爲:55

  十、Zookeeper分佈式鎖實現:

  1) 定義分佈式鎖:

//定義分佈式鎖
public class ZKDistributeLock implements Lock{
    /**
     * 利用zookeeper的同父子節點不可重名的特色來實現分佈式鎖
     * 加鎖的規則:去建立指定名稱的節點,若是能建立成功,則得到鎖(加鎖成功),若是節點已存在,就標識鎖被別人獲取了 
     *             你就得阻塞,等待
     * 鎖釋放規則:刪除指定名稱的節點
     */
    private String LockPath;
    private ZkClient client;
    public ZKDistributeLock(String lockPath) {
        super();
        LockPath = lockPath;
        client=new ZkClient("192.168.174.128:2181");
        client.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public boolean tryLock() {
        try{
            this.client.createPersistent(LockPath);//建立永久節點
        }catch(ZkNodeExistsException e){
            return false;    
        }
        return true;
    }
    
    @Override
    public void lock() {
        if(!tryLock()){
            //阻塞等待
            waitForLock();
            //再次嘗試加鎖
            lock();
        }    
    }
    
    private void waitForLock() {
        //怎麼讓本身阻塞
        final CountDownLatch cdl=new CountDownLatch(1);
        //註冊watch,好通知本身何時被喚醒
        IZkDataListener listener=new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----監聽到節點被刪除..");
                cdl.countDown();
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("----監聽到數據變爲:"+data);
            }
        };
        //註冊該事件
        client.subscribeDataChanges(LockPath, listener);
        //這裏得判斷節點是否存在,不然會永久阻塞
        if(this.client.exists(LockPath)){
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        try{
            cdl.await();
        }catch(InterruptedException e){
            e.printStackTrace();
        }
        client.unsubscribeDataChanges(LockPath, listener);
    }

    @Override
    public void unlock() {
        //刪除節點
        //this.client.deleteRecursive(arg0)表示遞歸刪除
        this.client.delete(this.LockPath);
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

  2) 調用分佈式鎖

public class OrderServiceImpl implements OrderService{
    //定義爲靜態變量,保證訂單號惟一
    private static OrderCodeGenerator ocg=new OrderCodeGenerator();
    //使用lock必定設置爲靜態變量,保證每一個線程競爭的是同一把鎖
    private static Lock lock=new ZKDistributeLock("/testW");
    //建立訂單接口
    @Override
    public void createOrder() {
        // TODO Auto-generated method stub
        String orderCode=null;
        try{//防止出現異常後鎖不能釋放,因此加try/catch
        lock.lock();
        orderCode=ocg.getOrderCode();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
        //記得釋放鎖
        lock.unlock();
        }
        System.out.println(Thread.currentThread().getName()+"========"+orderCode);
    }
}

打印結果以下:

log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Thread-4========2018-06-07-10-52-29-1
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-7========2018-06-07-10-52-29-2
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-9========2018-06-07-10-52-29-3
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-1========2018-06-07-10-52-29-4
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-3========2018-06-07-10-52-29-5
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-8========2018-06-07-10-52-29-6
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-0========2018-06-07-10-52-29-7
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-2========2018-06-07-10-52-29-8
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-6========2018-06-07-10-52-29-9
----監聽到節點被刪除..
----監聽到數據變爲:null
Thread-5========2018-06-07-10-52-29-10
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..
----監聽到節點被刪除..

弊端問題:驚羣效應

  1每次喚醒操做將全部線程喚醒了,其中一個搶到鎖後,其餘沒搶到的又將進入阻塞等待狀態

  二、客戶端無端接受了不少與本身無關的事件通知

  三、利用持久節點建立的鎖存在死鎖的可能性!(當加鎖後忽然業務重啓,釋放鎖未執行) 

在集羣規模較大的環境中帶來的危害:

  一、巨大的服務器性能損耗  二、網絡衝擊  三、可能形成宕機

如打出不少----監聽到節點被刪除..這句話

  解決:採用建立順序節點

  改進建立的是臨時順序節點,每次都是最小的節點得到鎖,其下一個節點(更小)註冊watcher

  重寫分佈式鎖(升級版)

//定義分佈式鎖
public class ZKDistributeImproveLock implements Lock {
    /**
     * 利用zookeeper的同父子節點不可重名的特色來實現分佈式鎖
     * 加鎖的規則:去建立指定名稱的節點,若是能建立成功,則得到鎖(加鎖成功),若是節點已存在,就標識鎖被別人獲取了 你就得阻塞,等待
     * 鎖釋放規則:刪除指定名稱的節點
     */
    private String LockPath;
    private ZkClient client;
    // 須要將路徑設置爲ThreadLocal類型,不然不能被線程併發去使用
    private ThreadLocal<String> currentPath = new ThreadLocal<String>();
    private ThreadLocal<String> beforePath = new ThreadLocal<String>();

    public ZKDistributeImproveLock(String lockPath) {
        super();
        LockPath = lockPath;
        client = new ZkClient("192.168.174.128:2181");
        client.setZkSerializer(new MyZkSerializer());

        if (!this.client.exists(LockPath)) {
            this.client.createPersistent(LockPath);// 先建立永久節點
        }

    }

    @Override
    public boolean tryLock() {
        if (this.currentPath.get() == null) {
            // 注意:加鎖時建立的是臨時順序節點
            currentPath.set(this.client.createEphemeralSequential(LockPath
                    + "/", "aaa"));
        }
        // 得到全部的子節點
        List<String> children = this.client.getChildren(LockPath);
        // 進行下排序
        Collections.sort(children);

        // 判斷當前節點是不是最小的
        if (currentPath.get().equals(LockPath + "/" + children.get(0))) {
            return true;
        } else {
            // 獲取前一個節點
            // 獲得字節的索引號
            int curIndex = children.indexOf(currentPath.get().substring(
                    LockPath.length() + 1));
            beforePath.set(LockPath + "/" + children.get(curIndex - 1));
        }
        return false;

    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // 阻塞等待
            waitForLock();
            // 再次嘗試加鎖
            lock();
        }

    }

    private void waitForLock() {
        // 怎麼讓本身阻塞
        final CountDownLatch cdl = new CountDownLatch(1);
        // 註冊watch,好通知本身何時被喚醒
        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----監聽到節點被刪除..");
                cdl.countDown();

            }

            @Override
            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // TODO Auto-generated method stub
                System.out.println("----監聽到數據變爲:" + data);
            }
        };
        // 註冊該事件
        client.subscribeDataChanges(this.beforePath.get(), listener);
        // 這裏得判斷節點是否存在,不然會永久阻塞
        if (this.client.exists(this.beforePath.get())) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        try {
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void unlock() {
        // 刪除節點
        // this.client.deleteRecursive(arg0)表示遞歸刪除
        this.client.delete(this.currentPath.get());

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

  調用:

public class OrderServiceImpl implements OrderService{
    //定義爲靜態變量,保證訂單號惟一
    private static OrderCodeGenerator ocg=new OrderCodeGenerator();
    //使用lock必定設置爲靜態變量,保證每一個線程競爭的是同一把鎖
    private static Lock lock=new ZKDistributeImproveLock("/QQQs");
    //建立訂單接口
    @Override
    public void createOrder() {
        // TODO Auto-generated method stub
        String orderCode=null;
        try{//防止出現異常後鎖不能釋放,因此加try/catch
        lock.lock();
        orderCode=ocg.getOrderCode();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
        //記得釋放鎖
        lock.unlock();
        }
        System.out.println(Thread.currentThread().getName()+"========"+orderCode);
    }
}

打印結果以下:

log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Thread-4========2018-06-07-10-53-36-1
----監聽到節點被刪除..
Thread-9========2018-06-07-10-53-36-2
----監聽到節點被刪除..
Thread-0========2018-06-07-10-53-36-3
----監聽到節點被刪除..
Thread-1========2018-06-07-10-53-36-4
----監聽到節點被刪除..
Thread-6========2018-06-07-10-53-36-5
----監聽到節點被刪除..
Thread-7========2018-06-07-10-53-36-6
----監聽到節點被刪除..
Thread-5========2018-06-07-10-53-36-7
----監聽到節點被刪除..
Thread-2========2018-06-07-10-53-36-8
----監聽到節點被刪除..
Thread-3========2018-06-07-10-53-36-9
----監聽到節點被刪除..
Thread-8========2018-06-07-10-53-37-10
相關文章
相關標籤/搜索