Vert.x SharedData (shared data)

Data types

There are four kinds in total:

synchronous shared maps (local)

asynchronous maps (local or cluster-wide)

asynchronous locks (local or cluster-wide)

asynchronous counters (local or cluster-wide)
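
As a quick orientation, here is a minimal sketch of the entry point for each kind, assuming `vertx` is an existing io.vertx.core.Vertx instance (the names such as "demo.map" are arbitrary):

SharedData sd = vertx.sharedData();

// 1. synchronous shared map, local to this Vert.x instance
LocalMap<String, String> localMap = sd.getLocalMap("demo.local");

// 2. asynchronous map, local or cluster-wide depending on how Vert.x was started
sd.<String, String>getAsyncMap("demo.map", ar -> { /* AsyncMap<String, String> */ });

// 3. asynchronous lock
sd.getLockWithTimeout("demo.lock", 10_000, ar -> { /* Lock */ });

// 4. asynchronous counter
sd.getCounter("demo.counter", ar -> { /* Counter */ });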

synchronous shared maps (local)

Data structure: Map<key, Map<key, value>>, implemented by the LocalMapImpl class. The outer map registers the shared maps by name and its scope is the whole Vert.x instance (a global map), so call the remove, clear and close methods when the data's lifecycle ends or at the application's shutdown point, otherwise entries linger and can leak memory.

private final ConcurrentMap<String, LocalMap<?, ?>> maps; // registry of all local maps in this Vert.x instance, keyed by name
private final String name;
private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>(); // the underlying structure that stores this map's entries

LocalMapImpl(String name, ConcurrentMap<String, LocalMap<?, ?>> maps) {
    this.name = name;
    this.maps = maps;
}
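
A minimal usage sketch from the caller's side (assuming `vertx` is an existing Vertx instance; the map name is arbitrary):

LocalMap<String, String> map = vertx.sharedData().getLocalMap("demo.local");

map.put("config.path", "/etc/app");   // synchronous call, no handler needed
String path = map.get("config.path");

// release the map when its lifecycle ends, to avoid leaking entries
map.close();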

 

asynchronous maps (local or cluster-wide), cluster: ZooKeeper

ZooKeeper is used as centralized storage and values are serialized/deserialized on every access, so cluster mode is not particularly efficient.

public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) {
    Objects.requireNonNull(name, "name");
    Objects.requireNonNull(resultHandler, "resultHandler");
    if (clusterManager == null) { // not running in a cluster
      /**
        * local: Map<key, Map<key, Holder<V>>>
        * Each Holder records its insertion time in nanoseconds plus an optional TTL,
        * so stale entries expire instead of growing the map until memory runs out.
        */
      getLocalAsyncMap(name, resultHandler);
    } else {
     /**
       * Look up the map by name; if it does not exist, the ZK path is created.
       */
      clusterManager.<K, V>getAsyncMap(name, ar -> {
        if (ar.succeeded()) {
          // Wrap it
          resultHandler.handle(Future.succeededFuture(new WrappedAsyncMap<K, V>(ar.result())));
        } else {
          resultHandler.handle(Future.failedFuture(ar.cause()));
        }
      });
    }
}
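
From the application side the asynchronous map is used the same way whether it is local or backed by ZooKeeper; a minimal sketch using the Vert.x 3.x handler API (again assuming `vertx`):

vertx.sharedData().<String, String>getAsyncMap("demo.map", mapRes -> {
  if (mapRes.succeeded()) {
    AsyncMap<String, String> map = mapRes.result();
    map.put("k1", "v1", putRes -> {            // value is serialized in cluster mode
      if (putRes.succeeded()) {
        map.get("k1", getRes -> System.out.println(getRes.result()));
      }
    });
  } else {
    mapRes.cause().printStackTrace();
  }
});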

 

Cluster mode:

public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
    put(k, v, Optional.empty(), completionHandler);
  }

  
  public void put(K k, V v, long timeout, Handler<AsyncResult<Void>> completionHandler) {
    put(k, v, Optional.of(timeout), completionHandler);
  }
  
  /**
    * Put a key/value pair.
    */
  private void put(K k, V v, Optional<Long> timeoutOptional, Handler<AsyncResult<Void>> completionHandler) {
    assertKeyAndValueAreNotNull(k, v) // key and value must not be null
      .compose(aVoid -> checkExists(k)) // check whether the key already exists
      .compose(checkResult -> checkResult ? setData(k, v) : create(k, v)) // update if it exists, otherwise create the node
      .compose(aVoid -> {
        // keyPath() turns k into a byte stream and Base64-encodes it
        JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k));

        if (timeoutOptional.isPresent()) { // does the entry have a TTL?
          asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this);
          body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get());
        } else body.put(TTL_KEY_IS_CANCEL, true);

        // publish the TTL message to every node
        vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body);

        Future<Void> future = Future.future();
        future.complete();
        return future;
      })
      .setHandler(completionHandler); // completion callback
  }
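
For reference, the TTL variant looks like this from the caller's side (a sketch, assuming an AsyncMap<String, String> named `map` has already been obtained as above):

// the entry expires roughly 5 000 ms after insertion
map.put("session:42", "alice", 5000, ar -> {
  if (ar.failed()) {
    ar.cause().printStackTrace();
  }
});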


/**
  * Fetch the current value first, then delete it.
  */
public void remove(K k, Handler<AsyncResult<V>> asyncResultHandler) {
    assertKeyIsNotNull(k).compose(aVoid -> {
      Future<V> future = Future.future();
      get(k, future.completer()); // fetch the current value
      return future;
    }).compose(value -> {
      Future<V> future = Future.future();
      if (value != null) {
        return delete(k, value); // delete the node and complete with the old value
      } else {
        future.complete();
      }
      return future;
    }).setHandler(asyncResultHandler); // completion callback
}

/**
 * Get data.
 */
public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) {
    assertKeyIsNotNull(k) // key must not be null
      .compose(aVoid -> checkExists(k)) // check whether the key exists
      .compose(checkResult -> {
        Future<V> future = Future.future();
        if (checkResult) {
          // read the data from the Curator cache
          ChildData childData = curatorCache.getCurrentData(keyPath(k));
          if (childData != null && childData.getData() != null) {
            try {
              V value = asObject(childData.getData()); // deserialize
              future.complete(value);
            } catch (Exception e) {
              future.fail(e);
            }
          } else {
            future.complete();
          }
        } else {
          // key does not exist, complete with null
          future.complete();
        }
        return future;
      })
      .setHandler(asyncResultHandler); // completion callback
}

 

asynchronous locks (local or cluster-wide), cluster: ZooKeeper

/**
  * Acquire a lock.
  */
public void getLock(String name, Handler<AsyncResult<Lock>> resultHandler) {
    Objects.requireNonNull(name, "name");
    Objects.requireNonNull(resultHandler, "resultHandler");
    // default timeout is 10 seconds
    getLockWithTimeout(name, DEFAULT_LOCK_TIMEOUT, resultHandler);
}

  
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
    Objects.requireNonNull(name, "name");
    Objects.requireNonNull(resultHandler, "resultHandler");
    Arguments.require(timeout >= 0, "timeout must be >= 0");
    if (clusterManager == null) { // not running in a cluster
      getLocalLock(name, timeout, resultHandler);
    } else {
      clusterManager.getLockWithTimeout(name, timeout, resultHandler);
    }
}
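
Caller-side usage is identical in local and cluster mode; a minimal sketch (assuming `vertx`):

vertx.sharedData().getLockWithTimeout("demo.lock", 10_000, res -> {
  if (res.succeeded()) {
    Lock lock = res.result();
    try {
      // critical section: one holder per instance locally, one per cluster with ZooKeeper
    } finally {
      lock.release();   // always release, otherwise later requests time out
    }
  } else {
    // lock not obtained within 10 s
    res.cause().printStackTrace();
  }
});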

 

Local mode:

/**
  * Release the lock.
  */
public synchronized void release() {
    LockWaiter waiter = pollWaiters();
    if (waiter != null) {
      waiter.acquire(this); // hand the lock to the next waiter in the queue
    } else {
      owned = false;
    }
}

/**
  * Poll the waiter queue.
  */
private LockWaiter pollWaiters() {
    // loop so that waiters whose lock request has already timed out are skipped
    while (true) {
      LockWaiter waiter = waiters.poll();
      if (waiter == null) {
        return null;
      } else if (!waiter.timedOut) {
        return waiter;
      }
    }
}

/**
  * Acquire the lock.
  * The owned flag is read and written here, so synchronized is used to avoid races.
  */
public void doAcquire(Context context, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
    synchronized (this) {
      if (!owned) {
        // lock acquired immediately
        owned = true;
        lockAcquired(context, resultHandler);
      } else {
        // add to the wait queue and schedule a timeout task for this lock request
        waiters.add(new LockWaiter(this, context, timeout, resultHandler));
      }
    }
}
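
The LockWaiter itself is not shown above. The sketch below only illustrates the mechanism just described (a timed-out flag checked by pollWaiters plus a hand-off acquire); it is a hypothetical class, not Vert.x's internal implementation, whose fields and timer handling differ. The usual io.vertx.core imports are assumed.

// hypothetical sketch: a timer marks the waiter as timed out, release() hands it the lock
class LockWaiterSketch {
  volatile boolean timedOut;                         // checked by pollWaiters()
  final Handler<AsyncResult<Lock>> resultHandler;

  LockWaiterSketch(Vertx vertx, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
    this.resultHandler = resultHandler;
    if (timeout > 0) {
      vertx.setTimer(timeout, id -> {                // fail the request if the lock never arrives
        timedOut = true;
        resultHandler.handle(Future.failedFuture("Timed out waiting to get lock"));
      });
    }
  }

  void acquire(Lock lock) {                          // called from release() once the lock is owned
    resultHandler.handle(Future.succeededFuture(lock));
  }
}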

 

Cluster mode:

/**
  * Uses the distributed lock implementation that ships with the ZooKeeper Curator client.
  */
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
    ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // get the current context
    // run an ordered blocking task on the internal blocking pool; a queue keeps execution FIFO
    context.executeBlocking(() -> {
      ZKLock lock = locks.get(name);
      if (lock == null) {
        // create a non-reentrant mutex
        InterProcessSemaphoreMutex mutexLock = new InterProcessSemaphoreMutex(curator, ZK_PATH_LOCKS + name);
        lock = new ZKLock(mutexLock);
      }
      try {
        // try to acquire the lock until the timeout expires
        if (lock.getLock().acquire(timeout, TimeUnit.MILLISECONDS)) {
          locks.putIfAbsent(name, lock);
          return lock;
        } else {
          throw new VertxException("Timed out waiting to get lock " + name);
        }
      } catch (Exception e) {
        throw new VertxException("get lock exception", e);
      }
    }, resultHandler);
}


public void release() {
  // release the lock on the worker pool
  vertx.executeBlocking(future -> {
    try {
      lock.release();
    } catch (Exception e) {
      log.error(e);
    }
    future.complete();
  }, false, null);
}
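
The primitive underneath is Curator's InterProcessSemaphoreMutex. A standalone sketch of how it behaves outside Vert.x (the connection string and ZK path are placeholders, and acquire/release throw checked exceptions, hence the throws clause):

// hypothetical helper showing the Curator mutex on its own
static void lockDemo() throws Exception {
  CuratorFramework client = CuratorFrameworkFactory.newClient(
      "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  client.start();

  // non-reentrant mutex: a second acquire() on the same instance would block
  InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(client, "/vertx/locks/demo");
  if (mutex.acquire(10, TimeUnit.SECONDS)) {
    try {
      // critical section, exclusive across every process using this ZooKeeper ensemble
    } finally {
      mutex.release();
    }
  }
  client.close();
}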

 

asynchronous counters (local or cluster-wide), cluster: ZooKeeper

Local mode: counters are backed by AtomicLong.

private void getLocalCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
    // get the counter; AsynchronousCounter is a wrapper around AtomicLong
    Counter counter = localCounters.computeIfAbsent(name, n -> new AsynchronousCounter(vertx));
    Context context = vertx.getOrCreateContext();
    context.runOnContext(v -> resultHandler.handle(Future.succeededFuture(counter)));
}
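
Caller-side usage is the same locally and in a cluster; a minimal sketch (assuming `vertx`):

vertx.sharedData().getCounter("demo.counter", res -> {
  if (res.succeeded()) {
    Counter counter = res.result();
    counter.incrementAndGet(v -> System.out.println("counter is now " + v.result()));
  } else {
    res.cause().printStackTrace();
  }
});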

 

Cluster mode:

/**
 * Uses the DistributedAtomicLong implementation that ships with the ZooKeeper Curator client.
 */
public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
    // execute the blocking work on the worker pool
    vertx.executeBlocking(future -> {
      try {
        Objects.requireNonNull(name);
        future.complete(new ZKCounter(name, retryPolicy));
      } catch (Exception e) {
        future.fail(new VertxException(e));
      }
    }, resultHandler);
}
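
ZKCounter itself is not shown here; the sketch below only illustrates the Curator primitive it wraps, DistributedAtomicLong (the path is a placeholder and `client` is assumed to be a started CuratorFramework):

// hypothetical standalone increment; the real ZKCounter adapts this to the async Counter API
static long incrementDemo(CuratorFramework client) throws Exception {
  DistributedAtomicLong counter =
      new DistributedAtomicLong(client, "/vertx/counters/demo", new ExponentialBackoffRetry(1000, 3));

  AtomicValue<Long> result = counter.increment(); // optimistic increment with built-in retries
  if (!result.succeeded()) {
    throw new IllegalStateException("increment did not succeed");
  }
  return result.postValue();
}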