zookeeper之分佈式計數器

例子

DistributedAtomicide

public class DistributedAtomic {  
    static CuratorFramework client = CuratorConnect.getCuratorClient2();  
    private static final String path = "/atomic";  
  
    public static void distributedAtomicInteger() {  
        CountDownLatch countDownLatch = new CountDownLatch(50);  
        for (int i = 0; i < 50; i++) {  
            new Thread(new Runnable() {  
                @Override  
  public void run() {  
                    DistributedAtomicInteger distributedAtomicInteger = new DistributedAtomicInteger(client, path, new ExponentialBackoffRetry(100, 50));  
                    try {  
                        countDownLatch.countDown();  
                        countDownLatch.await();  
                        AtomicValue<Integer> increment = distributedAtomicInteger.increment();  
                        System.out.println("提交前:" + increment.preValue() + ",提交後:" + increment.postValue());  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                }  
            }).start();  
        }  
    }  
  
    public static void main(String[] args) {  
        DistributedAtomic.distributedAtomicInteger();  
    }  
}

運行結果以下:
image.pngoop

分析

大概流程:post

  1. 先判斷是否有值,沒有值,則建立節點,且初始值設爲0
  2. 值進行相加操做
  3. 若是有節點,經過版本號修改值
  4. 修改失敗,則重試
  5. 仍是失敗,經過加鎖操做
private AtomicValue<Integer>   worker(final Integer addAmount) throws Exception  
{  
    Preconditions.checkNotNull(addAmount, "addAmount cannot be null");  
    // 判斷是否有節點,沒有節點則賦值爲0,有則轉換爲數字相加
    MakeValue               makeValue = new MakeValue()  
    {  
        @Override  
  public byte[] makeFrom(byte[] previous)  
        {  
            int previousValue = (previous != null) ? bytesToValue(previous) : 0;  
            int newValue = previousValue + addAmount;  
            return valueToBytes(newValue);  
        }  
    };  
  
    AtomicValue<byte[]>     result = value.trySet(makeValue);  
    return new AtomicInteger(result);  
}

嘗試設置值atom

AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception  
{  
    MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);  
  
    tryOptimistic(result, makeValue);  
    if ( !result.succeeded() && (mutex != null) )  
    {  
        // 沒成功,再經過mutex獲取鎖重試
        tryWithMutex(result, makeValue);  
    }  
  
    return result;  
}

重試機制下看可否設值成功spa

private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception  
{  
    long startMs = System.currentTimeMillis();  
    int retryCount = 0;  
  
    boolean done = false;  
    while ( !done )  
    {  
        result.stats.incrementOptimisticTries();  
        // 嘗試成功,跳出循環
        if ( tryOnce(result, makeValue) )  
        {  
            result.succeeded = true;  
            done = true;  
        }  
        else  
  {  
            // 沒成功,重試機制再試
            if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )  
            {  
                done = true;  
            }  
        }  
    }  
  
    result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);  
}

嘗試設值code

private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception  
{  
    Stat        stat = new Stat();  
    // 獲取當前值
    boolean createIt = getCurrentValue(result, stat);  
  
    boolean success = false;  
    try  
  {  
        // 返回新的值
        byte[]  newValue = makeValue.makeFrom(result.preValue);  
        if ( createIt )  
        {  
            // 若是沒有節點,則建立
            client.create().creatingParentContainersIfNeeded().forPath(path, newValue);  
        }  
        else  
  {  
            // 若是有節點,則經過版本號設值
            client.setData().withVersion(stat.getVersion()).forPath(path, newValue);  
        }  
        result.postValue = Arrays.copyOf(newValue, newValue.length);  
        success = true;  
    }  
    catch ( KeeperException.NodeExistsException e )  
    {  
        // do Retry  
  }  
    catch ( KeeperException.BadVersionException e )  
    {  
        // do Retry  
  }  
    catch ( KeeperException.NoNodeException e )  
    {  
        // do Retry  
  }  
  
    return success;  
}
相關文章
相關標籤/搜索