[Original] Big Data Fundamentals: Hive (1) The Code Flow of Hive SQL Execution

hive 2.1

 

Hive executes SQL in two ways:

  • by running the hive command, which further breaks down into hive -e, hive -f, and interactive hive (examples below);
  • by running the beeline command, which connects to a remote thrift server;
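
For illustration, typical invocations of these entry points look like the following (the statement, file path, host, and port are placeholders):

$HIVE_HOME/bin/hive -e "select 1"
$HIVE_HOME/bin/hive -f /path/to/query.sql
$HIVE_HOME/bin/hive
$HIVE_HOME/bin/beeline -u jdbc:hive2://localhost:10000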

Let's look at how SQL gets executed in each of these scenarios:

1 The hive command

Startup command

The command to start the hive client:

$HIVE_HOME/bin/hive

is equivalent to

$HIVE_HOME/bin/hive --service cli

which invokes

$HIVE_HOME/bin/ext/cli.sh

The actual startup class is org.apache.hadoop.hive.cli.CliDriver.

Code walkthrough

org.apache.hadoop.hive.cli.CliDriver

  public static void main(String[] args) throws Exception {
    int ret = new CliDriver().run(args);
    System.exit(ret);
  }

  public  int run(String[] args) throws Exception {
...
    // execute cli driver work
    try {
      return executeDriver(ss, conf, oproc);
    } finally {
      ss.resetThreadName();
      ss.close();
    }
...

  private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
      throws Exception {
...
    if (ss.execString != null) {
      int cmdProcessStatus = cli.processLine(ss.execString);
      return cmdProcessStatus;
    }
...
    try {
      if (ss.fileName != null) {
        return cli.processFile(ss.fileName);
      }
    } catch (FileNotFoundException e) {
      System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
      return 3;
    }
...
    while ((line = reader.readLine(curPrompt + "> ")) != null) {
      if (!prefix.equals("")) {
        prefix += '\n';
      }
      if (line.trim().startsWith("--")) {
        continue;
      }
      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
        line = prefix + line;
        ret = cli.processLine(line, true);
...

  public int processFile(String fileName) throws IOException {
...
      rc = processReader(bufferReader);
...

  public int processReader(BufferedReader r) throws IOException {
    String line;
    StringBuilder qsb = new StringBuilder();

    while ((line = r.readLine()) != null) {
      // Skipping through comments
      if (! line.startsWith("--")) {
        qsb.append(line + "\n");
      }
    }

    return (processLine(qsb.toString()));
  }
  
  public int processLine(String line, boolean allowInterrupting) {
...
        ret = processCmd(command);
...

  public int processCmd(String cmd) {
...
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
        ret = processLocalCmd(cmd, proc, ss);
...

  int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
    int tryCount = 0;
    boolean needRetry;
    int ret = 0;

    do {
      try {
        needRetry = false;
        if (proc != null) {
          if (proc instanceof Driver) {
            Driver qp = (Driver) proc;
            PrintStream out = ss.out;
            long start = System.currentTimeMillis();
            if (ss.getIsVerbose()) {
              out.println(cmd);
            }

            qp.setTryCount(tryCount);
            ret = qp.run(cmd).getResponseCode();
...
              while (qp.getResults(res)) {
                for (String r : res) {
                  out.println(r);
                }
...

CliDriver.main calls run, and run calls executeDriver; executeDriver handles the three cases mentioned above:

  • hive -e, executing a SQL string: ss.execString is non-null, and the process exits when execution finishes;
  • hive -f, executing a SQL file: ss.fileName is non-null, and the process exits when execution finishes;
  • interactive hive: the loop keeps reading lines via reader.readLine, then executes each complete statement and prints its results;

All three cases eventually reach processLine; processLine calls processCmd, which calls processLocalCmd. processLocalCmd first calls Driver.run to execute the SQL, then calls Driver.getResults to print the result rows. These are Driver's two most important interfaces; the Driver implementation is examined later.
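
To make the flow concrete, here is a minimal embedded sketch (not from the article's code; the statement is a placeholder and a working Hive configuration is assumed on the classpath) that issues the same two calls processLocalCmd makes, Driver.run followed by Driver.getResults:

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.Driver;
  import org.apache.hadoop.hive.ql.session.SessionState;

  public class EmbeddedDriverSketch {
    public static void main(String[] args) throws Exception {
      HiveConf conf = new HiveConf();
      // CliDriver sets up a session state the same way before using Driver
      SessionState.start(new SessionState(conf));

      Driver driver = new Driver(conf);
      int ret = driver.run("select 1").getResponseCode(); // compile + execute
      if (ret == 0) {
        List<String> res = new ArrayList<String>();
        while (driver.getResults(res)) { // fetch one batch of printable rows at a time
          for (String row : res) {
            System.out.println(row);
          }
          res.clear();
        }
      }
      driver.close();
    }
  }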

2 The beeline command

beeline needs to connect to a hive thrift server (HiveServer2). A beeline session is essentially a JDBC client speaking thrift; a client-side sketch follows, after which we look at how the thrift server starts.
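
As a minimal sketch of the client side (host, port, and statement are placeholders; hive-jdbc is assumed on the classpath), the server path analyzed below is triggered by ordinary JDBC calls:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class BeelineLikeClient {
    public static void main(String[] args) throws Exception {
      Class.forName("org.apache.hive.jdbc.HiveDriver"); // same driver beeline uses
      try (Connection conn =
               DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
           Statement stmt = conn.createStatement();
           // executeQuery reaches CLIService.executeStatement on the server...
           ResultSet rs = stmt.executeQuery("select 1")) {
        // ...and each batch of rows is pulled through CLIService.fetchResults
        while (rs.next()) {
          System.out.println(rs.getString(1));
        }
      }
    }
  }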

hive thrift server

Startup command

The command to start the hive thrift server:

$HIVE_HOME/bin/hiveserver2

is equivalent to

$HIVE_HOME/bin/hive --service hiveserver2

which invokes

$HIVE_HOME/bin/ext/hiveserver2.sh

The actual startup class is org.apache.hive.service.server.HiveServer2.

Startup process

HiveServer2.main
  startHiveServer2
    init
      addService: CLIService, ThriftBinaryCLIService
    start
      Service.start
        CLIService.start
        ThriftBinaryCLIService.start
          TThreadPoolServer.serve

Class structure [interface or parent class -> subclass]:

TServer -> TThreadPoolServer
  TProcessorFactory -> SQLPlainProcessorFactory
    TProcessor -> TSetIpAddressProcessor
      ThriftCLIService -> ThriftBinaryCLIService
        CLIService
          HiveSession

Code walkthrough

org.apache.hive.service.cli.thrift.ThriftBinaryCLIService

  public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) {
    super(cliService, ThriftBinaryCLIService.class.getSimpleName());
    this.oomHook = oomHook;
  }

ThriftBinaryCLIService is a core class: it is where the thrift server is actually started, and it wraps a CLIService to which every request is ultimately delegated (a sketch of the bootstrap follows).
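
Roughly, the server bootstrap inside ThriftBinaryCLIService.run looks like the sketch below (the thread-pool sizes are placeholders where the real code reads them from HiveConf, and the processor parameter stands in for Hive's TSetIpAddressProcessor wrapping the CLIService):

  import org.apache.thrift.TProcessor;
  import org.apache.thrift.server.TThreadPoolServer;
  import org.apache.thrift.transport.TServerSocket;
  import org.apache.thrift.transport.TTransportException;

  public class ThriftServerSketch {
    static void serve(TProcessor processor) throws TTransportException {
      // 10000 is HiveServer2's default binary port
      TServerSocket transport = new TServerSocket(10000);
      TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
          .processor(processor)
          .minWorkerThreads(5)
          .maxWorkerThreads(500);
      new TThreadPoolServer(args).serve(); // blocks; each connection runs on a pooled thread
    }
  }

Next, the CLIService code: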

org.apache.hive.service.cli.CLIService

  @Override
  public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay) throws HiveSQLException {
    OperationHandle opHandle =
        sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay);
    LOG.debug(sessionHandle + ": executeStatement()");
    return opHandle;
  }
  
  @Override
  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
                             long maxRows, FetchType fetchType) throws HiveSQLException {
    RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
        .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
    LOG.debug(opHandle + ": fetchResults()");
    return rowSet;
  }

CLIService's two most important interfaces are executeStatement and fetchResults; both are forwarded to the HiveSession for handling. Next, the HiveSession implementation class:

org.apache.hive.service.cli.session.HiveSessionImpl

  @Override
  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, false, 0);
  }

  private OperationHandle executeStatementInternal(String statement,
      Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
    acquire(true, true);

    ExecuteStatementOperation operation = null;
    OperationHandle opHandle = null;
    try {
      operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
          confOverlay, runAsync, queryTimeout);
      opHandle = operation.getHandle();
      operation.run();
...
  @Override
  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
      long maxRows, FetchType fetchType) throws HiveSQLException {
    acquire(true, false);
    try {
      if (fetchType == FetchType.QUERY_OUTPUT) {
        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
      }
      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
    } finally {
      release(true, false);
    }
  }

As we can see:

  • HiveSessionImpl.executeStatement calls ExecuteStatementOperation.run (ExecuteStatementOperation is a kind of Operation);
  • HiveSessionImpl.fetchResults calls OperationManager.getOperationNextRowSet, which in turn calls Operation.getNextRowSet;

org.apache.hive.service.cli.operation.OperationManager

  public RowSet getOperationNextRowSet(OperationHandle opHandle,
      FetchOrientation orientation, long maxRows)
          throws HiveSQLException {
    return getOperation(opHandle).getNextRowSet(orientation, maxRows);
  }

 

Now let's look at Operation's run and getNextRowSet in detail:

org.apache.hive.service.cli.operation.Operation

  public void run() throws HiveSQLException {
    beforeRun();
    try {
      Metrics metrics = MetricsFactory.getInstance();
      if (metrics != null) {
        try {
          metrics.incrementCounter(MetricsConstant.OPEN_OPERATIONS);
        } catch (Exception e) {
          LOG.warn("Error Reporting open operation to Metrics system", e);
        }
      }
      runInternal();
    } finally {
      afterRun();
    }
  }
  
  public RowSet getNextRowSet() throws HiveSQLException {
    return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
  }

Operation is an abstract class:

  • run calls the abstract method runInternal;
  • the no-argument getNextRowSet calls the abstract two-argument getNextRowSet;

Below we will see how these two abstract methods are implemented in subclasses, ultimately relying on Driver's run and getResults.

 

1) First, runInternal as implemented in the subclass HiveCommandOperation:

org.apache.hive.service.cli.operation.HiveCommandOperation

  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.RUNNING);
    try {
      String command = getStatement().trim();
      String[] tokens = statement.split("\\s");
      String commandArgs = command.substring(tokens[0].length()).trim();

      CommandProcessorResponse response = commandProcessor.run(commandArgs);
...

This calls CommandProcessor.run. Note that HiveCommandOperation handles non-SQL commands such as set and dfs, whose processors are lightweight; for plain SQL statements the operation created is a SQLOperation, whose runInternal drives Driver.run (Driver is the CommandProcessor implementation for SQL).

 

2) Next, getNextRowSet as implemented in the subclass SQLOperation:

org.apache.hive.service.cli.operation.SQLOperation

  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
    throws HiveSQLException {
...
      driver.setMaxRows((int) maxRows);
      if (driver.getResults(convey)) {
        return decode(convey, rowSet);
      }
...

This calls Driver.getResults.

3 Driver

The code analysis above shows that whether SQL is executed from the hive command line or through beeline connected to the thrift server, everything ultimately depends on Driver.

Driver's two core interfaces are:

  • run
  • getResults

Code walkthrough

org.apache.hadoop.hive.ql.Driver

  @Override
  public CommandProcessorResponse run(String command)
      throws CommandNeedRetryException {
    return run(command, false);
  }
  
  public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
...
  private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
...
        ret = compileInternal(command, true);
...
      ret = execute(true);
...
  private int compileInternal(String command, boolean deferClose) {
...
      ret = compile(command, true, deferClose);
...
  public int compile(String command, boolean resetTaskIds, boolean deferClose) {
...
      plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
        queryState.getHiveOperation(), schema);
...
  public int execute(boolean deferClose) throws CommandNeedRetryException {
...
      // Add root Tasks to runnable
      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
        // This should never happen, if it does, it's a bug with the potential to produce
        // incorrect results.
        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
        driverCxt.addToRunnable(tsk);
      }
...
      // Loop while you either have tasks running, or tasks queued up
      while (driverCxt.isRunning()) {

        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }

        // poll the Tasks to see which one completed
        TaskRunner tskRun = driverCxt.pollFinished();
        if (tskRun == null) {
          continue;
        }
        hookContext.addCompleteTask(tskRun);
        queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());

        Task<? extends Serializable> tsk = tskRun.getTask();
        TaskResult result = tskRun.getTaskResult();
...
        if (tsk.getChildTasks() != null) {
          for (Task<? extends Serializable> child : tsk.getChildTasks()) {
            if (DriverContext.isLaunchable(child)) {
              driverCxt.addToRunnable(child);
            }
          }
        }
      }

  public boolean getResults(List res) throws IOException, CommandNeedRetryException {
    if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) {
      throw new IOException("FAILED: query has been cancelled, closed, or destroyed.");
    }

    if (isFetchingTable()) {
      /**
       * If resultset serialization to thrift object is enabled, and if the destination table is
       * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file,
       * since it is a blob of row batches.
       */
      if (fetchTask.getWork().isUsingThriftJDBCBinarySerDe()) {
        maxRows = 1;
      }
      fetchTask.setMaxRows(maxRows);
      return fetchTask.fetch(res);
    }
...
  • Driver.run calls runInternal, which first calls compileInternal to compile the SQL and build a QueryPlan, and then calls execute to run all the tasks in the QueryPlan (a simplified sketch of the scheduling loop follows);
  • Driver.getResults calls FetchTask.fetch to retrieve the results.
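
The execute loop above is a small DAG scheduler: launch runnable tasks, poll for completions, and release children once all of their parents have finished. A toy illustration of just that pattern (these classes are stand-ins, not Hive's DriverContext/TaskRunner):

  import java.util.ArrayDeque;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Queue;

  public class MiniDagScheduler {
    static class Task {
      final String name;
      final List<Task> children;
      int pendingParents; // counterpart of the parent check in DriverContext.isLaunchable
      Task(String name, Task... children) {
        this.name = name;
        this.children = Arrays.asList(children);
        for (Task c : this.children) {
          c.pendingParents++;
        }
      }
    }

    static void run(List<Task> rootTasks) {
      Queue<Task> runnable = new ArrayDeque<>(rootTasks); // addToRunnable for root tasks
      while (!runnable.isEmpty()) { // driverCxt.isRunning, simplified
        Task t = runnable.poll();
        System.out.println("running " + t.name); // launchTask + pollFinished, collapsed
        for (Task child : t.children) {
          if (--child.pendingParents == 0) { // all parents done: child is launchable
            runnable.add(child);
          }
        }
      }
    }

    public static void main(String[] args) {
      Task fetch = new Task("fetch");
      run(Arrays.asList(new Task("mapreduce-1", fetch), new Task("mapreduce-2", fetch)));
    }
  }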

For details on the Hive SQL parsing process, see: http://www.javashuo.com/article/p-oeaeytzt-ea.html
