SQL -> AST (Abstract Syntax Tree) -> Task (MapRedTask, FetchTask) -> QueryPlan (a collection of Tasks) -> Job (YARN)
SQL parsing takes place in two places:
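The whole chain can be driven end to end through org.apache.hadoop.hive.ql.Driver. Below is a minimal sketch, assuming an initialized HiveConf/SessionState and the Hive 1.x/2.x Driver API (the table name src is only a placeholder; method names may differ in other versions):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class RunHiveSql {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    Driver driver = new Driver(conf);
    // run() internally calls compile() (SQL -> AST -> Task -> QueryPlan)
    // and then execute() (Task -> MR Job on YARN)
    int code = driver.run("SELECT count(1) FROM src").getResponseCode();
    System.out.println("exit code: " + code);
  }
}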
org.apache.hadoop.hive.ql.Driver
public int compile(String command, boolean resetTaskIds, boolean deferClose) {
  ...
  ParseDriver pd = new ParseDriver();
  ASTNode tree = pd.parse(command, ctx);
  tree = ParseUtils.findRootNonNullToken(tree);
  ...
  BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
  ...
  sem.analyze(tree, ctx);
  ...
  // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
  // them later.
  acidSinks = sem.getAcidFileSinks();

  LOG.info("Semantic Analysis Completed");

  // validate the plan
  sem.validate();
  acidInQuery = sem.hasAcidInQuery();
  perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

  if (isInterrupted()) {
    return handleInterruption("after analyzing query.");
  }

  // get the output schema
  schema = getSchema(sem, conf);
  plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN),
      queryId, queryState.getHiveOperation(), schema);
  ...
The compile step first has ParseDriver turn the SQL into an ASTNode, then has a BaseSemanticAnalyzer analyze that ASTNode, and finally passes the BaseSemanticAnalyzer into the QueryPlan constructor to build the QueryPlan;
org.apache.hadoop.hive.ql.parse.ParseDriver
public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream)
    throws ParseException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Parsing command: " + command);
  }

  HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
  TokenRewriteStream tokens = new TokenRewriteStream(lexer);
  if (ctx != null) {
    if (setTokenRewriteStream) {
      ctx.setTokenRewriteStream(tokens);
    }
    lexer.setHiveConf(ctx.getConf());
  }
  HiveParser parser = new HiveParser(tokens);
  if (ctx != null) {
    parser.setHiveConf(ctx.getConf());
  }
  parser.setTreeAdaptor(adaptor);
  HiveParser.statement_return r = null;
  try {
    r = parser.statement();
  } catch (RecognitionException e) {
    e.printStackTrace();
    throw new ParseException(parser.errors);
  }

  if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
    LOG.debug("Parse Completed");
  } else if (lexer.getErrors().size() != 0) {
    throw new ParseException(lexer.getErrors());
  } else {
    throw new ParseException(parser.errors);
  }

  ASTNode tree = (ASTNode) r.getTree();
  tree.setUnknownTokenBoundaries();
  return tree;
}
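As an aside, the parser can be exercised on its own. A minimal sketch, assuming the Hive 1.x/2.x parse API (ParseDriver.parse(String) and ASTNode.dump(); the query text is a placeholder):

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class ParseDemo {
  public static void main(String[] args) throws ParseException {
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse("SELECT key, count(1) FROM src GROUP BY key");
    // dump() prints the AST as nested tokens, e.g. (TOK_QUERY (TOK_FROM ...) (TOK_INSERT ...))
    System.out.println(tree.dump());
  }
}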
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
  initCtx(ctx);
  init(true);
  analyzeInternal(ast);
}
analyzeInternal is an abstract method implemented by different subclasses, for example DDLSemanticAnalyzer, SemanticAnalyzer, UpdateDeleteSemanticAnalyzer, and ExplainSemanticAnalyzer;
The main job of analyzeInternal is to convert the ASTNode into Tasks, including any applicable optimizations; the process is fairly involved, so the code is not reproduced here. A schematic sketch of how the analyzer subclass is chosen follows below;
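For orientation, the analyzer subclass is picked by SemanticAnalyzerFactory based on the root token of the AST. The snippet below is only a schematic paraphrase of that dispatch, not the real factory source; the constructor arguments are placeholders and vary across Hive versions:

import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ExplainSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class AnalyzerDispatchSketch {
  static BaseSemanticAnalyzer pick(QueryState queryState, ASTNode tree) throws SemanticException {
    switch (tree.getType()) {
      case HiveParser.TOK_EXPLAIN:
        return new ExplainSemanticAnalyzer(queryState);   // placeholder ctor args
      case HiveParser.TOK_ALTERTABLE:
        return new DDLSemanticAnalyzer(queryState);       // placeholder ctor args
      default:
        // SELECT/INSERT/CTAS etc. fall through to SemanticAnalyzer, whose analyzeInternal
        // builds the operator tree and compiles it into MapRedTask/FetchTask objects
        return new SemanticAnalyzer(queryState);          // placeholder ctor args
    }
  }
}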
org.apache.hadoop.hive.ql.QueryPlan
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
    HiveOperation operation, Schema resultSchema) {
  this.queryString = queryString;

  rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
  reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
  fetchTask = sem.getFetchTask();
  // Note that inputs and outputs can be changed when the query gets executed
  inputs = sem.getAllInputs();
  outputs = sem.getAllOutputs();
  linfo = sem.getLineageInfo();
  tableAccessInfo = sem.getTableAccessInfo();
  columnAccessInfo = sem.getColumnAccessInfo();
  idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());

  this.queryId = queryId == null ? makeQueryId() : queryId;
  query = new org.apache.hadoop.hive.ql.plan.api.Query();
  query.setQueryId(this.queryId);
  query.putToQueryAttributes("queryString", this.queryString);

  queryProperties = sem.getQueryProperties();
  queryStartTime = startTime;
  this.operation = operation;
  this.autoCommitValue = sem.getAutoCommitValue();
  this.resultSchema = resultSchema;
}
As you can see, the constructor simply copies fields out of the BaseSemanticAnalyzer; the most important ones are sem.getAllRootTasks and sem.getFetchTask;
org.apache.hadoop.hive.ql.Driver
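Both can be inspected by hand after compiling a statement. A hedged sketch, assuming the Hive 1.x/2.x Driver API (compile(), getPlan()) and an initialized SessionState as in the earlier sketch:

import java.io.Serializable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.session.SessionState;

public class PlanDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    Driver driver = new Driver(conf);
    driver.compile("SELECT key, count(1) FROM src GROUP BY key");
    QueryPlan plan = driver.getPlan();
    for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
      // typically a MapRedTask here (or TezTask/SparkTask on other execution engines)
      System.out.println(tsk.getClass().getSimpleName() + " id=" + tsk.getId());
    }
    System.out.println("fetch task: " + plan.getFetchTask());
  }
}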
public int execute(boolean deferClose) throws CommandNeedRetryException {
  ...
  // Add root Tasks to runnable
  for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
    // This should never happen, if it does, it's a bug with the potential to produce
    // incorrect results.
    assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
    driverCxt.addToRunnable(tsk);
  }
  ...
  // Loop while you either have tasks running, or tasks queued up
  while (driverCxt.isRunning()) {
    // Launch upto maxthreads tasks
    Task<? extends Serializable> task;
    while ((task = driverCxt.getRunnable(maxthreads)) != null) {
      TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
      if (!runner.isRunning()) {
        break;
      }
    }
    ...

private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
    String jobname, int jobs, DriverContext cxt) throws HiveException {
  ...
  TaskRunner tskRun = new TaskRunner(tsk, tskRes);
  ...
  tskRun.start();
  ...
  tskRun.runSequential();
  ...
Driver.run takes the Tasks out of the QueryPlan and launches them one by one; launchTask wraps each Task in a TaskRunner and ultimately calls TaskRunner.runSequential (when hive.exec.parallel is enabled, tskRun.start() is used instead and runSequential runs on the TaskRunner's own thread). Next, TaskRunner:
org.apache.hadoop.hive.ql.exec.TaskRunner
public void runSequential() {
  int exitVal = -101;
  try {
    exitVal = tsk.executeTask();
    ...

Here it calls Task.executeTask directly:
org.apache.hadoop.hive.ql.exec.Task
public int executeTask() {
  ...
  int retval = execute(driverContext);
  ...

Here execute is an abstract method implemented by subclasses such as DDLTask and MapRedTask; MapRedTask deserves the most attention, since most Tasks are MapRedTasks:
org.apache.hadoop.hive.ql.exec.mr.MapRedTask
public int execute(DriverContext driverContext) {
  ...
  if (!runningViaChild) {
    // we are not running this mapred task via child jvm
    // so directly invoke ExecDriver
    return super.execute(driverContext);
  }
  ...

Here it calls straight into the parent class, i.e. ExecDriver.execute (the runningViaChild flag, driven by hive.exec.submitviachild, decides whether the MR task is instead submitted from a child JVM). Let's look at it:
org.apache.hadoop.hive.ql.exec.mr.ExecDriver
protected transient JobConf job;
...
public int execute(DriverContext driverContext) {
  ...
  JobClient jc = null;

  MapWork mWork = work.getMapWork();
  ReduceWork rWork = work.getReduceWork();
  ...
  if (mWork.getNumMapTasks() != null) {
    job.setNumMapTasks(mWork.getNumMapTasks().intValue());
  }
  ...
  job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
  job.setReducerClass(ExecReducer.class);
  ...
  jc = new JobClient(job);
  ...
  rj = jc.submitJob(job);
  this.jobID = rj.getJobID();
  ...

Here the Task is turned into an MR Job and submitted to YARN for execution;
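For context, the rj handle returned by jc.submitJob is a plain org.apache.hadoop.mapred.RunningJob, so job progress can be polled with the standard mapred client API. The sketch below is not Hive code, just an illustration of what that handle can report (Hive wraps this monitoring in its own helper):

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class JobPollDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();        // in ExecDriver this is fully populated beforehand
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);  // assumes the JobConf actually describes a valid job
    while (!rj.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          rj.mapProgress() * 100, rj.reduceProgress() * 100);
      Thread.sleep(2000);
    }
    System.out.println("job " + rj.getID() + " successful: " + rj.isSuccessful());
  }
}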
The other place where SQL gets parsed is EXPLAIN: ExplainSemanticAnalyzer turns the ASTNode into an ExplainTask:
org.apache.hadoop.hive.ql.parse.ExplainSemanticAnalyzer
public void analyzeInternal(ASTNode ast) throws SemanticException {
  ...
  ctx.setExplain(true);
  ctx.setExplainLogical(logical);

  // Create a semantic analyzer for the query
  ASTNode input = (ASTNode) ast.getChild(0);
  BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
  sem.analyze(input, ctx);
  sem.validate();

  ctx.setResFile(ctx.getLocalTmpPath());
  List<Task<? extends Serializable>> tasks = sem.getAllRootTasks();
  if (tasks == null) {
    tasks = Collections.emptyList();
  }

  FetchTask fetchTask = sem.getFetchTask();
  if (fetchTask != null) {
    // Initialize fetch work such that operator tree will be constructed.
    fetchTask.getWork().initializeForFetch(ctx.getOpContext());
  }

  ParseContext pCtx = null;
  if (sem instanceof SemanticAnalyzer) {
    pCtx = ((SemanticAnalyzer) sem).getParseContext();
  }

  boolean userLevelExplain = !extended
      && !formatted
      && !dependency
      && !logical
      && !authorize
      && (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_EXPLAIN_USER)
          && HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
  ExplainWork work = new ExplainWork(ctx.getResFile(), pCtx, tasks, fetchTask, sem, extended,
      formatted, dependency, logical, authorize, userLevelExplain, ctx.getCboInfo());

  work.setAppendTaskType(
      HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));

  ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);

  fieldList = explTask.getResultSchema();
  rootTasks.add(explTask);
}
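To see the resulting ExplainTask at work, an EXPLAIN statement can be pushed through the same Driver path and its output read back. A hedged sketch, again assuming the Hive 1.x/2.x API (Driver.run(), Driver.getResults(List)) and an initialized SessionState:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class ExplainDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);
    Driver driver = new Driver(conf);
    // the only root task in the plan is an ExplainTask, which writes the plan text to resFile
    driver.run("EXPLAIN SELECT key, count(1) FROM src GROUP BY key");
    List<String> lines = new ArrayList<String>();
    while (driver.getResults(lines)) {  // drains the explain output in batches
      for (String line : lines) {
        System.out.println(line);
      }
      lines.clear();
    }
  }
}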