[Original] Big Data Fundamentals: Hive (2) Hive SQL Execution: the SQL Parsing Process

Hive SQL Parsing Process

SQL -> AST (Abstract Syntax Tree) -> Task (MapRedTask, FetchTask) -> QueryPlan (a set of Tasks) -> Job (YARN)

SQL parsing happens in two places:

  • One is compile, before the SQL is executed, specifically in Driver.compile, in order to build the QueryPlan;
  • The other is explain, specifically in ExplainSemanticAnalyzer.analyzeInternal, in order to build an ExplainTask;

SQL Execution Process

1 The compile phase (SQL -> AST (Abstract Syntax Tree) -> QueryPlan)

org.apache.hadoop.hive.ql.Driver

  public int compile(String command, boolean resetTaskIds, boolean deferClose) {
...
      ParseDriver pd = new ParseDriver();
      ASTNode tree = pd.parse(command, ctx);
      tree = ParseUtils.findRootNonNullToken(tree);
...
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
...
        sem.analyze(tree, ctx);
...
      // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
      // them later.
      acidSinks = sem.getAcidFileSinks();

      LOG.info("Semantic Analysis Completed");

      // validate the plan
      sem.validate();
      acidInQuery = sem.hasAcidInQuery();
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

      if (isInterrupted()) {
        return handleInterruption("after analyzing query.");
      }

      // get the output schema
      schema = getSchema(sem, conf);
      plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
        queryState.getHiveOperation(), schema);
...

In the compile phase, ParseDriver first converts the SQL into an ASTNode, then a BaseSemanticAnalyzer analyzes the ASTNode, and finally the BaseSemanticAnalyzer is passed into the QueryPlan constructor to build the QueryPlan.

1) Converting SQL to an ASTNode (SQL -> AST (Abstract Syntax Tree))

org.apache.hadoop.hive.ql.parse.ParseDriver

  public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream)
      throws ParseException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsing command: " + command);
    }

    HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
    TokenRewriteStream tokens = new TokenRewriteStream(lexer);
    if (ctx != null) {
      if ( setTokenRewriteStream) {
        ctx.setTokenRewriteStream(tokens);
      }
      lexer.setHiveConf(ctx.getConf());
    }
    HiveParser parser = new HiveParser(tokens);
    if (ctx != null) {
      parser.setHiveConf(ctx.getConf());
    }
    parser.setTreeAdaptor(adaptor);
    HiveParser.statement_return r = null;
    try {
      r = parser.statement();
    } catch (RecognitionException e) {
      e.printStackTrace();
      throw new ParseException(parser.errors);
    }

    if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
      LOG.debug("Parse Completed");
    } else if (lexer.getErrors().size() != 0) {
      throw new ParseException(lexer.getErrors());
    } else {
      throw new ParseException(parser.errors);
    }

    ASTNode tree = (ASTNode) r.getTree();
    tree.setUnknownTokenBoundaries();
    return tree;
  }
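
To make this step concrete, here is a minimal sketch (not part of the original article) that feeds a statement straight into ParseDriver and dumps the resulting AST; it assumes hive-exec is on the classpath, and the available parse overloads vary slightly between Hive versions.

  import org.apache.hadoop.hive.ql.parse.ASTNode;
  import org.apache.hadoop.hive.ql.parse.ParseDriver;
  import org.apache.hadoop.hive.ql.parse.ParseException;

  public class AstDumpExample {
    public static void main(String[] args) throws ParseException {
      ParseDriver pd = new ParseDriver();
      // parse() runs the ANTLR lexer/parser (HiveLexerX/HiveParser) and returns the root ASTNode
      ASTNode tree = pd.parse("SELECT id, count(*) FROM t GROUP BY id");
      // dump() renders the tree as nested tokens such as TOK_QUERY, TOK_FROM, TOK_SELECT
      System.out.println(tree.dump());
    }
  }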

2) The analyze phase (AST (Abstract Syntax Tree) -> Task)

org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer

  public void analyze(ASTNode ast, Context ctx) throws SemanticException {
    initCtx(ctx);
    init(true);
    analyzeInternal(ast);
  }

Here analyzeInternal is an abstract method implemented by different subclasses, such as DDLSemanticAnalyzer, SemanticAnalyzer, UpdateDeleteSemanticAnalyzer, and ExplainSemanticAnalyzer;
its main job is to turn the ASTNode into Tasks, including any optimization passes along the way; the process is fairly involved, so the code is not reproduced here.
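
For orientation, the dispatch in SemanticAnalyzerFactory.get roughly follows the shape sketched below; this is a condensed, from-memory sketch rather than the actual Hive source, and the real factory covers many more token types as well as the CBO switch to CalcitePlanner.

  public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) throws SemanticException {
    switch (tree.getType()) {
      case HiveParser.TOK_EXPLAIN:
        return new ExplainSemanticAnalyzer(queryState);
      case HiveParser.TOK_LOAD:
        return new LoadSemanticAnalyzer(queryState);
      case HiveParser.TOK_DROPTABLE:
      case HiveParser.TOK_SHOWTABLES:
        // ... many other DDL tokens map here as well
        return new DDLSemanticAnalyzer(queryState);
      case HiveParser.TOK_UPDATE_TABLE:
      case HiveParser.TOK_DELETE_FROM:
        return new UpdateDeleteSemanticAnalyzer(queryState);
      default:
        // ordinary queries: SemanticAnalyzer (or CalcitePlanner when CBO is enabled)
        return new SemanticAnalyzer(queryState);
    }
  }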

3) Building the QueryPlan (Task -> QueryPlan)

org.apache.hadoop.hive.ql.QueryPlan

  public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
                  HiveOperation operation, Schema resultSchema) {
    this.queryString = queryString;

    rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
    reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
    fetchTask = sem.getFetchTask();
    // Note that inputs and outputs can be changed when the query gets executed
    inputs = sem.getAllInputs();
    outputs = sem.getAllOutputs();
    linfo = sem.getLineageInfo();
    tableAccessInfo = sem.getTableAccessInfo();
    columnAccessInfo = sem.getColumnAccessInfo();
    idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());

    this.queryId = queryId == null ? makeQueryId() : queryId;
    query = new org.apache.hadoop.hive.ql.plan.api.Query();
    query.setQueryId(this.queryId);
    query.putToQueryAttributes("queryString", this.queryString);
    queryProperties = sem.getQueryProperties();
    queryStartTime = startTime;
    this.operation = operation;
    this.autoCommitValue = sem.getAutoCommitValue();
    this.resultSchema = resultSchema;
  }

As you can see, the constructor simply copies content out of the BaseSemanticAnalyzer; the most important pieces are sem.getAllRootTasks and sem.getFetchTask.

2 The execute phase (QueryPlan -> Job)

org.apache.hadoop.hive.ql.Driver

  public int execute(boolean deferClose) throws CommandNeedRetryException {
...
      // Add root Tasks to runnable
      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
        // This should never happen, if it does, it's a bug with the potential to produce
        // incorrect results.
        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
        driverCxt.addToRunnable(tsk);
      }
...
      // Loop while you either have tasks running, or tasks queued up
      while (driverCxt.isRunning()) {

        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }
...

  private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
      String jobname, int jobs, DriverContext cxt) throws HiveException {
...
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);
...
      tskRun.start();
...
      tskRun.runSequential();
...

Driver.execute takes the Tasks out of the QueryPlan and launches them one by one with launchTask. launchTask wraps each Task in a TaskRunner; when parallel execution is enabled (hive.exec.parallel) the runner is started as a separate thread via tskRun.start(), otherwise tskRun.runSequential() is invoked in the current thread, and both paths end up in TaskRunner.runSequential. Let's look at TaskRunner:

org.apache.hadoop.hive.ql.exec.TaskRunner

  public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask();
...

This directly calls Task.executeTask:

org.apache.hadoop.hive.ql.exec.Task

  public int executeTask() {
...
      int retval = execute(driverContext);
...

Here execute is an abstract method implemented by subclasses such as DDLTask and MapRedTask; we focus on MapRedTask, since most Tasks are MapRedTasks:

org.apache.hadoop.hive.ql.exec.mr.MapRedTask

  public int execute(DriverContext driverContext) {
...
      if (!runningViaChild) {
        // we are not running this mapred task via child jvm
        // so directly invoke ExecDriver
        return super.execute(driverContext);
      }
...

Here it simply delegates to the parent class method, i.e. ExecDriver.execute:

org.apache.hadoop.hive.ql.exec.mr.ExecDriver

  protected transient JobConf job;
...
  public int execute(DriverContext driverContext) {
...
    JobClient jc = null;

    MapWork mWork = work.getMapWork();
    ReduceWork rWork = work.getReduceWork();
...
    if (mWork.getNumMapTasks() != null) {
      job.setNumMapTasks(mWork.getNumMapTasks().intValue());
    }
...
    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    job.setReducerClass(ExecReducer.class);
...
      jc = new JobClient(job);
...
      rj = jc.submitJob(job);
      this.jobID = rj.getJobID();
...

Here the Task is turned into a MapReduce Job and submitted to YARN for execution.

SQL Explain Process

The other place where SQL is parsed is explain: ExplainSemanticAnalyzer turns the ASTNode into an ExplainTask:

org.apache.hadoop.hive.ql.parse.ExplainSemanticAnalyzer

  public void analyzeInternal(ASTNode ast) throws SemanticException {
...
    ctx.setExplain(true);
    ctx.setExplainLogical(logical);

    // Create a semantic analyzer for the query
    ASTNode input = (ASTNode) ast.getChild(0);
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
    sem.analyze(input, ctx);
    sem.validate();

    ctx.setResFile(ctx.getLocalTmpPath());
    List<Task<? extends Serializable>> tasks = sem.getAllRootTasks();
    if (tasks == null) {
      tasks = Collections.emptyList();
    }

    FetchTask fetchTask = sem.getFetchTask();
    if (fetchTask != null) {
      // Initialize fetch work such that operator tree will be constructed.
      fetchTask.getWork().initializeForFetch(ctx.getOpContext());
    }

    ParseContext pCtx = null;
    if (sem instanceof SemanticAnalyzer) {
      pCtx = ((SemanticAnalyzer)sem).getParseContext();
    }

    boolean userLevelExplain = !extended
        && !formatted
        && !dependency
        && !logical
        && !authorize
        && (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_EXPLAIN_USER) && HiveConf
            .getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
    ExplainWork work = new ExplainWork(ctx.getResFile(),
        pCtx,
        tasks,
        fetchTask,
        sem,
        extended,
        formatted,
        dependency,
        logical,
        authorize,
        userLevelExplain,
        ctx.getCboInfo());

    work.setAppendTaskType(
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));

    ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);

    fieldList = explTask.getResultSchema();
    rootTasks.add(explTask);
  }
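
As a usage note, the ExplainTask output can be inspected without touching any internals simply by issuing an EXPLAIN statement; the following is a minimal JDBC sketch, assuming a HiveServer2 running at a hypothetical localhost:10000 and the hive-jdbc driver on the classpath.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class ExplainExample {
    public static void main(String[] args) throws Exception {
      try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
           Statement stmt = conn.createStatement();
           // EXPLAIN is routed to ExplainSemanticAnalyzer: the query's root tasks are wrapped
           // in an ExplainTask and the rendered plan comes back as rows of text
           ResultSet rs = stmt.executeQuery("EXPLAIN SELECT id, count(*) FROM t GROUP BY id")) {
        while (rs.next()) {
          System.out.println(rs.getString(1));
        }
      }
    }
  }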