Hive 源碼解析之 Hive 基本框架和執行入口

Hive簡介

在介紹Hive的框架和執行流程以前,這裏首先對Hive進行簡要的介紹。java

Hive 是創建在 Hadoop 上的數據倉庫基礎構架。它提供了一系列的工具,能夠用來進行數據抽取,轉化,加載(ETL),這是一種能夠存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制。Hive 定義了簡單的類 SQL 查詢語言,稱爲Hive QL,它容許熟悉 SQL 的用戶查詢數據。同時,這個語言也容許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 沒法完成的複雜的分析工做。正則表達式

這裏假設已經對Hive的各個組成部分、做用以及Hive QL語言有了基本的認識,再也不作詳細的解釋,更加關注的是Hive源碼級別的解析。下面是從網絡上摘取的兩個關於Hive框架的解析圖:shell

簡要框架

官方框架

從框架圖中咱們能夠看見從用戶提交一個查詢(假設經過CLI入口)直到獲取最終結果,Hive內部的執行流程主要包括:數據庫

  1. CLI 獲取用戶查詢,解析用戶輸入的命令,提交給Driver;apache

  2. Driver 結合編譯器(COMPILER)和元數據庫(METASTORE),對用戶查詢進行編譯解析;數組

  3. 根據解析結果(查詢計劃)生成MR任務提交給Hadoop執行;網絡

  4. 獲取最終結果;session

源碼分析

咱們試圖根據Hive的源碼對上述過程的每一步進行解析, 使用的源碼版本1.1.0。話很少說,首先看看CLI如何解析用戶的輸入,並提交給Driver類執行的。這個過程主要涉及的類是:orgapachehadoophivecliCliDriver.java。app

main

執行入口,main函數,建立CliDriver實例,接受用戶輸入參數,開始運行。框架

public static void main(String[] args) throws Exception {
   int ret = new CliDriver().run(args);
   System.exit(ret);
}

這裏用到了建立CliDriver實例,看看CliDriver的構造函數內部都作了什麼操做:

public CliDriver() {
    SessionState ss = SessionState.get();
    conf = (ss != null) ? ss.getConf() : new Configuration();
    Log LOG = LogFactory.getLog("CliDriver");
    console = new LogHelper(LOG);
  }

首先獲取一個SessionStateSessionState封裝了一個會話的關聯的數據,包括配置信息HiveConf,輸入輸出流,指令類型,用戶名稱、IP地址等等。SessionState 是一個與線程關聯的靜態本地變量ThreadLocal,任何一個線程都對應一個SessionState,可以在Hive代碼的任何地方獲取到(大量被使用到),以返回用戶相關或者配置信息等。

private static ThreadLocal<SessionState> tss = new ThreadLocal<SessionState>();

public static SessionState get() {
    return tss.get();
  }

public static SessionState start(HiveConf conf) {
    //建立一個SessionState
    SessionState ss = new SessionState(conf);
    return start(ss);
  }

public static SessionState start(SessionState startSs) {
    setCurrentSessionState(startSs);
     .....
}

public static void setCurrentSessionState(SessionState startSs) {
    //將SessionState與線程本地變量tss關聯
    tss.set(startSs);
    Thread.currentThread().setContextClassLoader(startSs.getConf().getClassLoader());
 }

接着CliDriver的構造函數來講,獲取到SessionState以後,就初始化配置信息org.apache.hadoop.conf.Configuration conf.

run

CliDriver實例建立完畢,調用run(args), 開始處理用戶輸入。run方法的函數體比較長,爲了方便閱讀,下面按照代碼的出現順序,依次解析。

  1. 對輸入的指令進行初步解析,提取-e -h hiveconf hivevar等參數信息,設置用戶提供的系統和Hive環境變量。詳細實現,參考OptionsProcessor類,再也不詳細描述。

    OptionsProcessor oproc = new OptionsProcessor();
     if (!oproc.process_stage1(args)) {
     return 1;
     }
  2. 初始化Log4j日誌組件

    boolean logInitFailed = false;
     String logInitDetailMessage;
     try {
     logInitDetailMessage = LogUtils.initHiveLog4j();
      } catch (LogInitializationException e) {
     logInitFailed = true;
     logInitDetailMessage = e.getMessage();
      }
  3. 初始化HiveConf,並根據HiveConf實例化CliSessionState,設置輸入輸出流爲標準控制檯。

    CliSessionState 繼承了SessionState類,建立了一些記錄用戶輸入的字符串,在實例化的過程當中,主要是用來記錄HiveConf,並生成一個會話ID,參見SessionState構造函數.
    CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
     ss.in = System.in;
     try {
    ss.out = new PrintStream(System.out, true, "UTF-8");
    ss.info = new PrintStream(System.err, true, "UTF-8");
    ss.err = new CachingPrintStream(System.err, true, "UTF-8");
     } catch (UnsupportedEncodingException e) {
    return 3;
     }
  4. 根據stage1解析的參數內容,填充CliSessionState的字符串,好比用戶輸入了-e 則這個stage就把-e 對應的字符串賦值給CliSessionState的 execString成員。

    if (!oproc.process_stage2(ss)) {
     return 2;
      }
  5. 在容許打印輸出的模式下,若是日誌初始化失敗,打印失敗信息

    if (!ss.getIsSilent()) {
     if (logInitFailed) {
       System.err.println(logInitDetailMessage);
     } else {
       SessionState.getConsole().printInfo(logInitDetailMessage);
     }
     }
  6. 將用戶命令行輸入的配置信息和變量等,覆蓋HiveConf的默認值

    HiveConf conf = ss.getConf();
     for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
       conf.set((String) item.getKey(), (String) item.getValue());
       ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue());
     }
  7. 設置當前回話狀態,執行CLI驅動

    SessionState.start(ss);
    
    try {
     return executeDriver(ss, conf, oproc);
       } finally {
     ss.close();
       }
    }

executeDriver

在進入executeDriver以前,咱們能夠認爲Hive處理的是用戶進入Hive程序的指令,到此用戶已經進入了Hive,Cli的Driver將不斷讀取用戶的HiveQL語句並解析,提交給Driver。executeDriver函數內部出了根據用戶參數作出的一些執行響應外,還設置了用戶HiveQL的執行歷史記錄,也就是方便咱們使用上下標鍵查看以前執行的指令的功能,再也不詳述。executeDriver函數內部核心的代碼是經過while循環不斷按行讀取用戶的輸入,而後調用ProcessLine拼接一條命令cmd,傳遞給processCmd處理用戶輸入。下面就來看看processCmd函數。

processCmd

  1. 首先是設置當前clisession的用戶上一條指令,而後使用正則表達式,將用戶輸入的指令從空格,製表符等出斷開(tokenizeCmd函數),獲得token數組。

    CliSessionState ss = (CliSessionState) SessionState.get();
     ss.setLastCommand(cmd);
     // Flush the print stream, so it doesn't include output from the last command
     ss.err.flush();
     String cmd_trimmed = cmd.trim();
     String[] tokens = tokenizeCmd(cmd_trimmed);
  2. 而後根據用戶的輸入,進行不一樣的處理,這邊的處理主要包括:

    • quit或exit: 關閉回話,退出hive

    • source: 文件處理?不清楚對應什麼操做

    • ! 開頭: 調用Linux系統的shell執行指令

    • 本地模式:建立CommandProcessor, 執行用戶指令

限於篇幅緣由,前面三種狀況的代碼再也不詳述,重點介紹Hive的本地模式執行,也就是咱們經常使用的HiveQL語句,DFS命令等的處理方式:

try {
      CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
      ret = processLocalCmd(cmd, proc, ss);
  } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
      org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
  }

其中,CommandProcessor是一個接口類,定義以下:

public interface CommandProcessor {
 void init();
 CommandProcessorResponse run(String command) throws CommandNeedRetryException;
}

CommandProcessorFactory根據用戶指令生成的tokens和配置文件,返回CommandProcessor的一個具體實現。

public static CommandProcessor get(String[] cmd, HiveConf conf)
      throws SQLException {
    CommandProcessor result = getForHiveCommand(cmd, conf);
    if (result != null) {
      return result;
    }
    if (isBlank(cmd[0])) {
      return null;
    } else {
      if (conf == null) {
        return new Driver();
      }
      Driver drv = mapDrivers.get(conf);
      if (drv == null) {
        drv = new Driver();
        mapDrivers.put(conf, drv);
      }
      drv.init();
      return drv;
    }
  }

其中getForHiveCommand函數首先根據tokens的第一個字串,也就是用戶輸入指令的第一個單詞,在HiveCommand這個enum中定義的一些非SQL查詢操做集合中進行匹配,肯定相應的HiveCommand類型。在依據HiveCommand選擇合適的CommandProcessor實現方式,好比dfs命令對應的DFSProcessorset命令對應的SetProcessor等,若是用戶輸入的是諸如select之類的SQL查詢, getForHiveCommand返回null,直接在get函數中根據配置文件conf選擇或者生成一個Driver類實例,並做爲CommandProcessor返回。詳細的代碼參考CommandProcessorFactoryHiveCommand類。

processLocalCmd

到此Hive對用戶的一個指令cmd,配置了回話狀態CliSessionState,選擇了一個合適的CommandProcessorCliDriver將進行他的最後一步操做,提交用戶的查詢到指定的CommandProcessor,並獲取結果。這一切都是在processLocalCmd中執行的。

processLocalCmd函數的主體是一個以下的循環:

do {
      try {
        needRetry = false;
        if (proc != null) {
          //若是CommandProcessor是Driver實例
          if (proc instanceof Driver) {
            Driver qp = (Driver) proc;
            //獲取標準輸出流,打印結果信息
            PrintStream out = ss.out;
            long start = System.currentTimeMillis();
            if (ss.getIsVerbose()) {
              out.println(cmd);
            }

            qp.setTryCount(tryCount);
            //driver實例運行用戶指令,獲取運行結果響應碼
            ret = qp.run(cmd).getResponseCode();
            if (ret != 0) {
              qp.close();
              return ret;
            }

            // 統計指令的運行時間
            long end = System.currentTimeMillis();
            double timeTaken = (end - start) / 1000.0;

            ArrayList<String> res = new ArrayList<String>();
             //打印查詢結果的列名稱
            printHeader(qp, out);

            // 打印查詢結果
            int counter = 0;
            try {
              if (out instanceof FetchConverter) {
                ((FetchConverter)out).fetchStarted();
              }
              while (qp.getResults(res)) {
                for (String r : res) {
                  out.println(r);
                }
                
                counter += res.size();
                res.clear();
                if (out.checkError()) {
                  break;
                }
              }
            } catch (IOException e) {
              console.printError("Failed with exception " + e.getClass().getName() + ":"
                  + e.getMessage(), "\n"
                  + org.apache.hadoop.util.StringUtils.stringifyException(e));
              ret = 1;
            }
            //關閉結果
            int cret = qp.close();
            if (ret == 0) {
              ret = cret;
            }

            if (out instanceof FetchConverter) {
              ((FetchConverter)out).fetchFinished();
            }

            console.printInfo("Time taken: " + timeTaken + " seconds" +
                (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
          } else {
            //若是proc不是Driver,也就是用戶執行的是非SQL查詢操做,直接執行語句,不自信FetchResult的操做
            String firstToken = tokenizeCmd(cmd.trim())[0];
            String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

            if (ss.getIsVerbose()) {
              ss.out.println(firstToken + " " + cmd_1);
            }
            CommandProcessorResponse res = proc.run(cmd_1);
            if (res.getResponseCode() != 0) {
              ss.out.println("Query returned non-zero code: " + res.getResponseCode() +
                  ", cause: " + res.getErrorMessage());
            }
            ret = res.getResponseCode();
          }
        }
      } catch (CommandNeedRetryException e) {
        //若是執行過程當中出現異常,修改needRetry標誌,下次循環是retry。
        console.printInfo("Retry query with a different approach...");
        tryCount++;
        needRetry = true;
      }
    } while (needRetry);

前面對函數中關鍵的執行語句已經給出了註釋,這裏單獨對printHeader進行一下說明。
printHeader函數經過調用driver.getSchema.getFiledSchema,獲取查詢結果的列集合 ,而後依次打印出列名。

private void printHeader(Driver qp, PrintStream out) {
    List<FieldSchema> fieldSchemas = qp.getSchema().getFieldSchemas();
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)
          && fieldSchemas != null) {
      // Print the column names
      boolean first_col = true;
      for (FieldSchema fs : fieldSchemas) {
        if (!first_col) {
          out.print('\t');
        }
        out.print(fs.getName());
        first_col = false;
      }
      out.println();
    }
  }

想更一進步的支持我,請掃描下方的二維碼,你懂的~
圖片描述

相關文章
相關標籤/搜索