在介紹Hive的框架和執行流程以前,這裏首先對Hive進行簡要的介紹。java
Hive 是創建在 Hadoop 上的數據倉庫基礎構架。它提供了一系列的工具,能夠用來進行數據抽取,轉化,加載(ETL),這是一種能夠存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制。Hive 定義了簡單的類 SQL 查詢語言,稱爲Hive QL,它容許熟悉 SQL 的用戶查詢數據。同時,這個語言也容許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 沒法完成的複雜的分析工做。正則表達式
這裏假設已經對Hive的各個組成部分、做用以及Hive QL語言有了基本的認識,再也不作詳細的解釋,更加關注的是Hive源碼級別的解析。下面是從網絡上摘取的兩個關於Hive框架的解析圖:shell
從框架圖中咱們能夠看見從用戶提交一個查詢(假設經過CLI入口)直到獲取最終結果,Hive內部的執行流程主要包括:數據庫
CLI 獲取用戶查詢,解析用戶輸入的命令,提交給Driver;apache
Driver 結合編譯器(COMPILER)和元數據庫(METASTORE),對用戶查詢進行編譯解析;數組
根據解析結果(查詢計劃)生成MR任務提交給Hadoop執行;網絡
獲取最終結果;session
咱們試圖根據Hive的源碼對上述過程的每一步進行解析, 使用的源碼版本1.1.0。話很少說,首先看看CLI如何解析用戶的輸入,並提交給Driver類執行的。這個過程主要涉及的類是:orgapachehadoophivecliCliDriver.java。app
執行入口,main函數,建立CliDriver實例,接受用戶輸入參數,開始運行。框架
public static void main(String[] args) throws Exception { int ret = new CliDriver().run(args); System.exit(ret); }
這裏用到了建立CliDriver實例,看看CliDriver的構造函數內部都作了什麼操做:
public CliDriver() { SessionState ss = SessionState.get(); conf = (ss != null) ? ss.getConf() : new Configuration(); Log LOG = LogFactory.getLog("CliDriver"); console = new LogHelper(LOG); }
首先獲取一個SessionState
, SessionState
封裝了一個會話的關聯的數據,包括配置信息HiveConf,輸入輸出流,指令類型,用戶名稱、IP地址等等。SessionState
是一個與線程關聯的靜態本地變量ThreadLocal,任何一個線程都對應一個SessionState
,可以在Hive代碼的任何地方獲取到(大量被使用到),以返回用戶相關或者配置信息等。
private static ThreadLocal<SessionState> tss = new ThreadLocal<SessionState>(); public static SessionState get() { return tss.get(); } public static SessionState start(HiveConf conf) { //建立一個SessionState SessionState ss = new SessionState(conf); return start(ss); } public static SessionState start(SessionState startSs) { setCurrentSessionState(startSs); ..... } public static void setCurrentSessionState(SessionState startSs) { //將SessionState與線程本地變量tss關聯 tss.set(startSs); Thread.currentThread().setContextClassLoader(startSs.getConf().getClassLoader()); }
接着CliDriver的構造函數來講,獲取到SessionState
以後,就初始化配置信息org.apache.hadoop.conf.Configuration conf.
CliDriver實例建立完畢,調用run(args), 開始處理用戶輸入。run方法的函數體比較長,爲了方便閱讀,下面按照代碼的出現順序,依次解析。
對輸入的指令進行初步解析,提取-e -h hiveconf hivevar
等參數信息,設置用戶提供的系統和Hive環境變量。詳細實現,參考OptionsProcessor
類,再也不詳細描述。
OptionsProcessor oproc = new OptionsProcessor(); if (!oproc.process_stage1(args)) { return 1; }
初始化Log4j日誌組件
boolean logInitFailed = false; String logInitDetailMessage; try { logInitDetailMessage = LogUtils.initHiveLog4j(); } catch (LogInitializationException e) { logInitFailed = true; logInitDetailMessage = e.getMessage(); }
初始化HiveConf,並根據HiveConf實例化CliSessionState,設置輸入輸出流爲標準控制檯。
CliSessionState 繼承了SessionState類,建立了一些記錄用戶輸入的字符串,在實例化的過程當中,主要是用來記錄HiveConf,並生成一個會話ID,參見SessionState構造函數.
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class)); ss.in = System.in; try { ss.out = new PrintStream(System.out, true, "UTF-8"); ss.info = new PrintStream(System.err, true, "UTF-8"); ss.err = new CachingPrintStream(System.err, true, "UTF-8"); } catch (UnsupportedEncodingException e) { return 3; }
根據stage1解析的參數內容,填充CliSessionState的字符串,好比用戶輸入了-e 則這個stage就把-e 對應的字符串賦值給CliSessionState的 execString成員。
if (!oproc.process_stage2(ss)) { return 2; }
在容許打印輸出的模式下,若是日誌初始化失敗,打印失敗信息
if (!ss.getIsSilent()) { if (logInitFailed) { System.err.println(logInitDetailMessage); } else { SessionState.getConsole().printInfo(logInitDetailMessage); } }
將用戶命令行輸入的配置信息和變量等,覆蓋HiveConf的默認值
HiveConf conf = ss.getConf(); for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) { conf.set((String) item.getKey(), (String) item.getValue()); ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue()); }
設置當前回話狀態,執行CLI驅動
SessionState.start(ss); try { return executeDriver(ss, conf, oproc); } finally { ss.close(); } }
在進入executeDriver
以前,咱們能夠認爲Hive處理的是用戶進入Hive程序的指令,到此用戶已經進入了Hive,Cli的Driver將不斷讀取用戶的HiveQL語句並解析,提交給Driver。executeDriver
函數內部出了根據用戶參數作出的一些執行響應外,還設置了用戶HiveQL的執行歷史記錄,也就是方便咱們使用上下標鍵查看以前執行的指令的功能,再也不詳述。executeDriver
函數內部核心的代碼是經過while循環不斷按行讀取用戶的輸入,而後調用ProcessLine拼接一條命令cmd
,傳遞給processCmd
處理用戶輸入。下面就來看看processCmd
函數。
首先是設置當前clisession的用戶上一條指令,而後使用正則表達式,將用戶輸入的指令從空格,製表符等出斷開(tokenizeCmd函數),獲得token數組。
CliSessionState ss = (CliSessionState) SessionState.get(); ss.setLastCommand(cmd); // Flush the print stream, so it doesn't include output from the last command ss.err.flush(); String cmd_trimmed = cmd.trim(); String[] tokens = tokenizeCmd(cmd_trimmed);
而後根據用戶的輸入,進行不一樣的處理,這邊的處理主要包括:
quit或exit: 關閉回話,退出hive
source: 文件處理?不清楚對應什麼操做
!
開頭: 調用Linux系統的shell執行指令
本地模式:建立CommandProcessor, 執行用戶指令
限於篇幅緣由,前面三種狀況的代碼再也不詳述,重點介紹Hive的本地模式執行,也就是咱們經常使用的HiveQL語句,DFS命令等的處理方式:
try { CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); ret = processLocalCmd(cmd, proc, ss); } catch (SQLException e) { console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(), org.apache.hadoop.util.StringUtils.stringifyException(e)); ret = 1; }
其中,CommandProcessor
是一個接口類,定義以下:
public interface CommandProcessor { void init(); CommandProcessorResponse run(String command) throws CommandNeedRetryException; }
CommandProcessorFactory
根據用戶指令生成的tokens和配置文件,返回CommandProcessor
的一個具體實現。
public static CommandProcessor get(String[] cmd, HiveConf conf) throws SQLException { CommandProcessor result = getForHiveCommand(cmd, conf); if (result != null) { return result; } if (isBlank(cmd[0])) { return null; } else { if (conf == null) { return new Driver(); } Driver drv = mapDrivers.get(conf); if (drv == null) { drv = new Driver(); mapDrivers.put(conf, drv); } drv.init(); return drv; } }
其中getForHiveCommand
函數首先根據tokens的第一個字串,也就是用戶輸入指令的第一個單詞,在HiveCommand
這個enum
中定義的一些非SQL查詢操做集合中進行匹配,肯定相應的HiveCommand
類型。在依據HiveCommand
選擇合適的CommandProcessor
實現方式,好比dfs
命令對應的DFSProcessor
,set
命令對應的SetProcessor
等,若是用戶輸入的是諸如select
之類的SQL查詢, getForHiveCommand
返回null,直接在get
函數中根據配置文件conf選擇或者生成一個Driver
類實例,並做爲CommandProcessor
返回。詳細的代碼參考CommandProcessorFactory
和HiveCommand
類。
到此Hive對用戶的一個指令cmd
,配置了回話狀態CliSessionState
,選擇了一個合適的CommandProcessor
, CliDriver
將進行他的最後一步操做,提交用戶的查詢到指定的CommandProcessor
,並獲取結果。這一切都是在processLocalCmd
中執行的。
processLocalCmd
函數的主體是一個以下的循環:
do { try { needRetry = false; if (proc != null) { //若是CommandProcessor是Driver實例 if (proc instanceof Driver) { Driver qp = (Driver) proc; //獲取標準輸出流,打印結果信息 PrintStream out = ss.out; long start = System.currentTimeMillis(); if (ss.getIsVerbose()) { out.println(cmd); } qp.setTryCount(tryCount); //driver實例運行用戶指令,獲取運行結果響應碼 ret = qp.run(cmd).getResponseCode(); if (ret != 0) { qp.close(); return ret; } // 統計指令的運行時間 long end = System.currentTimeMillis(); double timeTaken = (end - start) / 1000.0; ArrayList<String> res = new ArrayList<String>(); //打印查詢結果的列名稱 printHeader(qp, out); // 打印查詢結果 int counter = 0; try { if (out instanceof FetchConverter) { ((FetchConverter)out).fetchStarted(); } while (qp.getResults(res)) { for (String r : res) { out.println(r); } counter += res.size(); res.clear(); if (out.checkError()) { break; } } } catch (IOException e) { console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); ret = 1; } //關閉結果 int cret = qp.close(); if (ret == 0) { ret = cret; } if (out instanceof FetchConverter) { ((FetchConverter)out).fetchFinished(); } console.printInfo("Time taken: " + timeTaken + " seconds" + (counter == 0 ? "" : ", Fetched: " + counter + " row(s)")); } else { //若是proc不是Driver,也就是用戶執行的是非SQL查詢操做,直接執行語句,不自信FetchResult的操做 String firstToken = tokenizeCmd(cmd.trim())[0]; String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length()); if (ss.getIsVerbose()) { ss.out.println(firstToken + " " + cmd_1); } CommandProcessorResponse res = proc.run(cmd_1); if (res.getResponseCode() != 0) { ss.out.println("Query returned non-zero code: " + res.getResponseCode() + ", cause: " + res.getErrorMessage()); } ret = res.getResponseCode(); } } } catch (CommandNeedRetryException e) { //若是執行過程當中出現異常,修改needRetry標誌,下次循環是retry。 console.printInfo("Retry query with a different approach..."); tryCount++; needRetry = true; } } while (needRetry);
前面對函數中關鍵的執行語句已經給出了註釋,這裏單獨對printHeader
進行一下說明。printHeader
函數經過調用driver.getSchema.getFiledSchema
,獲取查詢結果的列集合 ,而後依次打印出列名。
private void printHeader(Driver qp, PrintStream out) { List<FieldSchema> fieldSchemas = qp.getSchema().getFieldSchemas(); if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER) && fieldSchemas != null) { // Print the column names boolean first_col = true; for (FieldSchema fs : fieldSchemas) { if (!first_col) { out.print('\t'); } out.print(fs.getName()); first_col = false; } out.println(); } }
想更一進步的支持我,請掃描下方的二維碼,你懂的~