ZooKeeper屏障和隊列的指南(五)

引言

在這個指南中,使用展現了使用ZooKeeper實現的屏障和生產-消費隊列。咱們分別稱這些類爲Barrier和Queue。這些例子假定你至少有一個運行的ZooKeeper服務。java

兩個原語都使用下面的代碼片段:node

 

static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

複製代碼

兩個類都擴展了SyncPrimitive。用這種方式,咱們的執行步驟和SyncPrimitive的構造函數的全部原語差很少。爲了保持例子簡單,咱們建立一個ZooKeeper對象,在咱們每一次實例化barrier對象或queue對象的時候,而且咱們聲明一個靜態的變量引用這個對象。隨後的Barrier對象和Queue檢查一個ZooKeeper對象是否存在。另外,咱們能夠有一個建立ZooKeeper的應用而且傳遞它給Barrier和Queue的構造函數。apache

咱們使用process()方法來處理監視器的通知。在下面的討論,咱們呈現設置監視器的代碼。一個監視器是一個內部的數據結構,它可以使ZooKeeper通知節點的改變。例如,若是一個客戶端正在 等待其它客戶端離開一個屏障,那麼它能夠給一個特定的節點設置一個監視器而且等待修改。這能表示它等待結束了。一旦你看完例子你就明白這一點。數據結構

屏障

一個屏障是一個原語,它使一組進程能夠同步地開始和結束一個計算。這種實現的整體思想是有一個barrier節點做爲每個進程節點的父節點。假如咱們這個屏障節點爲"/b1"。每個進程"p"建立一個節點」/b1/p「。一旦有足夠的進程已經建立的它們對象的節點,加入的進程能夠開始計算。app

在這個例子中,每個進程表明一個Barrier對象,而且它的構造函數有這些參數:dom

  • ZooKeeeper服務端的地址(例如"zoo1.foo.com:2181")
  • ZooKeeper中屏障節點的路徑(例如,"/b1")
  • 這組進程的數量

Barrier的構造函數傳遞ZooKeeper服務端的地址給父類的構造器。父類建立一個ZooKeeper實例若是沒有這個實例。Barrier的構造函數在ZooKeeper上建立一個節點,這個節點是全部進程節點的父節點,而且 咱們稱它爲root(注意:它不是ZooKeeper的根"/").函數

 

/**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

 

爲了進入屏障,一個進程調用enter()。一個進程在root下面建立一個節來表明它,用它的主機名來表示節點的名字。它而後等待直到足夠的進程進入到屏障。一個進程經過用getChildren()檢查root節點的孩子節點數量來實現。而且等待通知一旦它沒有足夠的孩子節點。爲了收到一個通知當root節點改變的時候,一個進程不得不設置 一個監視器,而且經過調用"getChildren()"來實現。在代碼中,咱們的"getChildren()"方法有兩個參數。第一個是那一個節點來讀數據的,而且第二個是布爾標識使進程設置 一個監視器,在代碼中標識是true.oop

 

/**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

 

注意enter()拋出KeeperException 和InterruptedException異常,因此應用有責任來捕獲和處理這樣的異常。this

一旦計算結束,一個進程調用leave()來離開這個屏障。首先它刪除它對應的節點,而後它獲得root節點的孩子。若是至少有一個孩子,那麼它就會等待一個通知(注意調用getChildred()的第二個參數是true),意爲着ZooKeeper不得不設置一個監視器在root節點上。一旦收到一個通知,它再次檢查是否root節點有任何孩子。spa

 

/**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

 

生產者-消費者隊列

一個生產者-消費者隊列是一個數據分發結構,一組進程生產而且消費物品。生產者進程建立新的元素而且加入到隊列。消費者進程從隊列中刪除元素,而且處理它們。在這個實現中,元素中簡單的數字。隊列被一個root節點表示,而且加入一個元素到這個隊列,一個生產者進程建立一個新節點,root節點的孩子節點。

下面的代碼片段對象對象的構造函數。就像Barrier對象同樣,它首先父類的構造函數,SyncPrimitive,來建立一個ZooKeeper對象,若是這個對象不存在。它而後校驗隊列的root節點是否存在,而且若是不存在就建立一個。

 

/**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

 

一個生產者進程調用 "produce()" 來增長元素到隊列中,而且傳一個數字做爲參數。爲了增長元素到隊列中,這個方法建立一個節點經過"create()」,而且使用SEQUENCE標示來讓ZooKeeper增長序列計數器的值到root節點上。用這種方法,咱們暴露了隊列中元素的總的順序,所以保證隊列中最老的元素是下一個要消費的元素。

 

/**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }

 

爲了消費一個元素,一個消費者進程獲取root節點的孩子,讀取最小值的節點,而且返回這個元素。注意若是有一個衝突,那麼兩個競爭進程中的一個將不能刪除它的節點而且刪除操做將會拋出異常。

調用getChildren()返回一個以字典序列排序的列表。由於字典序列對計算器的值的順序沒有必要,咱們須要那一個元素是最小的值。來了決定哪個有最小計算器值,咱們遍歷列表,而且刪除每個的前綴"element"。

 

/**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

 

完整的源碼列表

SyncPrimitive.Java

 

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }


        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);

    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
}
相關文章
相關標籤/搜索