zookeeper系列(六)實戰分佈式隊列

分佈式隊列

在傳統的單進程編程中,咱們使用隊列來存儲一些數據結構,用來在多線程之間共享或傳遞數據。node

分佈式環境下,咱們一樣須要一個相似單進程隊列的組件,用來實現跨進程、跨主機、跨網絡的數據共享和數據傳遞,這就是咱們的分佈式隊列。算法

zookeeper能夠經過順序節點實現分佈式隊列。編程

架構圖

clipboard.png

圖中左側表明zookeeper集羣,右側表明消費者和生產者。
生產者經過在queue節點下建立順序節點來存放數據,消費者經過讀取順序節點來消費數據。網絡

流程圖

offer核心算法流程數據結構

clipboard.png

poll核心算法流程多線程

clipboard.png

代碼實現

/**
 * 簡單分佈式隊列
 */
public class DistributedSimpleQueue<T> {

    protected final ZkClient zkClient;
    // queue節點
    protected final String root;
    // 順序節點前綴
    protected static final String Node_NAME = "n_";


    public DistributedSimpleQueue(ZkClient zkClient, String root) {
        this.zkClient = zkClient;
        this.root = root;
    }

    // 判斷隊列大小
    public int size() {
        return zkClient.getChildren(root).size();
    }

    // 判斷隊列是否爲空
    public boolean isEmpty() {
        return zkClient.getChildren(root).size() == 0;
    }

    // 向隊列提供數據
    public boolean offer(T element) throws Exception{

        // 建立順序節點
        String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
        try {
            zkClient.createPersistentSequential(nodeFullPath , element);
        }catch (ZkNoNodeException e) {
            zkClient.createPersistent(root);
            offer(element);
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }
        return true;
    }


    // 從隊列取數據
    public T poll() throws Exception {
        
        try {

            // 獲取全部順序節點
            List<String> list = zkClient.getChildren(root);
            if (list.size() == 0) {
                return null;
            }

            // 排序
            Collections.sort(list, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
                }
            });

            // 循環每一個順序節點名
            for ( String nodeName : list ){

                // 構造出順序節點的完整路徑
                String nodeFullPath = root.concat("/").concat(nodeName);    
                try {
                    // 讀取順序節點的內容
                    T node = (T) zkClient.readData(nodeFullPath);
                    // 刪除順序節點
                    zkClient.delete(nodeFullPath);
                    return node;
                } catch (ZkNoNodeException e) {
                    // ignore 由其餘客戶端把這個順序節點消費掉了
                }
            }
            
            return null;
            
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }

    }

    private String getNodeNumber(String str, String nodeName) {
        int index = str.lastIndexOf(nodeName);
        if (index >= 0) {
            index += Node_NAME.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;

    }

}
public class User implements Serializable {
    
    String name;
    String id;
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

}
public class TestDistributedSimpleQueue {

    public static void main(String[] args) {
        
        
        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");
        
        User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");
        
        User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");        
        
        try {
            queue.offer(user1);
            queue.offer(user2);
            User u1 = (User) queue.poll();
            User u2 = (User) queue.poll();
            
            if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
                System.out.println("Success!");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
    
}

上面實現了一個簡單分佈式隊列,在此基礎上,咱們再擴展一個阻塞分佈式隊列。代碼以下:架構

/**
 * 阻塞分佈式隊列
 * 擴展自簡單分佈式隊列,在拿不到隊列數據時,進行阻塞直到拿到數據
 */
public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{      
    
    
    public DistributedBlockingQueue(ZkClient zkClient, String root) {
        super(zkClient, root);

    }
    

    @Override
    public T poll() throws Exception {

        while (true){ // 結束在latch上的等待後,再來一次
            
            final CountDownLatch    latch = new CountDownLatch(1);
            final IZkChildListener childListener = new IZkChildListener() {
                public void handleChildChange(String parentPath, List<String> currentChilds)
                        throws Exception {
                    latch.countDown(); // 隊列有變化,結束latch上的等待
                }
            };
            zkClient.subscribeChildChanges(root, childListener);
            try{
                T node = super.poll(); // 獲取隊列數據
                if ( node != null ){
                    return node;
                } else {
                    latch.await(); // 拿不到隊列數據,則在latch上await
                }
            } finally {
                zkClient.unsubscribeChildChanges(root, childListener);
            }
            
        }
    }

}
public class TestDistributedBlockingQueue {

    public static void main(String[] args) {
        
        
        ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
        int delayTime = 5;
        
        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue");
        
        final User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");
        
        final User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");        
        
        try {
            
            delayExector.schedule(new Runnable() {
                
                public void run() {
                    try {
                        queue.offer(user1);
                        queue.offer(user2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                }
            }, delayTime , TimeUnit.SECONDS);
            
            System.out.println("ready poll!");
            User u1 = (User) queue.poll();
            User u2 = (User) queue.poll();
            
            if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
                System.out.println("Success!");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            delayExector.shutdown();
            try {
                delayExector.awaitTermination(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            }
            
        }
        
    }
    
}
相關文章
相關標籤/搜索