zookeeper實現屏障_Barrier

zookeeper實現屏障_Barrierjava

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed. Barriers are implemented in ZooKeeper by designating a barrier node. The barrier is in place if the barrier node exists. Here's the pseudo code:node

  1. Client calls the ZooKeeper API's exists() function on the barrier node, with watch set to true.shell

  2. If exists() returns false, the barrier is gone and the client proceedsapache

  3. Else, if exists() returns true, the clients wait for a watch event from ZooKeeper for the barrier node.服務器

  4. When the watch event is triggered, the client reissues the exists( ) call, again waiting until the barrier node is removed.dom


根據官網的demo,本身的理解,加了相應的註釋,這裏把代碼貼出來,以下,分佈式

Barrieride

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * 繼承watcher,實現分佈式環境中不一樣任務之間的同步處理(利用了Watcher機制的反向推送)。
 * 針對事件的觸發使線程作出相應的處理,從而避免無謂的while(true),致使cpu空轉。
 */
public class Barrier implements Watcher {

    private static final String addr = "127.0.0.1:2181";
    private ZooKeeper zk = null;
    private Integer mutex;
    private int size = 0;
    private String root;

    public Barrier(String root, int size) {
        this.root = root;
        this.size = size;

        try {
            zk = new ZooKeeper(addr, 10 * 1000, this);
            mutex = new Integer(-1);
            Stat s = zk.exists(root, false);
            if (s == null) {
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 當觸發事件後,喚醒在mutex上等待的線程
     * 只要是zk服務器上節點的數據發生改變(無論哪一個zk client改變了數據),
     * 這裏都會接收到相應的事件,從而喚醒相應的線程,作出相應的處理
     *
     * @param event
     */
    public synchronized void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 當新建znode時,首先持有mutex監視器才能進入同步代碼塊。
     * 當znode發生事件後,會觸發process,從而喚醒在mutex上等待的線程。
     * 經過while循環判斷建立的節點個數,當節點個數大於設定的值時,這個enter方法才執行完成。
     *
     * @param name
     * @return
     * @throws Exception
     */
    public boolean enter(String name) throws Exception {
        zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * 同理。對於leave方法,當delete znode時,觸發事件,從而喚醒mutex上等待的線程,經過while循環
     * 判斷節點的個數,當節點所有刪除後,leave方法結束。
     * 從而使整個添加刪除znode的線程結束
     *
     * @param name
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public boolean leave(String name) throws KeeperException, InterruptedException {
        zk.delete(root + "/" + name, 0);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }
}

BarrierTestthis

package com.usfot;

import java.util.Random;

public class BarrierTest {

    /**
     * 啓動三個線程,也就對應着三個zk客戶端
     *
     * @param args
     * @throws Exception
     */
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 3; i++) {
            Process p = new Process("Thread-" + i, new Barrier("/test_node", 3));
            p.start();
        }
    }
}

class Process extends Thread {

    private String name;
    private Barrier barrier;

    public Process(String name, Barrier barrier) {
        this.name = name;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            barrier.enter(name);
            System.out.println(name + " enter");
            Thread.sleep(1000 + new Random().nextInt(2000));
            barrier.leave(name);
            System.out.println(name + " leave");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

執行這段程序,以下,spa

Thread-1 enter
Thread-2 enter
Thread-0 enter
Thread-0 leave
Thread-1 leave
Thread-2 leave

Process finished with exit code 0

打開zk的client,以下,

[zk: localhost:2181(CONNECTED) 8] ls /
[testRootPath, test_node, mynode, zookeeper, zk_test0000000005, zk_test]
[zk: localhost:2181(CONNECTED) 9] get /test_node

cZxid = 0x800000051
ctime = Tue Mar 17 19:08:49 CST 2015
mZxid = 0x800000051
mtime = Tue Mar 17 19:08:49 CST 2015
pZxid = 0x800000062
cversion = 12
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 10] ls /test_node
[]
[zk: localhost:2181(CONNECTED) 11]

=====================END=====================

相關文章
相關標籤/搜索