DistributedAtomic
/**
 * Demo of Curator's {@code DistributedAtomicInteger}: several threads are released at the same
 * moment and each one increments the shared counter stored at the ZooKeeper node {@code /atomic}.
 */
public class DistributedAtomic {

    /** Number of threads that race to increment the counter. */
    private static final int THREAD_COUNT = 50;

    /** Base sleep time, in milliseconds, handed to the exponential back-off retry policy. */
    private static final int BASE_SLEEP_MS = 100;

    /**
     * Maximum number of optimistic retries. ExponentialBackoffRetry pins any larger value to 29
     * (and logs a warning), so requesting more than that has no effect.
     */
    private static final int MAX_RETRIES = 29;

    static CuratorFramework client = CuratorConnect.getCuratorClient2();

    private static final String path = "/atomic";

    /**
     * Starts {@link #THREAD_COUNT} threads that all wait on a common latch and then increment the
     * distributed counter once each, printing the value before and after a successful increment.
     */
    public static void distributedAtomicInteger() {
        final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedAtomicInteger distributedAtomicInteger = new DistributedAtomicInteger(
                            client, path, new ExponentialBackoffRetry(BASE_SLEEP_MS, MAX_RETRIES));
                    try {
                        // Used as a start barrier: every thread counts down, then blocks until
                        // all of them have arrived, so the increments contend with each other.
                        countDownLatch.countDown();
                        countDownLatch.await();

                        AtomicValue<Integer> increment = distributedAtomicInteger.increment();
                        // The result must be checked: when the retries are exhausted the counter
                        // was not updated and preValue()/postValue() are not meaningful.
                        if (increment.succeeded()) {
                            System.out.println("提交前:" + increment.preValue() + ",提交後:" + increment.postValue());
                        } else {
                            System.err.println("increment failed: retries exhausted, counter was not updated");
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of silently swallowing it.
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    public static void main(String[] args) {
        DistributedAtomic.distributedAtomicInteger();
    }
}
運行結果如下:
大概流程:
private AtomicValue<Integer> worker(final Integer addAmount) throws Exception { Preconditions.checkNotNull(addAmount, "addAmount cannot be null"); // 判斷是否有節點,沒有節點則賦值爲0,有則轉換爲數字相加 MakeValue makeValue = new MakeValue() { @Override public byte[] makeFrom(byte[] previous) { int previousValue = (previous != null) ? bytesToValue(previous) : 0; int newValue = previousValue + addAmount; return valueToBytes(newValue); } }; AtomicValue<byte[]> result = value.trySet(makeValue); return new AtomicInteger(result); }
嘗試設置值
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception { MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false); tryOptimistic(result, makeValue); if ( !result.succeeded() && (mutex != null) ) { // 沒成功,再經過mutex獲取鎖重試 tryWithMutex(result, makeValue); } return result; }
重試機制下看能否設值成功
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception { long startMs = System.currentTimeMillis(); int retryCount = 0; boolean done = false; while ( !done ) { result.stats.incrementOptimisticTries(); // 嘗試成功,跳出循環 if ( tryOnce(result, makeValue) ) { result.succeeded = true; done = true; } else { // 沒成功,重試機制再試 if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) ) { done = true; } } } result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs); }
嘗試設值
private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception { Stat stat = new Stat(); // 獲取當前值 boolean createIt = getCurrentValue(result, stat); boolean success = false; try { // 返回新的值 byte[] newValue = makeValue.makeFrom(result.preValue); if ( createIt ) { // 若是沒有節點,則建立 client.create().creatingParentContainersIfNeeded().forPath(path, newValue); } else { // 若是有節點,則經過版本號設值 client.setData().withVersion(stat.getVersion()).forPath(path, newValue); } result.postValue = Arrays.copyOf(newValue, newValue.length); success = true; } catch ( KeeperException.NodeExistsException e ) { // do Retry } catch ( KeeperException.BadVersionException e ) { // do Retry } catch ( KeeperException.NoNodeException e ) { // do Retry } return success; }