zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維java
在傳統的單進程編程中,咱們使用隊列來存儲一些數據結構,用來在多線程之間共享或傳遞數據。node
分佈式環境下,咱們一樣須要一個相似單進程隊列的組件,用來實現跨進程、跨主機、跨網絡的數據共享和數據傳遞,這就是咱們的分佈式隊列。算法
zookeeper能夠經過順序節點實現分佈式隊列。編程
圖中左側表明zookeeper集羣,右側表明消費者和生產者。
生產者經過在queue節點下建立順序節點來存放數據,消費者經過讀取順序節點來消費數據。segmentfault
offer核心算法流程網絡
poll核心算法流程數據結構
/** * 簡單分佈式隊列 */ public class DistributedSimpleQueue<T> { protected final ZkClient zkClient; // queue節點 protected final String root; // 順序節點前綴 protected static final String Node_NAME = "n_"; public DistributedSimpleQueue(ZkClient zkClient, String root) { this.zkClient = zkClient; this.root = root; } // 判斷隊列大小 public int size() { return zkClient.getChildren(root).size(); } // 判斷隊列是否爲空 public boolean isEmpty() { return zkClient.getChildren(root).size() == 0; } // 向隊列提供數據 public boolean offer(T element) throws Exception{ // 建立順序節點 String nodeFullPath = root .concat( "/" ).concat( Node_NAME ); try { zkClient.createPersistentSequential(nodeFullPath , element); }catch (ZkNoNodeException e) { zkClient.createPersistent(root); offer(element); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } return true; } // 從隊列取數據 public T poll() throws Exception { try { // 獲取全部順序節點 List<String> list = zkClient.getChildren(root); if (list.size() == 0) { return null; } // 排序 Collections.sort(list, new Comparator<String>() { public int compare(String lhs, String rhs) { return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME)); } }); // 循環每一個順序節點名 for ( String nodeName : list ){ // 構造出順序節點的完整路徑 String nodeFullPath = root.concat("/").concat(nodeName); try { // 讀取順序節點的內容 T node = (T) zkClient.readData(nodeFullPath); // 刪除順序節點 zkClient.delete(nodeFullPath); return node; } catch (ZkNoNodeException e) { // ignore 由其餘客戶端把這個順序節點消費掉了 } } return null; } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } private String getNodeNumber(String str, String nodeName) { int index = str.lastIndexOf(nodeName); if (index >= 0) { index += Node_NAME.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } }
public class User implements Serializable { String name; String id; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
public class TestDistributedSimpleQueue { public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue"); User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { queue.offer(user1); queue.offer(user2); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } } }
上面實現了一個簡單分佈式隊列,在此基礎上,咱們再擴展一個阻塞分佈式隊列。代碼以下:多線程
/** * 阻塞分佈式隊列 * 擴展自簡單分佈式隊列,在拿不到隊列數據時,進行阻塞直到拿到數據 */ public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{ public DistributedBlockingQueue(ZkClient zkClient, String root) { super(zkClient, root); } @Override public T poll() throws Exception { while (true){ // 結束在latch上的等待後,再來一次 final CountDownLatch latch = new CountDownLatch(1); final IZkChildListener childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { latch.countDown(); // 隊列有變化,結束latch上的等待 } }; zkClient.subscribeChildChanges(root, childListener); try{ T node = super.poll(); // 獲取隊列數據 if ( node != null ){ return node; } else { latch.await(); // 拿不到隊列數據,則在latch上await } } finally { zkClient.unsubscribeChildChanges(root, childListener); } } } }
public class TestDistributedBlockingQueue { public static void main(String[] args) { ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); int delayTime = 5; ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue"); final User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); final User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { delayExector.schedule(new Runnable() { public void run() { try { queue.offer(user1); queue.offer(user2); } catch (Exception e) { e.printStackTrace(); } } }, delayTime , TimeUnit.SECONDS); System.out.println("ready poll!"); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } finally{ delayExector.shutdown(); try { delayExector.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { } } } }
zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維架構