一共4種node
synchronous shared maps (local)數據結構
asynchronous maps (local or cluster-wide)併發
asynchronous locks (local or cluster-wide)app
asynchronous counters (local or cluster-wide)async
數據結構: Map<key,Map<key,value>> , LocalMapImpl 類, 注意scope:vertx instances Global Map, 生命週期結束後或Application臨界點時調用remove、clean,close方法,防止內存泄露等問題ide
private final ConcurrentMap<String, LocalMap<?, ?>> maps; private final String name; private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();//儲存的數據結構 LocalMapImpl(String name, ConcurrentMap<String, LocalMap<?, ?>> maps) { this.name = name; this.maps = maps; }
利用zookeeper做集中式存儲,V 採用序列化/反序列化, cluster模式效率不是很高ui
public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) { Objects.requireNonNull(name, "name"); Objects.requireNonNull(resultHandler, "resultHandler"); if (clusterManager == null) {//是否啓用集羣 /** * local: Map<key,Map<key,Holder<V>>> * 新增了插入時間單位納秒,和TTL有效時間防止內存愈來愈大,連續內存不足致使內存溢出 */ getLocalAsyncMap(name, resultHandler); } else { /** * 獲取name,不存在建立zk path */ clusterManager.<K, V>getAsyncMap(name, ar -> { if (ar.succeeded()) { // Wrap it resultHandler.handle(Future.succeededFuture(new WrappedAsyncMap<K, V>(ar.result()))); } else { resultHandler.handle(Future.failedFuture(ar.cause())); } }); } }
cluster model:this
public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) { put(k, v, Optional.empty(), completionHandler); } public void put(K k, V v, long timeout, Handler<AsyncResult<Void>> completionHandler) { put(k, v, Optional.of(timeout), completionHandler); } /** * 添加數據 key/value */ private void put(K k, V v, Optional<Long> timeoutOptional, Handler<AsyncResult<Void>> completionHandler) { assertKeyAndValueAreNotNull(k, v)//數據不爲null .compose(aVoid -> checkExists(k))//檢查key是否存在 .compose(checkResult -> checkResult ? setData(k, v):create(k, v))//存在就賦值,不存在就建立 .compose(aVoid -> { //keyPath 方法 k轉化爲字節流再 Base64 編碼 JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k)); if (timeoutOptional.isPresent()) {//數據是否有生存時效 asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this); body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get()); } else body.put(TTL_KEY_IS_CANCEL, true); //publish 全部node 消息 vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body); Future<Void> future = Future.future(); future.complete(); return future; }) .setHandler(completionHandler);/**處理完成回調*/ } /** * 先查詢再刪除 */ public void remove(K k, Handler<AsyncResult<V>> asyncResultHandler) { assertKeyIsNotNull(k).compose(aVoid -> { Future<V> future = Future.future(); get(k, future.completer()); //獲取數據 return future; }).compose(value -> { Future<V> future = Future.future(); if (value != null) { return delete(k, value); //刪除 } else { future.complete(); } return future; }).setHandler(asyncResultHandler);/**處理完成回調*/ } /** * 獲取data */ public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) { assertKeyIsNotNull(k) //檢查k不爲null .compose(aVoid -> checkExists(k)) //檢查是否存在 .compose(checkResult -> { Future<V> future = Future.future(); if (checkResult) { //獲取data ChildData childData = curatorCache.getCurrentData(keyPath(k)); if (childData != null && childData.getData() != null) { try { V value = asObject(childData.getData());//反序列化 future.complete(value); } catch (Exception e) { future.fail(e); } } else { future.complete(); } } else { //ignore future.complete(); } return future; }) .setHandler(asyncResultHandler);/**處理完成回調*/ }
asynchronous locks (local or cluster-wide) cluster: zookeeper/** * 獲取鎖 */ public void getLock(String name, Handler<AsyncResult<Lock>> resultHandler) { Objects.requireNonNull(name, "name"); Objects.requireNonNull(resultHandler, "resultHandler"); //默認超時 10s getLockWithTimeout(name, DEFAULT_LOCK_TIMEOUT, resultHandler); } public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { Objects.requireNonNull(name, "name"); Objects.requireNonNull(resultHandler, "resultHandler"); Arguments.require(timeout >= 0, "timeout must be >= 0"); if (clusterManager == null) {//是不是集羣模式 getLocalLock(name, timeout, resultHandler); } else { clusterManager.getLockWithTimeout(name, timeout, resultHandler); } }
local model:編碼
/** * 釋放lock */ public synchronized void release() { LockWaiter waiter = pollWaiters(); if (waiter != null) { waiter.acquire(this);//queue中的下一個 owner getLock } else { owned = false; } } /** * Queue poll */ private LockWaiter pollWaiters() { //使用while用途:getlock超時狀況 while (true) { LockWaiter waiter = waiters.poll(); if (waiter == null) { return null; } else if (!waiter.timedOut) { return waiter; } } } /** * 獲取鎖 * 採用狀態來判斷,存在併發問題因此採用 synchronized */ public void doAcquire(Context context, long timeout, Handler<AsyncResult<Lock>> resultHandler) { synchronized (this) { if (!owned) { // 獲取獲得 lock owned = true; lockAcquired(context, resultHandler); } else { //添加到wait Queue 中,並添加延時任務getLockTimeOut waiters.add(new LockWaiter(this, context, timeout, resultHandler)); } } }
cluster model:spa
/** * 利用ZK curator客戶端自帶實現的 DistributedLock */ public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext();//獲取context // 在 internalBlocking Pool 執行有序阻塞任務,利用Queque保證有序(FIFO) context.executeBlocking(() -> { ZKLock lock = locks.get(name); if (lock == null) { //初始不可重入的互斥鎖 InterProcessSemaphoreMutex mutexLock = new InterProcessSemaphoreMutex(curator, ZK_PATH_LOCKS + name); lock = new ZKLock(mutexLock); } try { //獲取鎖直到 timeout if (lock.getLock().acquire(timeout, TimeUnit.MILLISECONDS)) { locks.putIfAbsent(name, lock); return lock; } else { throw new VertxException("Timed out waiting to get lock " + name); } } catch (Exception e) { throw new VertxException("get lock exception", e); } }, resultHandler); } public void release() { // 使用 worker Pool 釋放鎖 vertx.executeBlocking(future -> { try { lock.release(); } catch (Exception e) { log.error(e); } future.complete(); }, false, null); }
local model: counters 採用 AtomicLong
private void getLocalCounter(String name, Handler<AsyncResult<Counter>> resultHandler) { //獲取計數器,AsynchronousCounter類對AtomicLong的封裝 Counter counter = localCounters.computeIfAbsent(name, n -> new AsynchronousCounter(vertx)); Context context = vertx.getOrCreateContext(); context.runOnContext(v -> resultHandler.handle(Future.succeededFuture(counter))); }
cluster model:
/** * 使用ZK curator客戶端自帶實現的 DistributedAtomicLong */ public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) { //使用worker Pool執行阻塞任務 vertx.executeBlocking(future -> { try { Objects.requireNonNull(name); future.complete(new ZKCounter(name, retryPolicy)); } catch (Exception e) { future.fail(new VertxException(e)); } }, resultHandler); }