<!--dependency--><dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version></dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version></dependency>
public class Processor { private final static CuratorFramework client; private final static DistributedDelayQueue<String> queue; static{ ZookeeperConfig config = ZookeeperConfig.getConfig(); // create client client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(), new ExponentialBackoffRetry(3000, 2)); // build queue queue = QueueBuilder.builder(client, new AutoSubmitConsumer(), new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath()) .buildDelayQueue(); // 開啓執行計劃 enable(); } /** * 生產數據 * * @param id * @param endTime * @throws Exception */ public void producer(String id, Date endTime) throws Exception { queue.put(id, endTime.getTime()); } private static void enable(){ try { client.start(); queue.start(); } catch (Exception e) { logger.error("enable queue fail, exception:{}", e); } } }// Serializerclass AutoSubmitQueueSerializer implements QueueSerializer<String> { @Override public byte[] serialize(String s) { return s.getBytes("utf-8"); } @Override public String deserialize(byte[] bytes) { return new String(bytes); } }// consumerAutoSubmitConsumer implements QueueConsumer<String> { @Override public void consumeMessage(String id) { logger.info("consumeMessage, :{}", id); // service processor. logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id); } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { } }
是臨時節點仍是持久化節點,若是基於內存的話客戶端或者服務端掛了之後就會存在數據丟失的問題? 是否會從新排序,zk是按照請求的時間前後順序寫入的,那麼curator是怎麼監聽到期時間的呢?java
是否持久化apache
是否會在每次請求的時候拿到服務端全部的節點數據進行排序後存入到服務端。鄭州不孕不育 醫院:http://jbk.39.net/yiyuanfengcai/tsyl_zztjyy/3030/安全
針對第一點,咱們關閉zookeeper
服務端和客戶端後從新啓動後以前的節點還存在因此是持久化節點app
經過客戶端工具鏈接zookeeper
發現並不會每次請求的時候都會從新排序,也就是說可能在client端進行處理的分佈式
如下是在客戶端工具上截取的一部分信息,key是由三部分組成的,第一部分固定的queue- , 第二部分暫不肯定,第三部分是節點的序號 ide
// org.apache.curator.framework.recipes.queue.DistributedQueue#start// 部分片斷client.create().creatingParentContainersIfNeeded().forPath(queuePath); if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); // step1 return null; } } ); }// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop// step1中的代碼片斷while ( state.get() == State.STARTED ) { try { ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); currentVersion = data.version; // 諸如: //queue-|2E1D86A3BB6|0000000019 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 // 拿到全部的子節點 List<String> children = Lists.newArrayList(data.children); // 根據過時時間排序 // step6 sortChildren(children); // 排序後 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 //queue-|2E1D86A3BB6|0000000019 if ( children.size() > 0 ) { //獲取到期時間 maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) continue; } else continue; // 死循環不斷輪詢是否有知足條件的節點; // 只要有知足條件的節點就將整個排序後的集合往下傳遞 processChildren(children, currentVersion); // step2 } }// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren// step2對應的代碼片斷:private void processChildren(List<String> children, long currentVersion) { final Semaphore processedLatch = new Semaphore(0); final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; for ( final String itemNode : children ) { if ( Thread.currentThread().isInterrupted() ) { processedLatch.release(children.size()); break; } if ( !itemNode.startsWith(QUEUE_ITEM_NAME) ) { processedLatch.release(); continue; } if ( min-- <= 0 ) { if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) { processedLatch.release(children.size()); break; } } // step3 if ( getDelay(itemNode) > 0 ) { processedLatch.release(); continue; } //這裏使用了線程池,爲了保證每個節點都執行完畢後才返回方法因此使用了信號燈 executor.execute ( new Runnable() { @Override public void run() { try { //是否採用了分佈式鎖,由於咱們初始化的時候並未使用因此沒有用到這裏的安全鎖,其實是進入到了else中 if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { // 看這裏 step4 processNormally(itemNode, ProcessType.NORMAL); } }finally { processedLatch.release(); } } } ); } processedLatch.acquire(children.size()); }// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)// 對應step3處的代碼片斷 protected long getDelay(String itemNode) { return getDelay(itemNode, System.currentTimeMillis()); } private long getDelay(String itemNode, long sortTime) { // 會從key上獲取時間戳 // step5 long epoch = getEpoch(itemNode); return epoch - sortTime; // 計算過時時間 }// 對應step5處的代碼private static long getEpoch(String itemNode) { // itemNode -> queue-|時間戳|序號 int index2 = itemNode.lastIndexOf(SEPARATOR); int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1; if ( (index1 > 0) && (index2 > (index1 + 1)) ) { try { String epochStr = itemNode.substring(index1 + 1, index2); return Long.parseLong(epochStr, 16); // 從這裏能夠知道queue-|這裏是16進制的時間戳了|序號| 多是出於key長度的考量吧(更節省內存),用10進制的時間戳會長不少 } } return 0; }// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren// 會根據延時時間排序// step6處的代碼片斷protected void sortChildren(List<String> children) { final long sortTime = System.currentTimeMillis(); Collections.sort ( children, new Comparator<String>() { @Override public int compare(String o1, String o2) { long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); }// 對應step4處的代碼片斷 private boolean processNormally(String itemNode, ProcessType type) throws Exception { try { String itemPath = ZKPaths.makePath(queuePath, itemNode); Stat stat = new Stat(); byte[] bytes = null; if ( type == ProcessType.NORMAL ) { // 獲取key對應的value bytes = client.getData().storingStatIn(stat).forPath(itemPath); } if ( client.getState() == CuratorFrameworkState.STARTED ) { // 移除節點 client.delete().withVersion(stat.getVersion()).forPath(itemPath); } if ( type == ProcessType.NORMAL ) { //step7 processMessageBytes(itemNode, bytes); } return true; } return false; }//對應step7處代碼,會回調咱們的業務代碼private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception { ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; MultiItem<T> items; try { // 根據咱們定義的序列化器序列化 items = ItemSerializer.deserialize(bytes, serializer); } for(;;) { // 省略一部分代碼 try { consumer.consumeMessage(item); // 這裏就會回調到咱們的業務代碼 } } return resultCode; }
org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode這個方法也證明了確實是持久化且有序的節點;工具
若是過時時間太長而數據生產的過於頻繁的話,那麼勢必會形成數據的積壓對於性能和內存都是很大的考驗;oop
並且是客戶端不斷的循環獲取全部的節點、排序、再處理,由此咱們也證實了前面猜測是排序後在服務端從新添加全部節點每次監聽第一個節點變化的想法看來是錯誤的;鄭州作試管嬰兒多少錢:http://www.changhong120.com/性能