Hive Driver源碼執行流程分析

引言

接着上一篇來講執行入口的分析CliDriver最終將用戶指令command提交給了Driverrun方法(針對經常使用查詢語句而言),在這裏用戶的command將會被編譯,優化並生成MapReduce任務進行執行。因此Driver也是Hive的核心,他扮演了一個將用戶查詢和MapReduce Task轉換並執行的角色,下面咱們就看看Hive是如何一步一步操做的。node

源碼分析

在說run方法以前,因爲CliDriver須要獲得一個Driver類的實例,因此首先看一下Driver的構造方法。Driver有三個構造函數,主要功能也就是設置類的實例變量HiveConfSessionState前文已經有介紹,SessionState返回了當前會話的一些信息,提取配置文件,初始化Driver實例。apache

public Driver() {
    if (SessionState.get() != null) {
      conf = SessionState.get().getConf();
    }
}

run

下面就開始解析Driver內部對用戶命令command的處理流程,首先是入口函數run. run函數經過調用runInternal方法處理用戶指令,在處理完成runInternal以後,若是執行過程當中出現出錯,還附加了對錯誤碼和錯誤信息的處理,此處省略。segmentfault

public CommandProcessorResponse run(String command)
      throws CommandNeedRetryException {
    return run(command, false);
}

public CommandProcessorResponse run(String command, boolean alreadyCompiled)
        throws CommandNeedRetryException {
    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
     ...
}

runInternal

runInternal方法包含的主要操做有,處理preRunHook(具體功能能夠顧名思義哦),compileexecute, 處理postRunHook以及構造CommandProcessorResponse並返回。下面依次從代碼的角度分析這幾步的具體操做:併發

PreRunHook

處理preRunHook,首先根據配置文件和指令,構造用戶Hook執行的上下文hookContext,而後讀取用戶PreRunHook配置指定的類(字符串), 此配置項對應於Hive配置文件當中的「hive.exec.driver.run.hooks」一項,利用反射機制Class.forName實例化PreRunHook類實例(getHook函數完成),依次執行各鉤子的功能(preDriverRun函數完成)。app

HiveDriverRunHookContext hookContext 
    = new HiveDriverRunHookContextImpl(conf, command);
    // Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try{
      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
      HiveDriverRunHook.class);
      for (HiveDriverRunHook driverRunHook : driverRunHooks) {
          driverRunHook.preDriverRun(hookContext);
      }
}catch (Exception e) {
  errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
  SQLState = ErrorMsg.findSQLState(e.getMessage());
  downstreamError = e;
  console.printError(errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
  return createProcessorResponse(12);
}

compile

編譯,直接調用complieInternal函數編譯用戶指令,將指令翻譯成MapReduce任務。這一個過程涉及的內容比較多,也很重要,後面將單獨用一篇文章說明編譯優化的過程。這裏借用網上的一幅圖,幫助對compile的功能有個總體的理解,參考文獻: Hive實現原理.pdf。
編譯流程ide

execute

在運行以前還有獲取鎖的操做,因爲新版本添加了ACID事務的支持,還設置了事務管理器等,目前還沒詳細的弄懂這塊的處理邏輯和功能,先放一下,主要看下execute函數執行了什麼操做,也就是如何根據編譯結果執行任務的。函數

首先是從編譯獲得的查詢計劃QueryPlan裏獲取基本的查詢ID,查詢字串等信息,並在回話狀態中把當前查詢字串和查詢計劃插入到歷史記錄中。oop

String queryId = plan.getQueryId();
String queryStr = plan.getQueryStr();

if (SessionState.get() != null) {
   SessionState.get().getHiveHistory().startQuery(queryStr,
       conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
   SessionState.get().getHiveHistory().logPlanProgress(plan);
}

PreRunHook相似,在執行任務以前,檢查並執行用戶設定的"hive.pre.exec.hooks",此處再也不詳述。完成這部操做以後,向控制檯簡單的打印一些信息以後,就開始正式執行任務了。源碼分析

  • DriverContextpost

建立執行上下文DriverContext,它記錄的信息主要包括可執行的任務隊列(Queue<Task> runnable), 正在運行的任務隊列(Queue<TaskRunner> running), 當前啓動的任務數curJobNo, statsTasks(Map<String, StatsTask>, what used for?)以及語義分析Semantic Analyzers依賴的Context對象等。

DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan);

public DriverContext(Context ctx) {
    this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
    this.running = new LinkedBlockingQueue<TaskRunner>();
    this.ctx = ctx;
 }

public void prepare(QueryPlan plan) {
    // extract stats keys from StatsTask
    List<Task<?>> rootTasks = plan.getRootTasks();
    NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>()  
    {
      public void apply(StatsTask statsTask) {
        statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
      }
    });
 }

順便提一下Context對象,在Context的源碼註釋當中提到, 每個查詢都要對應一個Context對象,不一樣查詢之間Context對象是不可重用的, 執行完一個查詢以後須要clear對應的Context對象(主要是語法分析用到的temp文件目錄),在Hive的實現中也是這麼作的。回顧上一篇文章,從CliDriver循環的讀取用戶指令,每讀取到一條指令都要進行processLineprocessCmdprocessLocalCmd的處理,而後提交給Driver編譯解析。Context對象是在compile函數中實例化的,也就說每一條查詢都會建立一個Context對象,當執行完一條查詢從Driver返回到processLocalCmd中時,都會調用Driver對象的close函數對Context進行清理(ctx.clear),這樣就保證了一條查詢對應一個Context對象。對於DriverContext對象也是相似,在execute函數中實例化,Driverclose函數中關閉(driverCtx.shutdown),和Context相比一個用來輔助語義分析,一個用來輔助任務執行。還有,咱們發如今processCmd函數中經過CommandProcessorFactory設置了Driver類的實例對象,也就是每一條查詢都須要一個Driver對象進行處理,那這些Driver對象之間是否能夠共享呢?答案是確定的,在CommandProcessorFactory中維持了一個HiveConfDriver的Map,每次獲取Driver對象時都是根據conf對象來查找到的,若是不存在才從新建立一個Driver對象,而HiveConf對象又是在CliDriverrun方法中實例化的,與一個CliSessionState對應,因此Driver實例應該是與一個Cli的會話對應,同一個會話內部的查詢共享一個Driver實例。

  • Manage and run all tasks

扯得有點遠,繼續看Driver對查詢任務的執行,在實例化DriverContext對象以後,就將查詢計劃plan中的任務放入到DriverContextrunnable隊列中。

for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
    assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
    driverCxt.addToRunnable(tsk);
}

下面就開始運行任務Task,整個任務的運行由一個循環控制,只要DriverContext沒有被關閉,而且runnablerunning隊列中還有任務就一直循環。爲了方便描述,下文將一次對任務循環過程的每一步進行說明,這裏只給出循環判斷條件。

while (!destroyed && driverCxt.isRunning()) {}

public synchronized boolean isRunning() {
    return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
}

1. Put all the tasks into runnable queue

在循環內部,首先不停的從runnable隊列中抽取隊首的任務,而後launch該任務。

while (!destroyed && driverCxt.isRunning()) {
    // Launch upto maxthreads tasks
    Task<? extends Serializable> task;
    while ((task = driverCxt.getRunnable(maxthreads)) != null) {
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
        TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
        if (!runner.isRunning()) {
            break;
       }
 }

2. Launch a task

在launch一個任務的過程當中,根據任務類型(是否是MapReduceTask或者ConditialTask),作一些操做(don't know what used for),將DriverContext當前已啓動任務數curJobNo加1,而後根據配置文件conf,查詢計劃plan,執行上下文cxt(DriverContext),初始化一個任務,接着建立任務結果TaskResult對象和任務執行對象TaskRunner,將TaskRunner放入DriverContextrunning隊列中,表示該任務正在運行。最後,根據配置文件指定的任務運行模式,便是否支持並行運行,啓動任務。

private TaskRunner launchTask(Task<? extends Serializable> tsk, 
    String queryId, boolean noName,
    String jobname, int jobs, DriverContext cxt) throws HiveException {
    
    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().startTask(queryId, tsk, 
                         tsk.getClass().getName());
    }
    
    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
      if (noName) {
        conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + 
        tsk.getId() + ")");
      }
      conf.set("mapreduce.workflow.node.name", tsk.getId());
      Utilities.setWorkflowAdjacencies(conf, plan);
      cxt.incCurJobNo(1);
      console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
    }
    
    tsk.initialize(conf, plan, cxt);
    TaskResult tskRes = new TaskResult();
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);

    cxt.launching(tskRun);
    // Launch Task
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL)
        && (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
      // Launch it in the parallel mode, as a separate thread only for MR tasks
     //併發執行
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in parallel");
      }
      tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
      tskRun.start();
    } else {
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in serial mode");
      }
     //順序執行
      tskRun.runSequential();
    }
    return tskRun;
  }

3. Poll a finished task

完成任務的啓動以後,將調用DriverContextpollFinished函數,查看任務是否執行完畢,若是有任務完成,則將該任務出隊,並將已完成的任務添加到鉤子上下文HookContext中。

TaskRunner tskRun = driverCxt.pollFinished();
        if (tskRun == null) {
          continue;
}

hookContext.addCompleteTask(tskRun);

public synchronized TaskRunner pollFinished() throws InterruptedException {
    while (!shutdown) {
      Iterator<TaskRunner> it = running.iterator();
      while (it.hasNext()) {
        TaskRunner runner = it.next();
        if (runner != null && !runner.isRunning()) {
          it.remove();
          return runner;
        }
      }
      wait(SLEEP_TIME);
    }
    return null;
  }

4. Handle the finished task

針對一個已完成的任務,首先獲取任務的結果對象TaskResult和退出狀態, 若是任務非正常退出,則第一步先判斷任務是否支持Retry,若是支持,關閉當前DriverContext,設置jobTracker爲初始狀態,拋出CommandNeedRetry異常,這個異常會在CliDriverprocessLocalCmd中捕獲,而後嘗試從新處理該命令,參見上一篇文章的說明。若是任務不支持Retry,則啓動備份任務backupTask(相似於回滾?),並添加到runnable隊列,在下次循環過程當中執行。若是沒有backupTask,則查找用戶配置「hive.exec.failure.hooks」,根據用戶配置相應出錯處理,並關閉DriverContext, 返回退出碼。

Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();

int exitVal = result.getExitVal();
   if (exitVal != 0) {
       if (tsk.ifRetryCmdWhenFail()) {
          driverCxt.shutdown();
          // in case we decided to run everything in local mode, restore the
          // the jobtracker setting to its initial value
           ctx.restoreOriginalTracker();
           throw new CommandNeedRetryException();
       }
       Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
       if (backupTask != null) {
           setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
           console.printError(errorMessage);
           errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
           console.printError(errorMessage);

               // add backup task to runnable
           if (DriverContext.isLaunchable(backupTask)) {
              driverCxt.addToRunnable(backupTask);
           }
           
           continue;
           
       } else {
            hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
            // Get all the failure execution hooks and execute them.
            for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
              perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());

              ((ExecuteWithHookContext) ofh).run(hookContext);

              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
            }
            setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
            SQLState = "08S01";
            console.printError(errorMessage);
            driverCxt.shutdown();
            // in case we decided to run everything in local mode, restore the
            // the jobtracker setting to its initial value
            ctx.restoreOriginalTracker();
            return exitVal;
          }
  }

5. Find children tasks

最後調用DriverContextfinished函數,對完成的任務進行處理(處理邏輯沒看懂), 而後判斷當前任務是否包含子任務,若是包含則依次將子任務添加到runnable隊列,下次循環中被啓動執行。

driverCxt.finished(tskRun);

if (tsk.getChildTasks() != 
    for (Task<? extends Serializable> child : tsk.getChildTasks()) {
        if (DriverContext.isLaunchable(child)) {
           driverCxt.addToRunnable(child);
        }
    }
}

6. Do something before return

當全部的任務都完成以後,若是發現DriverContext已經被關閉,代表任務取消,打印信息並返回對應的狀態碼。最後清楚任務執行中不完整的輸出,並加載執行用戶指定的"hive.exec.post.hooks",完成對應的鉤子功能。對於執行過程當中出現的異常,CommandNeedRetryException將會直接向上拋出,其餘Exception,直接打印出錯信息。不管是否發生異常,只要可以獲取到任務執行過程當中的MapReduce狀態信息,都將在finally語句塊中打印。(限於篇幅,此處只給出部分代碼,鉤子的處理方式前文已經給出再也不詳述,異常處理的部分,有興趣的執行查看)

//判斷DriverContext是否被關閉
if (driverCxt.isShutdown()) {
        SQLState = "HY008";
        errorMessage = "FAILED: Operation cancelled";
        console.printError(errorMessage);
        return 1000;
}

//刪除不完整的輸出
HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>();
      for (WriteEntity output : plan.getOutputs()) {
        if (!output.isComplete()) {
          remOutputs.add(output);
        }
 }

 for (WriteEntity output : remOutputs) {
     plan.getOutputs().remove(output);
 }

最後的最後,若是全部的任務都正常執行完畢,這次查詢完成,plan.setDone(),打印OK~

PostRunHook and return

還沒完~當execute函數執行完成後,返回到runInternal函數中,接着釋放鎖,與以前的PreRunHook相對應,還須要加載相應用戶自定義的PostRunHook(代碼再也不重複),最後才調用creatProcessorResponse,建立響應對象CommandProcessorResponse並返回。

private CommandProcessorResponse createProcessorResponse(int ret) {
    return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
 }

想更一進步的支持我,請掃描下方的二維碼,你懂的~
圖片描述

相關文章
相關標籤/搜索