Apache Drill Source Code Analysis (3): DrillBit and the Physical Plan



Drill Source Code Reading (3): The roles inside a DrillBit

UserServer handles the client's RUN_QUERY_VALUE query request and dispatches the work to a UserWorker, which submits the job.
Obviously the worker has to be constructed together with the UserServer, so that the moment a request arrives there is already a worker ready to take it on.
UserServer is constructed inside ServiceEngine, and the service engine is created by the DrillBit.
UserWorker is managed by the WorkerManager, and the WorkerManager is likewise created by the DrillBit.
So once the DrillBit service has started, every role that takes part in the computation is already in place; a condensed sketch of this wiring follows the table below.

Role              Explanation
WorkerBee         The worker bee; the one that actually does the work
UserWorker        The worker serving user requests; built on top of a WorkerBee
WorkerManager     The worker manager; picks a worker to take on a job
UserServer        The user-facing RPC server; hands work over to a UserWorker, so it needs one
Foreman           The foreman/supervisor, created by the UserWorker; since the UserWorker is backed by a WorkerBee, the WorkerBee and the Foreman end up tied together
ServiceEngine     The service engine; manages the UserServer and the Controller
DrillBit          Drill's server-side control process; manages the ServiceEngine and the WorkerManager
BootStrapContext  The context for bootstrapping a DrillBit: configuration, metrics registry
DrillbitContext   The context a DrillBit works with at runtime
Controller        Communication between different DrillBit nodes
ControlServer     The RPC server for inter-node message transfer, connections, etc.
DataServer        The RPC server responsible for data exchange
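
The ownership chain above can be condensed into a few lines. This is only an illustrative sketch (the class bodies here are invented, not Drill's real fields), but it shows the point of the wiring: after the DrillBit starts, the UserServer already holds the UserWorker owned by the WorkerManager, so an incoming query can be handed over without any further setup.

class DrillbitWiringSketch {
  static class UserWorker { }                                   // submits queries on behalf of clients
  static class WorkerManager { final UserWorker userWorker = new UserWorker(); }
  static class UserServer { final UserWorker worker; UserServer(UserWorker w) { worker = w; } }
  static class ServiceEngine { final UserServer userServer; ServiceEngine(UserWorker w) { userServer = new UserServer(w); } }

  private final WorkerManager manager = new WorkerManager();                   // created by the DrillBit
  private final ServiceEngine engine = new ServiceEngine(manager.userWorker);  // also created by the DrillBit

  public static void main(String[] args) {
    DrillbitWiringSketch bit = new DrillbitWiringSketch();
    // the RPC server and the worker manager share the same worker instance
    System.out.println(bit.engine.userServer.worker == bit.manager.userWorker);  // true
  }
}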

The worker and the foreman

First, let's look at how UserWorker submits a piece of work:

public class UserWorker{
  private final WorkerBee bee;

  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence).  Last 12 bytes are random.
    long time = (int) (System.currentTimeMillis()/1000);
    long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
    long p2 = r.nextLong();
    QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
    incrementer.increment(connection.getSession());
    Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
    bee.addNewForeman(foreman);
    return id;
  }

The returned QueryId is sent back to the client by UserServer over RPC; it identifies this query for the client and confirms that the server has accepted it.
The server has not executed the query yet; later on, the client can present this QueryId to ask the server for the result data.
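
The arithmetic in submitWork is worth unpacking. Here is a small standalone illustration (not Drill code) of the same layout: the high 32 bits of part1 hold Integer.MAX_VALUE minus the current epoch second, so a newer query gets a numerically smaller prefix and sorts earlier; the remaining bits are random.

import java.util.concurrent.ThreadLocalRandom;

public class QueryIdLayout {
  public static void main(String[] args) {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    long time = (int) (System.currentTimeMillis() / 1000);           // seconds since epoch, as in submitWork
    long part1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();   // high 32 bits: "inverted" time
    long part2 = r.nextLong();                                       // the second half is fully random

    // recover the timestamp from the high bits (may be off by one second
    // when the random low word is negative and borrows from the high word)
    long epochSeconds = Integer.MAX_VALUE - (part1 >>> 32);
    System.out.println("query created around epoch second " + epochSeconds);
    System.out.println("part1=" + Long.toHexString(part1) + " part2=" + Long.toHexString(part2));
  }
}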

Going by the name, a WorkerBee is the bee that does the work, toiling away quietly; it serves the queen bee, the Foreman.
We just had the UserWorker create a Foreman, and the worker bee takes it in.

Questions:
1. Why does the WorkerBee (worker bee) actively take the Foreman (supervisor) in, rather than the Foreman managing the WorkerBee?
2. Why doesn't the Foreman, as its own unit of execution, start itself; why does the worker have to start it? (A sketch of the hand-off follows.)
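
Part of the answer is that the WorkerBee owns the Drillbit's executor. A hedged sketch (simplified; the field and method details here are illustrative, not Drill's exact code) of what "the bee adds the Foreman" amounts to: registering the query and scheduling Foreman.run() on a pooled thread, so the Foreman never starts itself.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WorkerBeeSketch {
  private final ExecutorService executor = Executors.newCachedThreadPool();
  private final Map<Long, Runnable> runningQueries = new ConcurrentHashMap<Long, Runnable>();

  // "foreman" stands in for org.apache.drill.exec.work.foreman.Foreman, which is a Runnable
  void addNewForeman(long queryIdPart1, Runnable foreman) {
    runningQueries.put(queryIdPart1, foreman);  // track the query so it can be found/cancelled later
    executor.execute(foreman);                  // planning and execution happen on a worker thread
  }

  public static void main(String[] args) {
    WorkerBeeSketch bee = new WorkerBeeSketch();
    bee.addNewForeman(1L, new Runnable() {
      public void run() { System.out.println("Foreman.run(): plan the query, send out fragments"); }
    });
    bee.executor.shutdown();  // let the JVM exit once the task finishes
  }
}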

The Foreman manages all the fragments of a single query, acting as the root/driving node.

/**
 * Foreman manages all the fragments (local and remote) for a single query where this is the driving/root node.
 * The flow is as follows:
 * - Foreman is submitted as a runnable.
 * - Runnable does query planning.
 * - state changes from PENDING to RUNNING
 * - Runnable sends out starting fragments
 * - Status listeners are activated
 * - The Runnable's run() completes, but the Foreman stays around
 * - Foreman listens for state change messages.
 * - state change messages can drive the state to FAILED or CANCELED, in which case
 *   messages are sent to running fragments to terminate
 * - when all fragments complete, state change messages drive the state to COMPLETED
 */
public class Foreman implements Runnable {
  private final QueryId queryId; //the id for the query
  private final RunQuery queryRequest; //the query to execute
  private final QueryContext queryContext;
  private final QueryManager queryManager; // handles lower-level details of query execution
  private final WorkerBee bee; // provides an interface to submit tasks, used to submit additional work
  private final DrillbitContext drillbitContext;
  private final UserClientConnection initiatingClient; // used to send responses

  // Sets up the Foreman, but does not initiate any execution.
  public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext, final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
    this.bee = bee;
    this.queryId = queryId;
    this.queryRequest = queryRequest;
    this.drillbitContext = drillbitContext;
    this.initiatingClient = connection;
    this.closeFuture = initiatingClient.getChannel().closeFuture();
    closeFuture.addListener(closeListener);
    queryContext = new QueryContext(connection.getSession(), drillbitContext);
    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(), stateListener, this);
    recordNewState(QueryState.PENDING);
  }

Foreman's run() method takes a different path depending on the type of the RunQuery. For the SQL type, it parses the SQL statement with Calcite into a logical plan, generates a physical plan, and finally runs the physical plan. A condensed sketch of the dispatch follows; the SQL branch ends up in runSQL, quoted after it.
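
The sketch below is self-contained and only mirrors the shape of the dispatch (the enum and method bodies are stand-ins, not Foreman's real members): run() switches on the RunQuery type, and the SQL case goes to runSQL.

public class ForemanDispatchSketch {
  enum QueryType { SQL, LOGICAL, PHYSICAL }

  void run(QueryType type, String plan) {
    switch (type) {
      case SQL:        // a SQL string: parse with Calcite, build a physical plan, execute it
        runSQL(plan);
        break;
      case PHYSICAL:   // a ready-made physical plan submitted as JSON: parse it and run it
      case LOGICAL:    // a logical plan: convert it to a physical plan first
        System.out.println("handled by separate helpers in the real Foreman");
        break;
    }
  }

  void runSQL(String sql) { System.out.println("planning " + sql); }

  public static void main(String[] args) {
    new ForemanDispatchSketch().run(QueryType.SQL, "select 1");
  }
}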

private void runSQL(final String sql) throws ExecutionSetupException {
    final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
    final Pointer<String> textPlan = new Pointer<>();
    final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
    queryManager.setPlanText(textPlan.value);
    runPhysicalPlan(plan);
  }

SQL Parser

Calcite's planner parses the SQL and produces SqlNode nodes; different kinds of SqlNode are handled by different handlers.

public class DrillSqlWorker {
  private final Planner planner;  // both planner and hepPlanner come from Calcite; they produce the logical plan
  private final HepPlanner hepPlanner;
  private final QueryContext context;

  public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
    SqlNode sqlNode = planner.parse(sql);  // parse the SQL statement into a SqlNode parse tree ①
    AbstractSqlHandler handler;
    SqlHandlerConfig config = new SqlHandlerConfig(hepPlanner, planner, context);
    switch(sqlNode.getKind()){
    case EXPLAIN:
      handler = new ExplainHandler(config);
      break;
    case SET_OPTION:
      handler = new SetOptionHandler(context);
      break;
    case OTHER:
      if(sqlNode instanceof SqlCreateTable) {
        handler = ((DrillSqlCall)sqlNode).getSqlHandler(config, textPlan);
        break;
      }
      if (sqlNode instanceof DrillSqlCall) {
        handler = ((DrillSqlCall)sqlNode).getSqlHandler(config);
        break;
      }
    default:
      handler = new DefaultSqlHandler(config, textPlan);
    }
    return handler.getPlan(sqlNode);
  }

The Drillbit that receives the query from a client or application becomes the Foreman for the query and drives the entire query.
A parser in the Foreman parses the SQL[①], applying custom rules[②] to convert specific SQL operators into a specific logical operator syntax that Drill understands.
This collection of logical operators forms a logical plan. The logical plan describes the work required to generate the query results and defines what data sources and operations to apply.

The parser inside the Foreman parses the SQL and applies custom rules to convert the SQL operators (Calcite nodes) into logical operators that Drill understands (Drill's DrillRel nodes).
The converted logical operators together form a logical plan. Note that sqlNode = planner.parse(sql) above still yields SQL operators; the conversion to DrillRel nodes happens inside the handler's getPlan.

SqlNode (Calcite SQL operators)

Calcite's programming API mainly consists of: Operator, Rule, RelationalExpression, SqlNode.

What is a Rule?

Besides Calcite's own rules, the planner also gets extra rules from Drill via getRules, set up in DrillSqlWorker's constructor.
The rules cover physical planning, logical planning, and conversions. The logical rules consist of the basic rules plus user-configurable rules; the physical rules consist of the physical rules proper plus the storage plugins' rules: the Hive plugin, for example, contributes its own SQL conversion rules.

public DrillSqlWorker(QueryContext context) {
    FrameworkConfig config = Frameworks.newConfigBuilder() ...
        .ruleSets(getRules(context))...  // the extra rules Drill attaches ②
        .build();
    this.planner = Frameworks.getPlanner(config);    
  }

  private RuleSet[] getRules(QueryContext context) {
    StoragePluginRegistry storagePluginRegistry = context.getStorage();
    RuleSet drillLogicalRules = DrillRuleSets.mergedRuleSets(DrillRuleSets.getDrillBasicRules(context), DrillRuleSets.getJoinPermRules(context), DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
    RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(DrillRuleSets.getPhysicalRules(context), storagePluginRegistry.getStoragePluginRuleSet());
    // Following is used in LOPT join OPT.
    RuleSet logicalConvertRules = DrillRuleSets.mergedRuleSets(DrillRuleSets.getDrillBasicRules(context), DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
    RuleSet[] allRules = new RuleSet[] {drillLogicalRules, drillPhysicalMem, logicalConvertRules};
    return allRules;
  }

The basic logical-planning rules are generic; they don't need to wait until the physical-planning stage, and generic rules should be applied as early as possible.

// Get an immutable list of rules that will always be used when running logical planning.
  public static RuleSet getDrillBasicRules(QueryContext context) {
      DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( //
      // Add support for Distinct Union (by using Union-All followed by Distinct)
      UnionToDistinctRule.INSTANCE,

      // Add support for WHERE style joins.
      DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
      DrillFilterJoinRules.DRILL_JOIN,

Here is an example of rewriting a WHERE-style join (see http://blog.aliyun.com/733):
SELECT * FROM A JOIN B ON A.ID=B.ID WHERE A.AGE>10 AND B.AGE>5
Predicate push-down: when a JOIN is followed by a WHERE, we can analyze, in relational-algebra terms, whether the WHERE conditions
can be moved ahead of the JOIN, so that the JOIN processes less data and runs more efficiently.

So what does Drill's FilterJoin rule look like?

public class DrillFilterJoinRules {
  /** Predicate that always returns true for any filter in OUTER join, and only true for EQUAL or IS_DISTINCT_FROM over RexInputRef in INNER join.
   * With this predicate, the filter expression that return true will be kept in the JOIN OP.
   * Example:  INNER JOIN,   L.C1 = R.C2 and L.C3 + 100 = R.C4 + 100 will be kept in JOIN.
   *                         L.C5 < R.C6 will be pulled up into Filter above JOIN. 
   *           OUTER JOIN,   Keep any filter in JOIN.
  */
  public static final FilterJoinRule.Predicate EQUAL_IS_DISTINCT_FROM =
      new FilterJoinRule.Predicate() {
        public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
          // In OUTER join, we could not pull-up the filter. All we can do is keep the filter with JOIN, and then decide whether the filter could be pushed down into LEFT/RIGHT.
          if (joinType != JoinRelType.INNER) return true;
          List<RexNode> tmpLeftKeys = Lists.newArrayList();
          List<RexNode> tmpRightKeys = Lists.newArrayList();
          List<RelDataTypeField> sysFields = Lists.newArrayList();
          RexNode remaining = RelOptUtil.splitJoinCondition(sysFields, join.getLeft(), join.getRight(), exp, tmpLeftKeys, tmpRightKeys, null, null);
          if (remaining.isAlwaysTrue())  return true;
          return false;
        }
      };

  /** Rule that pushes predicates from a Filter into the Join below them. */
  public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
      new FilterJoinRule.FilterIntoJoinRule(true, RelFactories.DEFAULT_FILTER_FACTORY,
          RelFactories.DEFAULT_PROJECT_FACTORY, EQUAL_IS_DISTINCT_FROM);

It is best to understand a few Calcite concepts first, otherwise this part is hard to follow.
See http://blog.csdn.net/yunlong34574/article/details/46375733 for a walkthrough of the optiq-javaben project's source code,
and then https://datapsyche.wordpress.com/2014/08/06/optiq-query-push-down-conc... for query push-down optimization.

Below is an example borrowed from the Optiq author's "Apache Calcite Overview" slides:

Two tables are joined and then filtered by a WHERE condition. Without the rule, filtering can only happen after the join has completed:

With FilterJoinRule, the Filter is moved ahead of the Join and runs immediately after the scan, which reduces the amount of data going into the join:

So how is a rule defined? When the planner finds a match, onMatch is invoked and call.rels holds the matched relational expressions in pattern order; they are read back with call.rel(index) (for the Filter-over-Join pattern used here, call.rel(0) is the Filter and call.rel(1) the Join beneath it).
Finally, call.transformTo(newRel) registers the rewritten relational expression as an equivalent of the original one. A minimal skeleton follows.
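
A minimal skeleton of such a rule, assuming Calcite's RelOptRule API. This is not Drill's DrillFilterJoinRules, and the rewrite itself is left as a placeholder; it only shows the three moving parts mentioned above: the operand pattern, call.rel(i), and call.transformTo().

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;

public class FilterPushDownSketchRule extends RelOptRule {

  public FilterPushDownSketchRule() {
    // match a Filter sitting on top of a Join (with any inputs below the Join)
    super(operand(Filter.class, operand(Join.class, any())), "FilterPushDownSketch");
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    Filter filter = call.rel(0);   // root of the matched pattern
    Join join = call.rel(1);       // its matched child

    // The real push-down logic (Drill's lives in DrillFilterJoinRules, Calcite's in FilterJoinRule)
    // splits filter.getCondition() into parts that touch only one side of the join and rebuilds
    // the tree with those parts pushed below the join.
    RelNode rewritten = pushFilterBelowJoin(filter, join);
    if (rewritten != null) {
      call.transformTo(rewritten); // tell the planner this is an equivalent (hopefully cheaper) plan
    }
  }

  // placeholder so the skeleton compiles; the actual rewrite is omitted here
  private RelNode pushFilterBelowJoin(Filter filter, Join join) {
    return null;
  }
}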

Drill FilterJoin Example

Run the SQL statements below: the first has no WHERE, the second adds a WHERE filter, and in the third the WHERE compares two columns.

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
on nations.N_REGIONKEY = regions.R_REGIONKEY

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NATIONKEY>10 and regions.R_NAME='AMERICA'

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NAME<regions.R_NAME

Below are the corresponding physical-plan visualizations. In figure 1 there is a Project between the Scan and the JOIN:

Figure 2: although the WHERE filter comes after the join in the SQL, after optimization it runs before the join, i.e. the join happens only after the filter.

Figure 3 is not so lucky: the filter can only run after the join.

There are many more rules; they all live in DrillRuleSets.


DrillRel (Drill logical operators)

The SqlNode argument of getPlan came out of Calcite's parsing earlier and is a SQL parse tree (don't let the name Node fool you into thinking it is a single node).
But it still consists only of SQL operators that Calcite understands; we have to convert it into logical operators that Drill understands, i.e. DrillRel.

public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
    SqlNode rewrittenSqlNode = rewrite(sqlNode);
    TypedSqlNode validatedTypedSqlNode = validateNode(rewrittenSqlNode);
    SqlNode validated = validatedTypedSqlNode.getSqlNode();
    RelDataType validatedRowType = validatedTypedSqlNode.getType();
    RelNode rel = convertToRel(validated);
    rel = preprocessNode(rel);
    log("Optiq Logical", rel);

    DrillRel drel = convertToDrel(rel, validatedRowType);
    log("Drill Logical", drel);

    Prel prel = convertToPrel(drel);
    log("Drill Physical", prel);

    PhysicalOperator pop = convertToPop(prel);
    PhysicalPlan plan = convertToPlan(pop);
    log("Drill Plan", plan);
    return plan;
  }

Relational Expression(Rel)

As mentioned in the earlier article on the query flow: an execution plan always contains a Screen operator, which blocks and waits for the returned data. The DrillRel returned here is the logical plan.
SqlNode and RelNode are Calcite nodes; DrillRel is Drill's relational-expression node, and the outermost node is wrapped in a Screen for output to the screen.

protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) {
        // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
        DrillRel topPreservedNameProj = addRenamedProject((DrillRel) convertedRelNode, validatedRowType);
        return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(), topPreservedNameProj);
      }

The constructors of the Screen node (and of other DrillRels) take an input parameter that specifies the Screen's input: the Screen node wraps the original node, producing a new node.

public class DrillScreenRel extends DrillScreenRelBase implements DrillRel {
  public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
    super(DRILL_LOGICAL, cluster, traitSet, input);
  }
  public LogicalOperator implement(DrillImplementor implementor) {
    LogicalOperator childOp = implementor.visitChild(this, 0, getInput());
    return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build();
  }

Class hierarchy: DrillScreenRel >> DrillRel >> DrillRelNode >> RelNode.
DrillRel is the relational expression of the logical plan; subclasses must implement the implement method, which returns a logical operator.

// Relational expression that is implemented in Drill.
public interface DrillRel extends DrillRelNode {
  // Calling convention for relational expressions that are "implemented" by generating Drill logical plans
  public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);

  LogicalOperator implement(DrillImplementor implementor);
}

DrillRel Nodes Tree → Drill LogicalPlan

DrillImplementor is the context for converting a tree of DrillRel nodes into a Drill logical plan; a toy rendering of that conversion pattern follows.
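
These are stand-in classes, not Drill's DrillImplementor, but they render the same pattern: each rel's implement() asks the implementor to visit its child first, then wraps the child's logical operator, exactly as DrillScreenRel.implement() above does with its Store/Screen operator.

import java.util.Collections;
import java.util.List;

public class ImplementorSketch {
  interface LogicalOperator { }
  static class ScanOp implements LogicalOperator { final String table; ScanOp(String t) { table = t; } }
  static class ScreenOp implements LogicalOperator { final LogicalOperator input; ScreenOp(LogicalOperator i) { input = i; } }

  interface Rel { LogicalOperator implement(Implementor impl); List<Rel> inputs(); }

  static class Implementor {
    LogicalOperator visitChild(Rel parent, int ordinal, Rel child) {
      return child.implement(this);                  // depth-first: children are implemented first
    }
    LogicalOperator go(Rel root) { return root.implement(this); }
  }

  static class ScanRel implements Rel {
    public LogicalOperator implement(Implementor impl) { return new ScanOp("nation.parquet"); }
    public List<Rel> inputs() { return Collections.emptyList(); }
  }

  static class ScreenRel implements Rel {            // mirrors DrillScreenRel.implement()
    final Rel input;
    ScreenRel(Rel input) { this.input = input; }
    public LogicalOperator implement(Implementor impl) {
      LogicalOperator childOp = impl.visitChild(this, 0, input);
      return new ScreenOp(childOp);                  // wrap the child's operator
    }
    public List<Rel> inputs() { return Collections.singletonList(input); }
  }

  public static void main(String[] args) {
    LogicalOperator plan = new Implementor().go(new ScreenRel(new ScanRel()));
    System.out.println(plan instanceof ScreenOp);    // true: the logical plan is rooted at the Screen
  }
}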

The physical plan: Prel

Next the logical plan is converted into a physical plan, i.e. DrillRel into Prel; only after that comes Drill's final Plan. Note that Drill's physical plan and the final Plan are slightly different things.

protected Prel convertToPrel(RelNode drel) {
    Prel phyRelNode = (Prel) planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel);

    /*  The order of the following transformation is important */

    /*
     * 0.) For select * from join query, we need insert project on top of scan and a top project just
     * under screen operator. The project on top of scan will rename from * to T1*, while the top project
     * will rename T1* to *, before it output the final result. Only the top project will allow
     * duplicate columns, since user could "explicitly" ask for duplicate columns ( select *, col, *).
     * The rest of projects will remove the duplicate column when we generate POP in json format.
     */
    phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode);  // '*' is the star column; it needs to be converted
  }

The conversion is fairly involved, and the order of the transformations matters. Take the first one: for a select * from ... join query, two Projects have to be inserted,
one above the scan (bottom) and one just below the screen (top). For example, for the SQL statement below:

select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/nation.parquet` nations
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
on nations.N_REGIONKEY = regions.R_REGIONKEY;
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| N_NATIONKEY  |     N_NAME      | N_REGIONKEY  |       N_COMMENT       | R_REGIONKEY  |    R_NAME    |       R_COMMENT       |
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| 0            | ALGERIA         | 0            |  haggle. carefully f  | 0            | AFRICA       | lar deposits. blithe  |
| 1            | ARGENTINA       | 1            | al foxes promise sly  | 1            | AMERICA      | hs use ironic, even   |

The physical plan:

00-00    Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {62.5 rows, 402.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2432
00-01   ⑤ ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2431
00-02   ④   Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2430
00-03   ③     HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2429
00-05   ①       Project(T0¦¦*=[$0], N_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2426
00-07              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, N_REGIONKEY]): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2425
00-04   ②       Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2428
00-06              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2427

And the corresponding visualization:

The $0, $1, ... in the physical plan stand for the aliased (as) variables; in a join the two sides may carry the same column names, so Projects are added to rename them and avoid conflicts:

① select T0.* as $0, T0.N_REGIONKEY as $1 from nations T0 
② select T1.* as $0, T1.R_REGIONKEY as $1 from regions T1
③ select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 
   from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
   join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
   on T0.$1 = T1.$3 
④ select $0 as $0,$2 as $1 from ( 
     select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 
     from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
     join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
     on T0.$1 = T1.$3
   )
⑤ select $0 as *, $1 as *0 from(  
     select $0 as $0,$2 as $1 from ( 
       select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 
       from (select T0.$0 as $0, T0.$1 as $1 from nations) T0
       join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
       on T0.$1 = T1.$3
     )
   )
   select *,*0 from ...

The star-column rule above is a bit involved, so let's look at the rule for join column conflicts instead. It corresponds to ③ JOIN above: all columns are renamed ($0, $1, $2, $3) and the join is then performed on $1 and $3.

/*
     * 1.)
     * Join might cause naming conflicts from its left and right child.
     * In such case, we have to insert Project to rename the conflicting names.
     */
    phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);

As the comment says, the join has a left and a right child. Note the word child: the join is the root, and the left and right tables are its left and right child nodes.

To avoid name conflicts a Project is inserted, which then corresponds one-to-one with the visualized plan we saw above.
Now think about it: is the Project inserted for the join the one at ① and ②, or the one at ④?
I believe it is ④, because ① and ② were already produced by the first transformation, StarColumnConverter.

insertRenameProject has the phyRelNode that is passed in call its accept method, handing it the JoinPrelRenameVisitor instance.

public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{
  private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor();

  public static Prel insertRenameProject(Prel prel){
    return prel.accept(INSTANCE, null);
  }

The Prel passes through layer after layer of rules and what comes back is still a Prel; in other words, every time a rule is applied, the current (latest) value is passed back in. Prel also implements the DrillRelNode interface.
DrillRelNode combined with the visitor gives a layered, nested feel: the operator rules are registered first, forming a graph, and then, as the DAG is walked, the rules are applied to each operator visited. The chaining in convertToPrel looks roughly like the sketch below.
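
Putting the two transformations we have seen side by side shows the chaining (condensed from convertToPrel; the other visitors in the real method are omitted):

// each pass takes the latest Prel and returns a rewritten Prel
phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode);    // step 0: star-column projects
phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);  // step 1: rename conflicting join columns
// ... further visitors follow in the real convertToPrel() ...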

Suppose the Prel passed to JoinPrelRenameVisitor.insertRenameProject above is a JoinPrel:

public abstract class JoinPrel extends DrillJoinRelBase implements Prel{
  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
    return logicalVisitor.visitJoin(this, value);
  }

  public Iterator<Prel> iterator() {
    return PrelUtil.iter(getLeft(), getRight());
  }

The logicalVisitor argument of accept() is, of course, the JoinPrelRenameVisitor, and this is the current object, i.e. the JoinPrel.
So JoinPrelRenameVisitor's visitJoin method gets called, and we are back in the visitor again.

public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{

  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
    List<RelNode> children = Lists.newArrayList();
    for(Prel child : prel){
      child = child.accept(this, null);
      children.add(child);
    }

    final int leftCount = children.get(0).getRowType().getFieldCount();
    List<RelNode> reNamedChildren = Lists.newArrayList();

    RelNode left = prel.getJoinInput(0, children.get(0));
    RelNode right = prel.getJoinInput(leftCount, children.get(1));
    reNamedChildren.add(left);
    reNamedChildren.add(right);
    return (Prel) prel.copy(prel.getTraitSet(), reNamedChildren);
  }
}

JoinPrel is iterable (it implements iterator()), so a for-loop can walk over its inputs, namely the left and right sides of the join.
JoinPrel's getJoinInput method takes an offset and a RelNode; the offset is an index into the join's output columns (all columns of the two joined tables).

If we join a table with itself, we can see that the duplicated columns from the right-hand table are renamed:

select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` region1
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
on region1.R_REGIONKEY = regions.R_REGIONKEY;
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| R_REGIONKEY  |    R_NAME    |       R_COMMENT       | R_REGIONKEY0  |   R_NAME0    |      R_COMMENT0       |
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| 0            | AFRICA       | lar deposits. blithe  | 0             | AFRICA       | lar deposits. blithe  |

getJoinInput is called twice, with a different offset and input each time, so the two calls necessarily produce different results.

// Check to make sure that the fields of the inputs are the same as the output field names.  If not, insert a project renaming them.
  public RelNode getJoinInput(int offset, RelNode input) {
    final List<String> fields = getRowType().getFieldNames();
    final List<String> inputFields = input.getRowType().getFieldNames();
    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
    if (!outputFields.equals(inputFields)) {
      // Ensure that input field names are the same as output field names.
      // If there are duplicate field names on left and right, fields will get lost.
      // In such case, we need insert a rename Project on top of the input.
      return rename(input, input.getRowType().getFieldList(), outputFields);
    } else {
      return input;
    }
  }

When does the code above actually take the if branch? Suppose both tables have the columns A, B, C.
The left table cannot conflict with itself; relative to the left table, all three columns of the right table are duplicates, so for the right side we call getJoinInput(3, rightNode).
At first glance inputFields = [A, B, C] and fields = [A, B, C, A, B, C], so outputFields = [A, B, C], seemingly equal to inputFields. The catch is that Calcite de-duplicates the field names in the join's output row type by appending a suffix, so fields is really [A, B, C, A0, B0, C0] and outputFields = [A0, B0, C0], which does differ from the input fields, and the rename Project gets inserted. The snippet below works this through with the self-join's real column names.
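
A small standalone check using the self-join's actual column names (the field lists are copied from the plan below, not computed by Calcite here):

import java.util.Arrays;
import java.util.List;

public class GetJoinInputCheck {
  public static void main(String[] args) {
    // the join's output row type: right-hand duplicates carry a "0" suffix
    List<String> fields = Arrays.asList(
        "R_REGIONKEY", "R_NAME", "R_COMMENT", "R_REGIONKEY0", "R_NAME0", "R_COMMENT0");
    // the right input's own field names, unchanged
    List<String> inputFields = Arrays.asList("R_REGIONKEY", "R_NAME", "R_COMMENT");

    int offset = 3;  // leftCount: number of fields coming from the left input
    List<String> outputFields = fields.subList(offset, offset + inputFields.size());

    System.out.println(outputFields);                      // [R_REGIONKEY0, R_NAME0, R_COMMENT0]
    System.out.println(outputFields.equals(inputFields));  // false -> a rename Project is inserted
  }
}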

Compare this with the visualization tree of the self-join and it becomes clear: a Project has been added at 00-04:

00-00    Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.5 rows, 120.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1299
00-01      ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1298
00-02        Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1297
00-03          HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1296
00-04            Project(T1¦¦*=[$0], R_REGIONKEY0=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1295
00-06              Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1294
00-08                Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1293
00-05            Project(T0¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1292
00-07              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1291
