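This article is based on the Apache community release of Hive 2.1.0. Its goal is to examine how changes to the Metastore, its underlying RDBMS, and other underlying services (operations work such as version upgrades and service migrations) affect clients and users. On the client side, Hive provides a fault-tolerance mechanism that automatically reconnects when the connection to the Metastore times out. By tuning the relevant parameters we can adjust how much downtime is tolerated, so that a service restarted within that window has no noticeable impact on users. Because we use MySQL, the industry-standard choice, as the Metastore's underlying RDBMS, the rest of this article says MySQL instead of RDBMS in both the descriptions and the tests.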
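Parameter | Default | Description | Applies to |
---|---|---|---|
hive.metastore.connect.retries | 3 | Number of connection attempts (rounds over all configured URIs) when the client opens a connection to the Metastore | Metastore clients, e.g. CLI, HiveServer2 |
hive.metastore.failure.retries | 1 | Number of retries after a failed client call to the Metastore | Metastore clients, e.g. CLI, HiveServer2 |
hive.metastore.client.connect.retry.delay | 1s | Wait time before a Metastore client reconnects or retries | Metastore clients, e.g. CLI, HiveServer2 |
hive.metastore.client.socket.timeout | 600s | Metastore client socket timeout, passed to the underlying socket; an operation that exceeds it fails and the connection is treated as broken | Metastore clients, e.g. CLI, HiveServer2 |
hive.metastore.client.socket.lifetime | 0 | Connection lifetime; once exceeded, the client closes the existing connection and opens a new one on its next Metastore call. 0 means the client never closes the connection proactively | Metastore clients, e.g. CLI, HiveServer2 |
hive.hmshandler.retry.attempts | 10 | Number of connection attempts after an error in the JDO datastore | Metastore |
hive.hmshandler.retry.interval | 2000ms | Interval between JDO connection attempts | Metastore |
hive.server2.thrift.client.connect.retry.limit | 1 | Number of retries when the client opens a connection to HiveServer2 | HiveServer2 clients, e.g. Beeline |
hive.server2.thrift.client.retry.limit | 1 | Number of retries after a failed client call to HiveServer2 | HiveServer2 clients, e.g. Beeline |
hive.server2.thrift.client.retry.delay.seconds | 1s | Wait time before a HiveServer2 client reconnects or retries | HiveServer2 clients, e.g. Beeline |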
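To understand how these parameters differ, in particular hive.metastore.connect.retries versus hive.metastore.failure.retries, let's check the source code. Note: for readability, code that is irrelevant to this discussion is elided as `......`.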
Both the CLI and HiveServer2 interact with the Metastore through the org.apache.hadoop.hive.ql.metadata.Hive class. Let's start with the createDatabase(Database, boolean) method as an example of that interaction.
```java
// org.apache.hadoop.hive.ql.metadata.Hive.java
/**
 * Create a database
 * @param db
 * @param ifNotExist if true, will ignore AlreadyExistsException exception
 * @throws AlreadyExistsException
 * @throws HiveException
 */
public void createDatabase(Database db, boolean ifNotExist)
    throws AlreadyExistsException, HiveException {
  try {
    getMSC().createDatabase(db);
  } catch (AlreadyExistsException e) {
    if (!ifNotExist) {
      throw e;
    }
  } catch (Exception e) {
    throw new HiveException(e);
  }
}

/**
 * @return the metastore client for the current thread
 * @throws MetaException
 */
@LimitedPrivate(value = {"Hive"})
@Unstable
public synchronized IMetaStoreClient getMSC(
    boolean allowEmbedded, boolean forceCreate) throws MetaException {
  if (metaStoreClient == null || forceCreate) {
    ......
    try {
      metaStoreClient = createMetaStoreClient(allowEmbedded);
    } catch (RuntimeException ex) {
      ......
    }
    ......
  }
  return metaStoreClient;
}
```
The Hive class holds an IMetaStoreClient object, obtained through getMSC(), which creates it lazily on first use. Next, let's look at how Hive creates an IMetaStoreClient object.
```java
// org.apache.hadoop.hive.ql.metadata.Hive.java
private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
  ......
  if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
    return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
  } else {
    return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
        SessionHiveMetaStoreClient.class.getName(), allowEmbedded);
  }
}
```
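The if branch (taken when hive.metastore.fastpath is enabled) creates a local Metastore embedded in the client, which is mainly used during development and debugging. We therefore focus only on the else branch, which creates the IMetaStoreClient object through RetryingMetaStoreClient.getProxy. After a few simple calls through its overloads, RetryingMetaStoreClient.getProxy ends up in the following method: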
```java
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    String mscClassName) throws MetaException {

  @SuppressWarnings("unchecked")
  Class<? extends IMetaStoreClient> baseClass =
      (Class<? extends IMetaStoreClient>) MetaStoreUtils.getClass(mscClassName);

  RetryingMetaStoreClient handler =
      new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs,
          metaCallTimeMap, baseClass);
  return (IMetaStoreClient) Proxy.newProxyInstance(
      RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler);
}
```
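As we can see, this uses Java's dynamic proxy mechanism to create and return a proxy that implements IMetaStoreClient, with a RetryingMetaStoreClient as its invocation handler. From then on, every call on the IMetaStoreClient object is delegated to RetryingMetaStoreClient.invoke. Let's look at how RetryingMetaStoreClient.invoke handles the user's operations on the IMetaStoreClient object: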
```java
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  Object ret = null;
  int retriesMade = 0;
  TException caughtException = null;
  while (true) {
    try {
      reloginExpiringKeytabUser();
      // 1. Decide whether to reconnect first. This happens when:
      //    a) the previous iteration failed with an exception type that allows an automatic retry, or
      //    b) the connection has outlived hive.metastore.client.socket.lifetime
      if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
        base.reconnect();
        lastConnectionTime = System.currentTimeMillis();
      }
      if (metaCallTimeMap == null) {
        ret = method.invoke(base, args);
      } else {
        // need to capture the timing
        long startTime = System.currentTimeMillis();
        ret = method.invoke(base, args);
        long timeTaken = System.currentTimeMillis() - startTime;
        addMethodTime(method, timeTaken);
      }
      // 2. The call succeeded: return the result to the caller and leave the loop.
      //    Unless the user closes it, the underlying connection to the Metastore stays open.
      break;
    // 3. Handle exceptions raised while talking to the Metastore. There are three kinds:
    //    a) user errors or metadata errors: rethrown to the caller, ending the loop
    //    b) connection-level errors, e.g. network problems or Metastore outages
    //       (service stopped, connection limit reached): retried automatically, go to step 4
    //    c) any other, unknown error: rethrown to the caller, ending the loop
    } catch (UndeclaredThrowableException e) {
      throw e.getCause();
    } catch (InvocationTargetException e) {
      Throwable t = e.getCause();
      if (t instanceof TApplicationException) {
        TApplicationException tae = (TApplicationException) t;
        switch (tae.getType()) {
          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
          case TApplicationException.UNKNOWN_METHOD:
          case TApplicationException.WRONG_METHOD_NAME:
          case TApplicationException.INVALID_PROTOCOL:
            throw t;
          default:
            caughtException = tae;
        }
      } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
        caughtException = (TException) t;
      } else if ((t instanceof MetaException) && t.getMessage().matches(
          "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
          !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
        caughtException = (MetaException) t;
      } else {
        throw t;
      }
    } catch (MetaException e) {
      if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
          !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
        caughtException = e;
      } else {
        throw e;
      }
    }

    // 4. For retryable exceptions, count the retry and check it against the limit. If the limit
    //    is reached, rethrow and end the loop; otherwise a warning is logged and, after a wait,
    //    the next iteration retries the Metastore call. The retry count and the wait come from
    //    hive.metastore.failure.retries and hive.metastore.client.connect.retry.delay.
    if (retriesMade >= retryLimit) {
      throw caughtException;
    }
    retriesMade++;
    Thread.sleep(retryDelaySeconds * 1000);
  }
  return ret;
}

protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    Class<? extends IMetaStoreClient> msClientClass) throws MetaException {

  // hive.metastore.failure.retries
  this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
  // hive.metastore.client.connect.retry.delay
  this.retryDelaySeconds = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
  this.metaCallTimeMap = metaCallTimeMap;
  // hive.metastore.client.socket.lifetime
  this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
  ......
  this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(
      msClientClass, constructorArgTypes, constructorArgs);
}
```
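The constructor shows that RetryingMetaStoreClient holds a base client object: an instance of the class passed in as msClientClass, here SessionHiveMetaStoreClient, a subclass of HiveMetaStoreClient. When the user calls an operation on the proxy at the upper layer, such as createDatabase in the first step, the call is wrapped by RetryingMetaStoreClient.invoke and finally reaches the method of the same name in HiveMetaStoreClient. RetryingMetaStoreClient.invoke encapsulates the automatic retry logic: if an exception occurs on the underlying connection to the Metastore, the call is retried automatically, and as long as the retry budget is not exhausted the user's operation at the upper layer is unaffected.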
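The routing described above is plain JDK dynamic proxying. The minimal sketch below shows it in isolation: every call on the proxy lands in the handler's invoke(), which forwards it to the same-named method on a wrapped base object, the same shape as RetryingMetaStoreClient and its base client. Greeter, CountingHandler and ProxyDemo are hypothetical names used only for illustration; only java.lang.reflect APIs are real.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical interface standing in for IMetaStoreClient.
interface Greeter {
    String greet(String name);
}

// Hypothetical handler with the same shape as RetryingMetaStoreClient: it wraps a
// real implementation ("base") and sees every call made through the proxy.
class CountingHandler implements InvocationHandler {
    private final Object base;
    int calls; // number of calls routed through invoke()

    CountingHandler(Object base) {
        this.base = base;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        calls++; // a retrying handler would wrap the next line in its retry loop
        try {
            return method.invoke(base, args); // forward to the same-named method on base
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the base object actually threw
        }
    }
}

class ProxyDemo {
    public static void main(String[] args) {
        Greeter base = name -> "hello " + name;
        CountingHandler handler = new CountingHandler(base);
        Greeter proxy = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
        System.out.println(proxy.greet("hive")); // hello hive
        System.out.println(handler.calls);       // 1
    }
}
```

The caller only ever holds the Greeter interface, so it cannot tell that an extra layer sits in between; this is why the retry logic in RetryingMetaStoreClient is transparent to the Hive class.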
We have marked the main steps of invoke in the code comments. As they show, the number of retries is controlled by hive.metastore.failure.retries, and the wait between two retries by hive.metastore.client.connect.retry.delay.
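The retry layer can be modeled outside Hive as below. This is a simplified, hypothetical sketch (RetryLoop, call and isRetryableMessage are our names, not Hive APIs): it keeps the structure of invoke() (reconnect before every retry, give up on non-retryable errors or once the retry limit is reached, sleep between attempts) and reuses the regular expression that invoke() applies to a MetaException caught directly. Unlike Hive it sleeps in milliseconds, so it can be exercised quickly, and it does not log.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Simplified, hypothetical model of the loop in RetryingMetaStoreClient.invoke():
// one initial attempt plus at most retryLimit retries.
class RetryLoop {
    // Same check invoke() applies to a MetaException caught directly
    // (for example one thrown by reconnect()).
    private static final Pattern RETRYABLE_MESSAGE =
            Pattern.compile("(?s).*(IO|TTransport)Exception.*");

    static boolean isRetryableMessage(String message) {
        return message != null
                && RETRYABLE_MESSAGE.matcher(message).matches()
                && !message.contains("java.sql.SQLIntegrityConstraintViolationException");
    }

    static <T> T call(Callable<T> action, Runnable reconnect, Predicate<Exception> retryable,
                      int retryLimit, long retryDelayMillis) throws Exception {
        int retriesMade = 0;
        while (true) {
            try {
                if (retriesMade > 0) {
                    reconnect.run(); // step 1: re-establish the session before a retry
                }
                return action.call(); // step 2: success ends the loop
            } catch (Exception e) {
                // steps 3 and 4: give up on non-retryable errors or when the budget is spent
                if (!retryable.test(e) || retriesMade >= retryLimit) {
                    throw e;
                }
            }
            retriesMade++;
            Thread.sleep(retryDelayMillis);
        }
    }
}
```

With retryLimit = 1, the default value of hive.metastore.failure.retries, an action is attempted at most twice: once normally and once more after a reconnect.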
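Note that we are talking about "retries" here, not "reconnects". Each retry interacts with the Metastore in two steps: 1. establish a session with the Metastore; 2. execute the user's request. Let's continue with how the client establishes its session with the Metastore: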
```java
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java
@Override
public void reconnect() throws MetaException {
  ......
  close();
  // When several Metastore URIs are configured, their order is changed at random
  promoteRandomMetaStoreURI();
  open();
}

private void open() throws MetaException {
  isConnected = false;
  ......
  // hive.metastore.client.socket.timeout
  int clientSocketTimeout = (int) conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

  for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
    for (URI store : metastoreUris) {
      try {
        transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
        ......
        try {
          transport.open();
          isConnected = true;
        } catch (TTransportException e) {
          ......
        }
        ......
      } catch (MetaException e) {
        ......
      }
      if (isConnected) {
        break;
      }
    }
    // Wait before launching the next round of connection retries.
    if (!isConnected && retryDelaySeconds > 0) {
      try {
        Thread.sleep(retryDelaySeconds * 1000);
      } catch (InterruptedException ignore) {}
    }
  }

  if (!isConnected) {
    throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
  }
  ......
}

public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
    throws MetaException {
  ......
  // hive.metastore.connect.retries
  retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
  // hive.metastore.client.connect.retry.delay
  retryDelaySeconds = conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
  ......
  // Creating a HiveMetaStoreClient immediately tries to open a long-lived session with the Metastore
  open();
}
```
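Similar to the retry logic above, the connection to the Metastore also reconnects automatically: hive.metastore.connect.retries controls the number of connection rounds, and hive.metastore.client.connect.retry.delay (the same parameter used for the wait between retries) controls the wait between rounds. Under the hood the connection uses the RPC service provided by Thrift.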
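If multiple Metastore URIs are configured, each round of connection attempts goes through all of them in order, trying to establish a session with each one and stopping as soon as one succeeds.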
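The connection layer in open() is a loop nested inside the retry loop of RetryingMetaStoreClient.invoke() (it is reached through reconnect()). The sketch below reproduces its shape under stated assumptions: ConnectLoop and the tryConnect predicate are hypothetical stand-ins for the TSocket / transport.open() code, the random reordering done by reconnect() is left out, and an interrupted sleep is propagated rather than ignored as in Hive.

```java
import java.net.URI;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the connection loop in HiveMetaStoreClient.open():
// up to `retries` rounds, each trying every URI in order, pausing after a failed round.
class ConnectLoop {
    static URI connect(List<URI> uris, Predicate<URI> tryConnect,
                       int retries, long retryDelayMillis) throws InterruptedException {
        for (int attempt = 0; attempt < retries; ++attempt) {
            for (URI store : uris) {
                if (tryConnect.test(store)) {
                    return store; // the first URI that accepts the connection wins
                }
            }
            // like open(), sleep after every failed round, including the last one
            if (retryDelayMillis > 0) {
                Thread.sleep(retryDelayMillis);
            }
        }
        throw new IllegalStateException("Could not connect to any of the URIs provided");
    }
}
```

With two URIs and retries = 3, a Metastore that never comes back costs 3 rounds of 2 attempts each before the exception is thrown.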
In addition, initializing a HiveMetaStoreClient object calls open(), which tries to establish a long-lived session with the Metastore for subsequent user requests.
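Combining the two layers gives a rough idea of how long a single call keeps trying while the Metastore is down. The sketch below is our own back-of-envelope model, not something documented by Hive, and it assumes that failed connection attempts return immediately (connection refused, so no time is spent waiting on hive.metastore.client.socket.timeout), that hive.metastore.client.socket.lifetime is 0, and that every failure is classified as retryable. Under those assumptions the client sleeps R_f × (1 + R_c) × d seconds in total before giving up, where R_f is hive.metastore.failure.retries, R_c is hive.metastore.connect.retries and d is hive.metastore.client.connect.retry.delay. RetryBudget and its methods are hypothetical names.

```java
// Hypothetical estimate of the total sleep time of one IMetaStoreClient call while
// every Metastore URI refuses connections (assumptions in the lead-in above).
class RetryBudget {
    // Walks the two nested retry loops from the excerpts above and adds up sleep time.
    static long simulateSleepSeconds(int failureRetries, int connectRetries, long delaySeconds) {
        long slept = 0;
        int retriesMade = 0;
        while (true) {
            if (retriesMade > 0) {
                // reconnect() -> open(): every round fails and is followed by a sleep
                slept += (long) connectRetries * delaySeconds;
            }
            // the call (or the reconnect) failed with a retryable exception
            if (retriesMade >= failureRetries) {
                return slept; // invoke() rethrows the caught exception here
            }
            retriesMade++;
            slept += delaySeconds; // Thread.sleep(retryDelaySeconds * 1000) in invoke()
        }
    }

    // Closed form of the loop above: R_f * (1 + R_c) * d.
    static long closedFormSleepSeconds(int failureRetries, int connectRetries, long delaySeconds) {
        return (long) failureRetries * (1 + connectRetries) * delaySeconds;
    }
}
```

With the defaults (R_f = 1, R_c = 3, d = 1s) this gives 4 seconds. The last connection attempt happens one delay earlier, because open() also sleeps after its final failed round, so under these assumptions the Metastore has to be back within roughly 3 seconds of the first failed call. Raising hive.metastore.failure.retries, hive.metastore.connect.retries or the delay widens that window.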