基於curator的延遲隊列

這裏不介紹關於curator的用法及優劣,旨在探究curator對於延遲隊列的使用原理java

怎麼使用

<!--dependency-->
<dependency>
	 <groupId>org.apache.curator</groupId>
	 <artifactId>curator-recipes</artifactId>
	 <version>4.0.1</version>
</dependency>

 <dependency>
	 <groupId>org.apache.curator</groupId>
	 <artifactId>curator-framework</artifactId>
	 <version>4.0.1</version>
</dependency>
public class Processor {
    private final static CuratorFramework client;
    private final static DistributedDelayQueue<String> queue;

    static{
        ZookeeperConfig config = ZookeeperConfig.getConfig();
        // create client
        client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
                new ExponentialBackoffRetry(3000, 2));
        // build queue
        queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
                new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
                .buildDelayQueue();
        // 開啓執行計劃
        enable();
    }

    /**
     * 生產數據
     *
     * @param id
     * @param endTime
     * @throws Exception
     */
    public void producer(String id, Date endTime) throws Exception {
        queue.put(id, endTime.getTime());
    }


    private static void enable(){
        try {
            client.start();
            queue.start();
        } catch (Exception e) {
            logger.error("enable queue fail, exception:{}", e);
        }
    }

}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer<String> {
    @Override
    public byte[] serialize(String s) {
         return s.getBytes("utf-8");
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes);
    }
}

// consumer
AutoSubmitConsumer implements QueueConsumer<String> {

    @Override
    public void consumeMessage(String id)  {
        logger.info("consumeMessage, :{}", id);
      	// service processor.
        logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
    }

    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    }
}

是臨時節點仍是持久化節點,若是基於內存的話客戶端或者服務端掛了之後就會存在數據丟失的問題? 是否會從新排序,zk是按照請求的時間前後順序寫入的,那麼curator是怎麼監聽到期時間的呢?apache

猜測

  1. 是否持久化
  2. 是否會在每次請求的時候拿到服務端全部的節點數據進行排序後存入到服務端

驗證

  1. 針對第一點,咱們關閉zookeeper服務端和客戶端後從新啓動後以前的節點還存在因此是持久化節點安全

  2. 經過客戶端工具鏈接zookeeper發現並不會每次請求的時候都會從新排序,也就是說可能在client端進行處理的分佈式

如下是在客戶端工具上截取的一部分信息,key是由三部分組成的,第一部分固定的queue- , 第二部分暫不肯定,第三部分是節點的序號 截圖ide

源碼求證

// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片斷
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
 if ( !isProducerOnly )
        {
            service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call()
                        {
                            runLoop(); // step1
                            return null;
                        }
                    }
                );
        }
// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代碼片斷
while ( state.get() == State.STARTED  )
            {
                try
                {
                    ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                    currentVersion = data.version;
					// 諸如:
                    //queue-|2E1D86A3BB6|0000000019
                    //queue-|1712F752AA0|0000000036
                    //queue-|1712F76FF60|0000000035
		    // 拿到全部的子節點
                    List<String> children = Lists.newArrayList(data.children); 
                    // 根據過時時間排序
	            // step6
                    sortChildren(children); 
		    // 排序後
                    //queue-|1712F752AA0|0000000036
                    //queue-|1712F76FF60|0000000035
                    //queue-|2E1D86A3BB6|0000000019
                    if ( children.size() > 0 )
                    { //獲取到期時間
                        maxWaitMs = getDelay(children.get(0));
                       
                        if ( maxWaitMs > 0 ) continue;
                    }
                    else  continue;
                   // 死循環不斷輪詢是否有知足條件的節點;
                   // 只要有知足條件的節點就將整個排序後的集合往下傳遞
                    processChildren(children, currentVersion); // step2
                }
               
            }
// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2對應的代碼片斷:
private void processChildren(List<String> children, long currentVersion)
    {
        final Semaphore processedLatch = new Semaphore(0);
        final boolean   isUsingLockSafety = (lockPath != null);
        int             min = minItemsBeforeRefresh;
        for ( final String itemNode : children )
        {
            if ( Thread.currentThread().isInterrupted() )
            {
                processedLatch.release(children.size());
                break;
            }

            if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
            {
                processedLatch.release();
                continue;
            }

            if ( min-- <= 0 )
            {
                if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
                {
                    processedLatch.release(children.size());
                    break;
                }
            }
	    // step3
            if ( getDelay(itemNode) > 0 )
            {
                processedLatch.release();
                continue;
            }
            //這裏使用了線程池,爲了保證每個節點都執行完畢後才返回方法因此使用了信號燈
            executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            //是否採用了分佈式鎖,由於咱們初始化的時候並未使用因此沒有用到這裏的安全鎖,其實是進入到了else中
                            if ( isUsingLockSafety )
                            {
                                
                                processWithLockSafety(itemNode, ProcessType.NORMAL);
                            }
                            else
                            {
			        // 看這裏 step4
                                processNormally(itemNode, ProcessType.NORMAL);
                            }
                        }finally
                        {
                            processedLatch.release();
                        }
                    }
                }
            );
        }

        processedLatch.acquire(children.size());
    }

//  org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 對應step3處的代碼片斷
 protected long getDelay(String itemNode)
            {
                return getDelay(itemNode, System.currentTimeMillis());
            }
            
            private long getDelay(String itemNode, long sortTime)
            {  // 會從key上獲取時間戳        
		// step5
                long epoch = getEpoch(itemNode); 
                return epoch - sortTime; // 計算過時時間
            }

// 對應step5處的代碼
private static long getEpoch(String itemNode)
    {
	// itemNode -> queue-|時間戳|序號
        int     index2 = itemNode.lastIndexOf(SEPARATOR);
        int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
        if ( (index1 > 0) && (index2 > (index1 + 1)) )
        {
            try
            {
                String  epochStr = itemNode.substring(index1 + 1, index2);
                return Long.parseLong(epochStr, 16); // 從這裏能夠知道queue-|這裏是16進制的時間戳了|序號| 多是出於key長度的考量吧(更節省內存),用10進制的時間戳會長不少
            }
        }
        return 0;
    }

// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 會根據延時時間排序
// step6處的代碼片斷
protected void sortChildren(List<String> children)
            {
                final long sortTime = System.currentTimeMillis();
                Collections.sort
                (
                    children,
                    new Comparator<String>()
                    {
                        @Override
                        public int compare(String o1, String o2)
                        {
                            long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                        }
                    }
                );
            }

// 對應step4處的代碼片斷
 private boolean processNormally(String itemNode, ProcessType type) throws Exception
    {
        try
        {
            String  itemPath = ZKPaths.makePath(queuePath, itemNode);
            Stat    stat = new Stat();

            byte[]  bytes = null;
            if ( type == ProcessType.NORMAL )
            {
                // 獲取key對應的value
                bytes = client.getData().storingStatIn(stat).forPath(itemPath);
            }
            if ( client.getState() == CuratorFrameworkState.STARTED )
            {
               // 移除節點
                			client.delete().withVersion(stat.getVersion()).forPath(itemPath);
            }

            if ( type == ProcessType.NORMAL )
            {
	        //step7
                processMessageBytes(itemNode, bytes);
            }

            return true;
        }

        return false;
    }
//對應step7處代碼,會回調咱們的業務代碼
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
    {
        ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
        MultiItem<T>                items;
        try
        {
		  // 根據咱們定義的序列化器序列化
            items = ItemSerializer.deserialize(bytes, serializer);
        }

        for(;;)
        {
		 // 省略一部分代碼
            try
            {
                consumer.consumeMessage(item); // 這裏就會回調到咱們的業務代碼
            }
        }
        return resultCode;
    }

總結

  1. org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode這個方法也證明了確實是持久化且有序的節點;
  2. 若是過時時間太長而數據生產的過於頻繁的話,那麼勢必會形成數據的積壓對於性能和內存都是很大的考驗;
  3. 並且是客戶端不斷的循環獲取全部的節點、排序、再處理,由此咱們也證實了前面猜測是排序後在服務端從新添加全部節點每次監聽第一個節點變化的想法看來是錯誤的;
相關文章
相關標籤/搜索