Hive Metastore客戶端自動重連機制源碼解析

前言

​ 本文基於Hive2.1.0的Apache社區版,目的是爲了探究Metastore和底層RDBMS和底層服務變動(例如版本升級、服務遷移等運維操做)對客戶端和用戶的影響。Hive提供了在客戶端對Metastore鏈接超時自動重連的容錯機制,容許咱們經過調整參數配置調整停服時間限制,在規定時間內重啓服務對用戶無顯著影響。因爲Metastore底層RDBMS咱們採用的是業內通用的Mysql,所以後面以Mysql來替代RDBMS進行描述和驗證java

相關參數

參數 默認值 說明 配置範圍
hive.metastore.connect.retries 3 客戶端創建與metastore鏈接時的重試次數 Metastore客戶端,如CLI、Hiveserver2等
hive.metastore.failure.retries 1 客戶端訪問metastore的失敗重試次數 Metastore客戶端,如CLI、Hiveserver2等
hive.metastore.client.connect.retry.delay 1s Metastore客戶端重連/重試等待的時間 Metastore客戶端,如CLI、Hiveserver2等
hive.metastore.client.socket.timeout 600s Metastore客戶端socket超時時間,傳遞給底層Socket,超時以後底層Socket會自動斷開 Metastore客戶端,如CLI、Hiveserver2等
hive.metastore.client.socket.lifetime 0 socket存活時間,超時以後客戶端在下一次訪問Metastore時會主動斷開現有鏈接並從新創建鏈接,0表示不主動斷開 Metastore客戶端,如CLI、Hiveserver2等
hive.hmshandler.retry.attempts 10 在JDO數據存儲出現錯誤後嘗試鏈接的次數 Metastore
hive.hmshandler.retry.interval 2000ms JDO鏈接嘗試間隔,單位:ms Metastore
hive.server2.thrift.client.connect.retry.limit 1 客戶端創建與Hiveserver2鏈接的重試次數 Hiveserver2的客戶端,如Beeline等
hive.server2.thrift.client.retry.limit 1 客戶端訪問Hiveserver2的失敗重試次數 Hiveserver2的客戶端,如Beeline等
hive.server2.thrift.client.retry.delay.seconds 1s Hiveserver2客戶端重連/重試等待的時間 Hiveserver2的客戶端,如Beeline等

hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別

​ 爲了弄清這兩個參數的區別,讓咱們經過源碼來確認一下,ps:爲了方便閱讀後面會用......省略掉無關的代碼邏輯sql

1. Hive與Metastore交互

​ CLI和Hiveserver2都是經過org.apache.hadoop.hive.ql.metadata.Hive類與Metastore的交互的。首先讓咱們以createDatabase(Database, boolean)方法爲例來看看具體的交互過程apache

/**
   * Create a database
   * @param db
   * @param ifNotExist if true, will ignore AlreadyExistsException exception
   * @throws AlreadyExistsException
   * @throws HiveException
   */
  public void createDatabase(Database db, boolean ifNotExist)
      throws AlreadyExistsException, HiveException {
    try {
      getMSC().createDatabase(db);
    } catch (AlreadyExistsException e) {
      if (!ifNotExist) {
        throw e;
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }
  /**
   * @return the metastore client for the current thread
   * @throws MetaException
   */
  @LimitedPrivate(value = {"Hive"})
  @Unstable
  public synchronized IMetaStoreClient getMSC(
      boolean allowEmbedded, boolean forceCreate) throws MetaException {
    if (metaStoreClient == null || forceCreate) {
      ......
      try {
        metaStoreClient = createMetaStoreClient(allowEmbedded);
      } catch (RuntimeException ex) {
        ......
      }
      ......
    }
    return metaStoreClient;
  }

​ Hive類維護了一個IMetaStoreClient對象,經過getMSC()方法獲取,getMSC()方法在這裏採用了懶漢模式去建立,接下來看下Hive是如何建立一個IMetaStoreClient對象的網絡

2. 建立一個IMetaStoreClient對象

// org.apache.hadoop.hive.ql.metadata.Hive.java
  private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
    ......
    if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
      return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
    } else {
      return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
          SessionHiveMetaStoreClient.class.getName(), allowEmbedded);
    }
  }

​ if後面的分支用於建立客戶端內置的本地Metastore,這主要用於開發調試階段,所以咱們只關注else後面的邏輯,即經過RetryingMetaStoreClient.getProxy方法建立一個IMetaStoreClient對象。RetryingMetaStoreClient.getProxy方法經過幾回簡單地調用重載函數,最終來到下面的方法運維

// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
  public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
      Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
      String mscClassName) throws MetaException {

    @SuppressWarnings("unchecked")
    Class<? extends IMetaStoreClient> baseClass =
        (Class<? extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName);

    RetryingMetaStoreClient handler =
        new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs,
            metaCallTimeMap, baseClass);
    return (IMetaStoreClient) Proxy.newProxyInstance(
        RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler);
  }

​ 能夠看到,這裏利用Java代理機制建立並返回了一個IMetaStoreClient的代理——RetryingMetaStoreClient,此後對IMetaStoreClient對象的調用都委託給RetryingMetaStoreClient.invoke 處理,接下來讓咱們看下RetryingMetaStoreClient.invoke方法是如何處理用戶對IMetastoreClient對象的操做的dom

3. 每次調用IMetaStoreClient對象訪問Metastore時的底層實現邏輯

// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Object ret = null;
    int retriesMade = 0;
    TException caughtException = null;
    while (true) {
      try {
        reloginExpiringKeytabUser();
        // 1. 檢查是否重連,重連的場景包括:
        // a) 上一次循環訪問Metastore異常,且異常類型支持自動重試訪問
        // b) 底層socket超時,超時參數:hive.metastore.client.socket.lifetime
        if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
          base.reconnect();
          lastConnectionTime = System.currentTimeMillis();
        }
        if (metaCallTimeMap == null) {
          ret = method.invoke(base, args);
        } else {
          // need to capture the timing
          long startTime = System.currentTimeMillis();
          ret = method.invoke(base, args);
          long timeTaken = System.currentTimeMillis() - startTime;
          addMethodTime(method, timeTaken);
        }
        // 2. 訪問Metastore正常,返回結果給上層調用並結束循環,用戶不主動結束的狀況下底層與Metastore的鏈接持續保持着
        break;
        
        // 3. 處理訪問Metastore過程當中出現的異常,異常主要分三類:
        // a) 用戶操做異常或元數據異常,將異常拋給用戶處理並結束循環
        // b) 底層鏈接異常,例如網絡問題、Metastore服務異常(停服、鏈接超限等)等支持自動重連,進入步驟4
        // c) 其餘未知異常,拋給用戶處理並結束循環
      } catch (UndeclaredThrowableException e) {
        throw e.getCause();
      } catch (InvocationTargetException e) {
        Throwable t = e.getCause();
        if (t instanceof TApplicationException) {
          TApplicationException tae = (TApplicationException)t;
          switch (tae.getType()) {
          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
          case TApplicationException.UNKNOWN_METHOD:
          case TApplicationException.WRONG_METHOD_NAME:
          case TApplicationException.INVALID_PROTOCOL:
            throw t;
          default:
            caughtException = tae;
          }
        } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
          caughtException = (TException)t;
        } else if ((t instanceof MetaException) && t.getMessage().matches(
            "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
            !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
          caughtException = (MetaException)t;
        } else {
          throw t;
        }
      } catch (MetaException e) {
        if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
            !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
          caughtException = e;
        } else {
          throw e;
        }
      }

      // 4. 對於支持自動重試的異常,會記錄重試次數並驗證次數是否超限,是則返回異常並結束循環,不然將以warn形式輸出異常日誌提醒並等等一段時間後開始下一次循環自動重試訪問Metastore。這裏用到的重試次數參數和等待時間參數分別是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay
      if (retriesMade >= retryLimit) {
        throw caughtException;
      }
      retriesMade++;
      Thread.sleep(retryDelaySeconds * 1000);
    }
    return ret;
  }

  protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
      Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
      Class<? extends IMetaStoreClient> msClientClass) throws MetaException {

    this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
    this.retryDelaySeconds = hiveConf.getTimeVar(
        HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
    this.metaCallTimeMap = metaCallTimeMap;
    this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
        HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
    ......
    this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(
        msClientClass, constructorArgTypes, constructorArgs);
  }

​ 從 RetryingMetaStoreClient 的構造函數中能夠發現,RetryingMetaStoreClient 維護了一個 HiveMetaStoreClient 對象,用戶在上層調用一次 RetryingMetaStoreClient 對象操做,例如第一步的 createDatabase 方法,會通過 RetryingMetaStoreClient.invoke 的封裝最終調用HiveMetaStoreClient類中的同名方法進行操做。在 RetryingMetaStoreClient.invoke 中封裝了自動重試的邏輯,在底層與Metastore的鏈接過程當中出現異常的狀況下會自動重試而不影響上層用戶的操做。socket

​ 這裏咱們在註釋中標註了 invoke 方法中主要的操做步驟,能夠看到,重試次數由參數hive.metastore.failure.retries控制,兩次重試之間的等待時間由hive.metastore.client.connect.retry.delay控制。ide

​ 注意,這裏咱們說的是「重試」,而不是「重連」,一次重試中與Metastore的交互有兩步:1. 創建與Metastore的會話 2. 執行用戶請求。咱們繼續看下客戶端是怎麼創建與Metastore的會話的函數

4. Metastore重連

// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java
  @Override
  public void reconnect() throws MetaException {
      ......
      close();
      // 當配置了多個Metastore時,會隨機調整Metastore順序
      promoteRandomMetaStoreURI();
      open();
  }


  private void open() throws MetaException {
    isConnected = false;
    ......
    // hive.metastore.client.socket.timeout
    int clientSocketTimeout = (int) conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

    for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
      for (URI store : metastoreUris) {
        try {
          transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
          ......
          try {
            transport.open();
            isConnected = true;
          } catch (TTransportException e) {
            ......
          }
          ......
        } catch (MetaException e) {
          ......
        }
        if (isConnected) {
          break;
        }
      }
      // Wait before launching the next round of connection retries.
      if (!isConnected && retryDelaySeconds > 0) {
        try {
          Thread.sleep(retryDelaySeconds * 1000);
        } catch (InterruptedException ignore) {}
      }
    }

    if (!isConnected) {
      throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
    }
    ......
  }

  public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
    throws MetaException {
    ......
    // hive.metastore.connect.retries
    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
    // hive.metastore.client.connect.retry.delay
    retryDelaySeconds = conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
    ......
    // 初始化一個HiveMetaStoreClient對象時會嘗試創建與Metastore的長會話
    open();
  }

​ 同上一步的重試邏輯相似,與Metastore的鏈接支持自動重連,由 hive.metastore.connect.retries 控制重連次數,hive.metastore.client.connect.retry.delay 控制重連等待時間,底層利用Thrift提供的RPC通訊服務。oop

​ 若是配置了多個Metastore地址,每一次重連的時候會按順序遍歷全部的Metastore並嘗試與之創建會話,直到有一個會話創建成功爲止。

​ 此外,初始化一個HiveMetaStoreClient對象時會調用open()方法嘗試創建一個與Metastore的長會話,供後面的用戶請求使用

總結

  1. HiveMetaStoreClient.open() 方法創建一個與Metastore的會話,該方法中會在鏈接失敗的狀況下自動重連,重連次數、重連等待時間分別由參數 hive.metastore.connect.retrieshive.metastore.client.connect.retry.delay 控制。且每次重連時會遍歷用戶配置的全部的Metastore直到成功創建一個會話
  2. 用戶新建一個Metastore客戶端(例如啓動一個CLI、Hiveserver2進程)時,會初始化並維護一個IMetaStoreClient對象,在初始化時調用 HiveMetaStoreClient.open()方法創建一個與Metastore的長會話
  3. 用戶每次調用IMetaStoreClient中的方法進行業務操做,實際上委託給 RetryingMetaStoreClient.invoke 方法操做,在遇到與Metastore鏈接等異常時會進行自動重試,重試次數、重試等待時間分別由參數 hive.metastore.failure.retrieshive.metastore.client.connect.retry.delay 控制
  4. RetryingMetaStoreClient.invoke 中每次重試會嘗試調用 HiveMetaStoreClient.reconnect() 方法重連Metastore,HiveMetaStoreClient.reconnect() 方法內會調用 HiveMetaStoreClient.open() 去鏈接Metastore。所以,invoke方法實際上在重試循環中嵌套了循環重連Metastore的操做
  5. 因此 hive.metastore.failure.retries 參數實際上僅用於在已經創建了Metastore的會話的基礎上進行正常的業務訪問過程當中遇到鏈接異常等問題時的重試次數限制,而 hive.metastore.connect.retries 則是更底層自動重連Metastore的次數限制
  6. 此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的區別也與hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別相似,這裏就再也不贅述,有興趣的同窗能夠參照本篇文檔去研究下源碼

結論

  • 僅中止Mysql服務
    • Metastore重試總時間 = hive.hmshandler.retry.attempts * hive.hmshandler.retry.interval
    • CLI、Hiveserver2會報JDO相關的異常,並斷開與Metastore的鏈接
  • 當CLI、Hiveserver2與Metastore的鏈接無響應時間超過hive.metastore.client.socket.timeout值會自動斷開鏈接
  • 僅中止metastore服務,CLI、Hiveserver2會打印「Failed to connect to the MetaStore Server」及重連失敗的異常,但會在屢次重連仍然失敗後才退出
    • 已經與Metastore創建會話的狀況下,客戶端的每一次業務請求的重試總時間 = hive.metastore.connect.retries * (hive.metastore.failure.retries + 1) hive.metastore.client.connect.retry.delay*
    • 停服期間客戶端新建一個Metastore鏈接過程當中重試總時間間隔 = hive.metastore.connect.retries * hive.metastore.client.connect.retry.delay
    • 關聯Beeline會卡住,直到Hiveserver2走完一個完整的重連Metastore週期後放棄鏈接Metastore爲止,此時Hiveserver2會返回異常
  • 僅關閉Hiveserver2服務,Beeline直接報錯,不會重試,須要手動重連
  • 以上各組件除Beeline外都可在上游服務恢復後自動恢復,經過自動重連機制實現
  • 綜上,建議進行底層服務變動(Metastore或MySQL)時中止Metastore服務,在停服以前需提早配置好Metastore客戶端超時重連的相關參數,預留適當的變動時間
相關文章
相關標籤/搜索