A Brief Analysis of the KeepAlive Implementation in jetcd and How to Use It

Preface

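There are quite a few open-source Java clients for etcd. jetcd is the Java client hosted under the official etcd organization; its overall API design closely mirrors the official Go client and is clean and easy to use. For lease renewal it offers two methods, keepAliveOnce and keepAlive. As the names suggest, keepAliveOnce renews a lease a single time, so keeping a lease alive with it means triggering it manually again and again, and in practice it is rarely used. keepAlive, by contrast, renews the lease automatically. keepAlive is sufficient for most scenarios, but depending on the use case there are still a few things to think through, such as how to choose the lease TTL and how to handle keepAlive failures.
jetcd project: https://github.com/etcd-io/jetcd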

Background

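We run an application that subscribes to data changes through the MySQL binlog, and some very important production applications depend on it. Because it was a single point of failure, we later used jetcd's lock + keepAlive mechanism to implement primary/standby failover within seconds (see the article "etcd選主實現故障主備秒級切換高可用架構" for details). After going live, we found that the binlog service switched between primary and standby frequently, even though the service itself was very stable: before failover was introduced, it had never once gone down in production. The root cause turned out to be the lease TTL setting. With the problem and its diagnosis stated up front, the rest of this article first walks through how jetcd implements keepAlive and then analyzes why that setting caused the problem.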

KeepAlive Implementation

First, a look at how keepAlive is used:

private long acquireActiveLease() throws InterruptedException, ExecutionException {
    // Grant a lease with the configured TTL (in seconds) and read back its ID.
    long leaseId = leaseClient.grant(leaseTTL).get().getID();
    logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
    // Start automatic renewal; closing the returned handle detaches this observer.
    this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
        @Override
        public void onNext(LeaseKeepAliveResponse value) {
            logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
        }

        @Override
        public void onError(Throwable t) {
            // Log the throwable itself; fillInStackTrace() would overwrite the original stack trace.
            logger.debug("LeaderSelector lease renewal Exception!", t);
            cancelTask();
        }

        @Override
        public void onCompleted() {
            logger.info("LeaderSelector lease renewal completed! start canceling task.");
            cancelTask();
        }
    });
    return leaseId;
}

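The lease logic lives in the LeaseImpl class. After obtaining a LeaseImpl instance through EtcdClient, first call grant with a TTL to get a lease ID, then pass that lease ID to keepAlive. The second argument is an observer with three callbacks: onNext, fired once a renewal has succeeded and the next renewal time has been determined; onError, fired when renewal fails; and onCompleted, fired after the lease expires.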

The keepAlive method:

public synchronized CloseableClient keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer) {
    if (this.closed) {
      throw newClosedLeaseClientException();
    }

    KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
    keepAlive.addObserver(observer);

    if (!this.hasKeepAliveServiceStarted) {
      this.hasKeepAliveServiceStarted = true;
      this.start();
    }

    return new CloseableClient() {
      @Override
      public void close() {
        keepAlive.removeObserver(observer);
      }
    };
  }

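LeaseImpl internally maintains a map keyed by lease ID with KeepAlive objects as values. Each KeepAlive holds a set of StreamObserver instances, the expiry time deadLine, the next renewal time nextKeepAlive, and the leaseId being renewed. The first call to keepAlive triggers start, which launches the renewal task (sendKeepAliveExecutor()) and the expiry-check task (deadLineExecutor()).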
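
The bookkeeping described above can be sketched with plain JDK types. This is a simplified illustration, not jetcd's actual code: KeepAliveEntry and KeepAliveRegistry are hypothetical names, a LongConsumer stands in for StreamObserver, and the initial deadline value is an assumption because the article does not show it.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for the per-lease KeepAlive holder described above. */
final class KeepAliveEntry {
    final long leaseId;
    // Several callers may observe the same lease, hence a set of observers.
    final Set<LongConsumer> observers = ConcurrentHashMap.newKeySet();
    volatile long nextKeepAlive;            // epoch millis at which the next renewal is due
    volatile long deadline = Long.MAX_VALUE; // assumption: set by the first renewal response

    KeepAliveEntry(long leaseId, long nowMillis) {
        this.leaseId = leaseId;
        this.nextKeepAlive = nowMillis;     // due immediately, so the first tick renews it
    }
}

/** Hypothetical registry mirroring the map-plus-lazy-start pattern of keepAlive(). */
final class KeepAliveRegistry {
    private final Map<Long, KeepAliveEntry> keepAlives = new ConcurrentHashMap<>();
    private boolean started;
    private int startCount;

    /** Registers an observer for a lease and returns a handle that detaches it again. */
    synchronized Runnable keepAlive(long leaseId, LongConsumer observer, long nowMillis) {
        KeepAliveEntry entry =
            keepAlives.computeIfAbsent(leaseId, id -> new KeepAliveEntry(id, nowMillis));
        entry.observers.add(observer);
        if (!started) {
            started = true;
            startCount++;                   // the real client starts its background tasks here
        }
        return () -> entry.observers.remove(observer);
    }

    int leaseCount() {
        return keepAlives.size();
    }

    int observerCount(long leaseId) {
        KeepAliveEntry entry = keepAlives.get(leaseId);
        return entry == null ? 0 : entry.observers.size();
    }

    synchronized int startCount() {
        return startCount;
    }
}
```

Registering two observers for the same lease ID yields one map entry with two observers, and the background tasks are started only once.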

private void sendKeepAliveExecutor() {
    this.keepAliveResponseObserver = Observers.observer(
      response -> processKeepAliveResponse(response),
      error -> processOnError()
    );
    this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
    this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            // send keep alive req to the leases whose next keep alive is before now.
            this.keepAlives.entrySet().stream()
                .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
                .map(Entry::getKey)
                .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
                .forEach(keepAliveRequestObserver::onNext);
        },
        0,
        500,
        TimeUnit.MILLISECONDS
    );
  }

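sendKeepAliveExecutor is the core of the keepAlive feature. It is triggered only once per LeaseImpl instance and starts a scheduled task that runs every 500 milliseconds. On each run it selects from keepAlives the KeepAlive objects whose nextKeepAlive is earlier than the current time and sends a renewal request for each. nextKeepAlive is initialized to the current time when the KeepAlive instance is created. After that, the response-stream observer calls processKeepAliveResponse, which is where the KeepAlive object's nextKeepAlive is maintained.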
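
To make the per-tick selection concrete, here is a minimal sketch of the filter step on its own, using a plain map from lease ID to nextKeepAlive. DueLeases is a hypothetical helper, and the sorting is added only to make the result deterministic; the original code does not sort.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper isolating the "which leases are due?" step of each tick. */
final class DueLeases {
    /** Returns the IDs of leases whose next renewal time is strictly before nowMillis. */
    static List<Long> select(Map<Long, Long> nextKeepAliveByLease, long nowMillis) {
        return nextKeepAliveByLease.entrySet().stream()
            .filter(e -> e.getValue() < nowMillis) // same strict comparison as the original
            .map(Map.Entry::getKey)
            .sorted()                              // for a stable order in this sketch only
            .collect(Collectors.toList());
    }
}
```

Because the check only runs on a tick, a lease that becomes due between two ticks is renewed on the following tick, so a renewal can go out up to one tick interval after its nextKeepAlive.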

private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
    if (this.closed) {
      return;
    }
    final long leaseID = leaseKeepAliveResponse.getID();
    final long ttl = leaseKeepAliveResponse.getTTL();
    final KeepAlive ka = this.keepAlives.get(leaseID);
    if (ka == null) {
      // return if the corresponding keep alive has closed.
      return;
    }
    if (ttl > 0) {
      long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3;
      ka.setNextKeepAlive(nextKeepAlive);
      ka.setDeadLine(System.currentTimeMillis() + ttl * 1000);
      ka.onNext(leaseKeepAliveResponse);
    } else {
      // lease expired; close all keep alive
      this.removeKeepAlive(leaseID);
      ka.onError(
          newEtcdException(
            ErrorCode.NOT_FOUND,
            "etcdserver: requested lease not found"
          )
      );
    }
  }

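As the code shows, when a renewal response is handled, nextKeepAlive is set to the current time plus one third of the TTL. In other words, with a lease TTL of 6s, keepAlive renews roughly every 2s. If the returned ttl is not greater than zero, the lease has already expired and been removed, so onError is fired directly with a "requested lease not found" exception.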
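
The arithmetic in processKeepAliveResponse can be checked in isolation. The sketch below copies the two expressions from the method into a hypothetical KeepAliveTiming helper; note that the division is integer division, so a TTL that is not a multiple of 3 rounds down.

```java
/** Hypothetical helper reproducing the timing arithmetic of processKeepAliveResponse. */
final class KeepAliveTiming {
    /** Next renewal: now plus one third of the TTL, in milliseconds (integer division). */
    static long nextKeepAlive(long nowMillis, long ttlSeconds) {
        return nowMillis + ttlSeconds * 1000 / 3;
    }

    /** Local deadline: now plus the full TTL, in milliseconds. */
    static long deadline(long nowMillis, long ttlSeconds) {
        return nowMillis + ttlSeconds * 1000;
    }

    public static void main(String[] args) {
        // With now = 0 the results read directly as offsets in milliseconds.
        System.out.println(nextKeepAlive(0, 6)); // 2000
        System.out.println(deadline(0, 6));      // 6000
        System.out.println(nextKeepAlive(0, 5)); // 1666
    }
}
```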

Summary

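Back to the frequent primary/standby switching of the binlog service from the beginning: we had set the TTL too low, at 5s. Whenever the client lost contact with the etcd server for more than 5s, for whatever reason, keepAlive failed to renew in time and a failover was triggered. The binlog service itself was perfectly healthy at that point, yet it had to shut itself down because it had lost leadership. After we raised the TTL to 20s, failover became much less sensitive.
Another scenario: keepAlive is also used when etcd serves as a service registry. Even with a TTL of 20s, a renewal can still be missed, in which case the registered service expires and is deleted while the service process is still healthy. In this scenario, the onError and onCompleted handlers need to acquire a new lease and register a new KeepAlive.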
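
As a rough back-of-the-envelope check of why a small TTL is so sensitive, the sketch below estimates the shortest outage that can cost the lease. It rests on assumptions that are not from the article: the outage starts just before a due renewal would have been sent, the renewal goes out up to one 500 ms tick late, and network latency and reconnect time are ignored. OutageTolerance is a hypothetical name.

```java
/** Hypothetical estimate of the worst-case outage a lease survives, under the stated assumptions. */
final class OutageTolerance {
    static final long TICK_MILLIS = 500; // scheduling interval of the renewal task

    /**
     * Time left on the lease if the connection drops right before the next renewal:
     * the full TTL minus the renewal interval (TTL / 3) minus one tick of scheduling delay.
     */
    static long worstCaseMillis(long ttlSeconds) {
        long ttlMillis = ttlSeconds * 1000;
        return ttlMillis - ttlMillis / 3 - TICK_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseMillis(5));  // 2834
        System.out.println(worstCaseMillis(20)); // 12834
    }
}
```

Under these assumptions, a 5s TTL may tolerate an outage of under 3s in the worst case, while a 20s TTL leaves well over 12s, which is consistent with failover becoming far less sensitive after the change.
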
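
One possible shape for that re-registration logic is sketched below. To keep it self-contained it does not call jetcd directly: LeaseOps is a hypothetical interface that an application would implement on top of the lease and KV clients, with onLost wired to both onError and onCompleted, and SelfHealingRegistration is an illustrative name. Retry with backoff is deliberately left out.

```java
/** Hypothetical abstraction over the lease operations used here; NOT a jetcd type. */
interface LeaseOps {
    /** Grants a lease with the given TTL and returns its ID. */
    long grant(long ttlSeconds);

    /** Starts auto-renewal; onLost runs when renewal fails or ends. Returns a cancel handle. */
    Runnable keepAlive(long leaseId, Runnable onLost);

    /** Writes the service's registration key under the given lease. */
    void register(long leaseId);
}

/** Re-acquires a lease and re-registers whenever the current lease is reported lost. */
final class SelfHealingRegistration {
    private final LeaseOps ops;
    private final long ttlSeconds;
    private boolean closed;
    private long leaseId = -1;
    private Runnable cancelKeepAlive = () -> { };

    SelfHealingRegistration(LeaseOps ops, long ttlSeconds) {
        this.ops = ops;
        this.ttlSeconds = ttlSeconds;
    }

    /** Grants a fresh lease, registers under it, and starts renewal. */
    synchronized void start() {
        if (closed) {
            return;
        }
        long id = ops.grant(ttlSeconds);
        ops.register(id);
        leaseId = id;
        cancelKeepAlive = ops.keepAlive(id, () -> onLeaseLost(id));
    }

    private synchronized void onLeaseLost(long lostId) {
        // Ignore callbacks for a lease that has already been replaced (for example when
        // both onError and onCompleted fire for the same lease).
        if (closed || lostId != leaseId) {
            return;
        }
        cancelKeepAlive.run();
        start(); // a production version would retry with backoff if this throws
    }

    synchronized long currentLeaseId() {
        return leaseId;
    }

    synchronized void close() {
        closed = true;
        cancelKeepAlive.run();
    }
}
```

Comparing the lost lease ID against the current one keeps a duplicate or late callback from triggering a second re-registration. In a real client it is probably wiser to hand the re-acquire step to an application-owned executor rather than run blocking calls inside the observer callback.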
