接着上一篇來講執行入口的分析,CliDriver
最終將用戶指令command
提交給了Driver
的run
方法(針對經常使用查詢語句而言),在這裏用戶的command
將會被編譯,優化並生成MapReduce任務進行執行。因此Driver
也是Hive的核心,他扮演了一個將用戶查詢和MapReduce Task轉換並執行的角色,下面咱們就看看Hive是如何一步一步操做的。node
在說run
方法以前,因爲CliDriver
須要獲得一個Driver
類的實例,因此首先看一下Driver
的構造方法。Driver
有三個構造函數,主要功能也就是設置類的實例變量HiveConf
。SessionState
前文已經有介紹,SessionState
返回了當前會話的一些信息,提取配置文件,初始化Driver
實例。apache
public Driver() { if (SessionState.get() != null) { conf = SessionState.get().getConf(); } }
下面就開始解析Driver
內部對用戶命令command
的處理流程,首先是入口函數run
. run
函數經過調用runInternal
方法處理用戶指令,在處理完成runInternal
以後,若是執行過程當中出現出錯,還附加了對錯誤碼和錯誤信息的處理,此處省略。segmentfault
public CommandProcessorResponse run(String command) throws CommandNeedRetryException { return run(command, false); } public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); ... }
runInternal
方法包含的主要操做有,處理preRunHook
(具體功能能夠顧名思義哦),compile
, execute
, 處理postRunHook
以及構造CommandProcessorResponse
並返回。下面依次從代碼的角度分析這幾步的具體操做:併發
處理preRunHook
,首先根據配置文件和指令,構造用戶Hook執行的上下文hookContext
,而後讀取用戶PreRunHook
配置指定的類(字符串), 此配置項對應於Hive配置文件當中的「hive.exec.driver.run.hooks」
一項,利用反射機制Class.forName
實例化PreRunHook
類實例(getHook
函數完成),依次執行各鉤子的功能(preDriverRun
函數完成)。app
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command); // Get all the driver run hooks and pre-execute them. List<HiveDriverRunHook> driverRunHooks; try{ driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class); for (HiveDriverRunHook driverRunHook : driverRunHooks) { driverRunHook.preDriverRun(hookContext); } }catch (Exception e) { errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); SQLState = ErrorMsg.findSQLState(e.getMessage()); downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return createProcessorResponse(12); }
編譯,直接調用complieInternal
函數編譯用戶指令,將指令翻譯成MapReduce
任務。這一個過程涉及的內容比較多,也很重要,後面將單獨用一篇文章說明編譯優化的過程。這裏借用網上的一幅圖,幫助對compile
的功能有個總體的理解,參考文獻: Hive實現原理.pdf。
ide
在運行以前還有獲取鎖的操做,因爲新版本添加了ACID
事務的支持,還設置了事務管理器等,目前還沒詳細的弄懂這塊的處理邏輯和功能,先放一下,主要看下execute
函數執行了什麼操做,也就是如何根據編譯結果執行任務的。函數
首先是從編譯獲得的查詢計劃QueryPlan
裏獲取基本的查詢ID,查詢字串等信息,並在回話狀態中把當前查詢字串和查詢計劃插入到歷史記錄中。oop
String queryId = plan.getQueryId(); String queryStr = plan.getQueryStr(); if (SessionState.get() != null) { SessionState.get().getHiveHistory().startQuery(queryStr, conf.getVar(HiveConf.ConfVars.HIVEQUERYID)); SessionState.get().getHiveHistory().logPlanProgress(plan); }
與PreRunHook
相似,在執行任務以前,檢查並執行用戶設定的"hive.pre.exec.hooks"
,此處再也不詳述。完成這部操做以後,向控制檯簡單的打印一些信息以後,就開始正式執行任務了。源碼分析
DriverContextpost
建立執行上下文DriverContext,它記錄的信息主要包括可執行的任務隊列(Queue<Task> runnable), 正在運行的任務隊列(Queue<TaskRunner> running), 當前啓動的任務數curJobNo, statsTasks(Map<String, StatsTask>, what used for?)以及語義分析Semantic Analyzers依賴的Context對象等。
DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); public DriverContext(Context ctx) { this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>(); this.running = new LinkedBlockingQueue<TaskRunner>(); this.ctx = ctx; } public void prepare(QueryPlan plan) { // extract stats keys from StatsTask List<Task<?>> rootTasks = plan.getRootTasks(); NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() { public void apply(StatsTask statsTask) { statsTasks.put(statsTask.getWork().getAggKey(), statsTask); } }); }
順便提一下Context
對象,在Context
的源碼註釋當中提到, 每個查詢都要對應一個Context
對象,不一樣查詢之間Context
對象是不可重用的, 執行完一個查詢以後須要clear
對應的Context
對象(主要是語法分析用到的temp
文件目錄),在Hive的實現中也是這麼作的。回顧上一篇文章,從CliDriver
循環的讀取用戶指令,每讀取到一條指令都要進行processLine
,processCmd
,processLocalCmd
的處理,而後提交給Driver
編譯解析。Context
對象是在compile
函數中實例化的,也就說每一條查詢都會建立一個Context
對象,當執行完一條查詢從Driver
返回到processLocalCmd
中時,都會調用Driver
對象的close
函數對Context
進行清理(ctx.clear
),這樣就保證了一條查詢對應一個Context
對象。對於DriverContext
對象也是相似,在execute
函數中實例化,Driver
的close
函數中關閉(driverCtx.shutdown
),和Context
相比一個用來輔助語義分析,一個用來輔助任務執行。還有,咱們發如今processCmd
函數中經過CommandProcessorFactory
設置了Driver
類的實例對象,也就是每一條查詢都須要一個Driver
對象進行處理,那這些Driver
對象之間是否能夠共享呢?答案是確定的,在CommandProcessorFactory
中維持了一個HiveConf
到Driver
的Map,每次獲取Driver
對象時都是根據conf對象來查找到的,若是不存在才從新建立一個Driver
對象,而HiveConf
對象又是在CliDriver
的run
方法中實例化的,與一個CliSessionState
對應,因此Driver
實例應該是與一個Cli的會話對應,同一個會話內部的查詢共享一個Driver
實例。
Manage and run all tasks
扯得有點遠,繼續看Driver
對查詢任務的執行,在實例化DriverContext
對象以後,就將查詢計劃plan中的任務放入到DriverContext
的runnable
隊列中。
for (Task<? extends Serializable> tsk : plan.getRootTasks()) { assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); driverCxt.addToRunnable(tsk); }
下面就開始運行任務Task,整個任務的運行由一個循環控制,只要DriverContext
沒有被關閉,而且runnable
和running
隊列中還有任務就一直循環。爲了方便描述,下文將一次對任務循環過程的每一步進行說明,這裏只給出循環判斷條件。
while (!destroyed && driverCxt.isRunning()) {} public synchronized boolean isRunning() { return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); }
1. Put all the tasks into runnable queue
在循環內部,首先不停的從runnable
隊列中抽取隊首的任務,而後launch
該任務。
while (!destroyed && driverCxt.isRunning()) { // Launch upto maxthreads tasks Task<? extends Serializable> task; while ((task = driverCxt.getRunnable(maxthreads)) != null) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); if (!runner.isRunning()) { break; } }
2. Launch a task
在launch一個任務的過程當中,根據任務類型(是否是MapReduceTask或者ConditialTask),作一些操做(don't know what used for),將DriverContext
當前已啓動任務數curJobNo
加1,而後根據配置文件conf,查詢計劃plan,執行上下文cxt(DriverContext
),初始化一個任務,接着建立任務結果TaskResult
對象和任務執行對象TaskRunner
,將TaskRunner
放入DriverContext
的running
隊列中,表示該任務正在運行。最後,根據配置文件指定的任務運行模式,便是否支持並行運行,啓動任務。
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName, String jobname, int jobs, DriverContext cxt) throws HiveException { if (SessionState.get() != null) { SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); } if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) { if (noName) { conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")"); } conf.set("mapreduce.workflow.node.name", tsk.getId()); Utilities.setWorkflowAdjacencies(conf, plan); cxt.incCurJobNo(1); console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs); } tsk.initialize(conf, plan, cxt); TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); cxt.launching(tskRun); // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && (tsk.isMapRedTask() || (tsk instanceof MoveTask))) { // Launch it in the parallel mode, as a separate thread only for MR tasks //併發執行 if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); } tskRun.setOperationLog(OperationLog.getCurrentOperationLog()); tskRun.start(); } else { if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in serial mode"); } //順序執行 tskRun.runSequential(); } return tskRun; }
3. Poll a finished task
完成任務的啓動以後,將調用DriverContext
的pollFinished
函數,查看任務是否執行完畢,若是有任務完成,則將該任務出隊,並將已完成的任務添加到鉤子上下文HookContext
中。
TaskRunner tskRun = driverCxt.pollFinished(); if (tskRun == null) { continue; } hookContext.addCompleteTask(tskRun); public synchronized TaskRunner pollFinished() throws InterruptedException { while (!shutdown) { Iterator<TaskRunner> it = running.iterator(); while (it.hasNext()) { TaskRunner runner = it.next(); if (runner != null && !runner.isRunning()) { it.remove(); return runner; } } wait(SLEEP_TIME); } return null; }
4. Handle the finished task
針對一個已完成的任務,首先獲取任務的結果對象TaskResult
和退出狀態, 若是任務非正常退出,則第一步先判斷任務是否支持Retry
,若是支持,關閉當前DriverContext
,設置jobTracker
爲初始狀態,拋出CommandNeedRetry
異常,這個異常會在CliDriver
的processLocalCmd
中捕獲,而後嘗試從新處理該命令,參見上一篇文章的說明。若是任務不支持Retry
,則啓動備份任務backupTask
(相似於回滾?),並添加到runnable
隊列,在下次循環過程當中執行。若是沒有backupTask
,則查找用戶配置「hive.exec.failure.hooks」
,根據用戶配置相應出錯處理,並關閉DriverContext
, 返回退出碼。
Task<? extends Serializable> tsk = tskRun.getTask(); TaskResult result = tskRun.getTaskResult(); int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); throw new CommandNeedRetryException(); } Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); console.printError(errorMessage); // add backup task to runnable if (DriverContext.isLaunchable(backupTask)) { driverCxt.addToRunnable(backupTask); } continue; } else { hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK); // Get all the failure execution hooks and execute them. for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); ((ExecuteWithHookContext) ofh).run(hookContext); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); } setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); SQLState = "08S01"; console.printError(errorMessage); driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); return exitVal; } }
5. Find children tasks
最後調用DriverContext
的finished
函數,對完成的任務進行處理(處理邏輯沒看懂), 而後判斷當前任務是否包含子任務,若是包含則依次將子任務添加到runnable
隊列,下次循環中被啓動執行。
driverCxt.finished(tskRun); if (tsk.getChildTasks() != for (Task<? extends Serializable> child : tsk.getChildTasks()) { if (DriverContext.isLaunchable(child)) { driverCxt.addToRunnable(child); } } }
6. Do something before return
當全部的任務都完成以後,若是發現DriverContext
已經被關閉,代表任務取消,打印信息並返回對應的狀態碼。最後清楚任務執行中不完整的輸出,並加載執行用戶指定的"hive.exec.post.hooks"
,完成對應的鉤子功能。對於執行過程當中出現的異常,CommandNeedRetryException
將會直接向上拋出,其餘Exception
,直接打印出錯信息。不管是否發生異常,只要可以獲取到任務執行過程當中的MapReduce狀態信息,都將在finally語句塊中打印。(限於篇幅,此處只給出部分代碼,鉤子的處理方式前文已經給出再也不詳述,異常處理的部分,有興趣的執行查看)
//判斷DriverContext是否被關閉 if (driverCxt.isShutdown()) { SQLState = "HY008"; errorMessage = "FAILED: Operation cancelled"; console.printError(errorMessage); return 1000; } //刪除不完整的輸出 HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>(); for (WriteEntity output : plan.getOutputs()) { if (!output.isComplete()) { remOutputs.add(output); } } for (WriteEntity output : remOutputs) { plan.getOutputs().remove(output); }
最後的最後,若是全部的任務都正常執行完畢,這次查詢完成,plan.setDone(),打印OK~
還沒完~當execute
函數執行完成後,返回到runInternal
函數中,接着釋放鎖,與以前的PreRunHook
相對應,還須要加載相應用戶自定義的PostRunHook
(代碼再也不重複),最後才調用creatProcessorResponse
,建立響應對象CommandProcessorResponse
並返回。
private CommandProcessorResponse createProcessorResponse(int ret) { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); }
想更一進步的支持我,請掃描下方的二維碼,你懂的~