UserServer handles a client's RUN_QUERY_VALUE query request and dispatches the task to a UserWorker; the worker then submits the work.
Obviously the UserWorker has to be constructed together with the UserServer, so that the moment a task arrives there is a worker ready to take it on.
The UserServer is constructed in ServiceEngine, and the service engine is created by the DrillBit.
The UserWorker is managed by the WorkerManager, and the WorkerManager is also created by the DrillBit.
So once the DrillBit service is started, all the roles that take part in query execution are ready.
Role | Explanation |
---|---|
WorkerBee | The worker bee; the one that actually does the work |
UserWorker | The worker serving user operations; built on top of a WorkerBee |
WorkerManager | The worker manager; picks a worker to do the job |
UserServer | The server side for user operations; hands work over to a UserWorker, so it needs one |
Foreman | The foreman/supervisor; created by the UserWorker. Since the UserWorker is backed by a WorkerBee, the WorkerBee and the Foreman become associated |
ServiceEngine | The service engine; manages the UserServer and the Controller |
DrillBit | Drill's server-side control process; manages the ServiceEngine and the WorkerManager |
BootStrapContext | The context for starting a DrillBit, including configuration and the metrics registry |
DrillbitContext | The context of a running DrillBit |
Controller | Communication between different DrillBit nodes |
ControlServer | The RPC server for inter-node message transfer, connections, etc. |
DataServer | The RPC server responsible for data exchange |
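A schematic of that startup wiring, using the role names from the table above. This is an illustrative sketch only; the constructor shapes are not Drill's actual signatures:

```java
// Illustrative wiring only; not Drill's actual code.
public class DrillBit {
  private final WorkerManager manager;  // owns the UserWorker (and the WorkerBee behind it)
  private final ServiceEngine engine;   // owns the UserServer

  public DrillBit(BootStrapContext context) {
    this.manager = new WorkerManager(context);
    // The UserServer gets its UserWorker at construction time, so a worker
    // is ready the moment a RUN_QUERY request arrives.
    this.engine = new ServiceEngine(manager.getUserWorker(), context);
  }
}
```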
First, let's look at how a UserWorker submits a task:
```java
public class UserWorker {
  private final WorkerBee bee;

  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    // create a new queryid where the first four bytes are a growing time
    // (each new value comes earlier in sequence). Last 12 bytes are random.
    long time = (int) (System.currentTimeMillis()/1000);
    long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
    long p2 = r.nextLong();
    QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
    incrementer.increment(connection.getSession());
    Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
    bee.addNewForeman(foreman);
    return id;
  }
}
```
The returned QueryId is sent back to the client by the UserServer over RPC; it identifies this particular query for the client and means the server has accepted the query.
The server has not actually started executing the query yet, though. Later, when the client wants the results, it can present this QueryId to the server to fetch them.
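A quick way to see the ordering trick in submitWork: part1 stores Integer.MAX_VALUE minus the epoch seconds in its high 32 bits, so a later submission produces a smaller part1, and newer queries sort first. A minimal, self-contained sketch with the random low bits zeroed out (the timestamps are made up for illustration):

```java
public class QueryIdOrdering {
  // Mirrors the part1 arithmetic in UserWorker.submitWork, with the
  // random low 32 bits fixed to zero so only the time component matters.
  static long part1(long epochSeconds) {
    long time = (int) epochSeconds;
    return (Integer.MAX_VALUE - time) << 32;
  }

  public static void main(String[] args) {
    long earlier = part1(1_436_000_000L); // a query submitted first
    long later   = part1(1_436_000_001L); // a query submitted one second later
    // The later query gets the smaller part1, so it sorts ahead of older ones.
    System.out.println(later < earlier);  // prints: true
  }
}
```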
WorkerBee, as the name suggests, is a worker bee: it toils away quietly, serving the queen bee, the Foreman.
Here the UserWorker has created a Foreman, and the worker bee takes it in via addNewForeman.
Questions:
1. Why is it the WorkerBee (worker bee) that actively takes in the Foreman (supervisor), rather than the Foreman managing the WorkerBee?
2. Why doesn't the Foreman, which runs as its own task, start itself; why does a worker have to start it?
The Foreman manages all the fragments of a single query; the node running it acts as the root/driving node:
```java
/**
 * Foreman manages all the fragments (local and remote) for a single query
 * where this is the driving/root node.
 *
 * The flow is as follows:
 * - Foreman is submitted as a runnable.
 * - Runnable does query planning.
 * - state changes from PENDING to RUNNING
 * - Runnable sends out starting fragments
 * - Status listeners are activated
 * - The Runnable's run() completes, but the Foreman stays around (what for? see below)
 * - Foreman listens for state change messages.
 * - state change messages can drive the state to FAILED or CANCELED, in which case
 *   messages are sent to running fragments to terminate
 * - when all fragments complete, state change messages drive the state to COMPLETED
 */
public class Foreman implements Runnable {
  private final QueryId queryId;                       // the id for the query
  private final RunQuery queryRequest;                 // the query to execute
  private final QueryContext queryContext;
  private final QueryManager queryManager;             // handles lower-level details of query execution
  private final WorkerBee bee;                         // provides an interface to submit tasks, used to submit additional work
  private final DrillbitContext drillbitContext;
  private final UserClientConnection initiatingClient; // used to send responses

  // Sets up the Foreman, but does not initiate any execution.
  public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
      final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
    this.bee = bee;
    this.queryId = queryId;
    this.queryRequest = queryRequest;
    this.drillbitContext = drillbitContext;
    this.initiatingClient = connection;
    this.closeFuture = initiatingClient.getChannel().closeFuture();
    closeFuture.addListener(closeListener);
    queryContext = new QueryContext(connection.getSession(), drillbitContext);
    queryManager = new QueryManager(queryId, queryRequest,
        drillbitContext.getPersistentStoreProvider(), stateListener, this);
    recordNewState(QueryState.PENDING);
  }
}
```
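The lifecycle in that javadoc boils down to a small state machine. Here is a hedged sketch of just the transitions the comment names; the enum and the transition check are illustrative, not Drill's actual QueryState handling:

```java
enum QueryState { PENDING, RUNNING, COMPLETED, CANCELED, FAILED }

final class QueryStateMachine {
  private QueryState state = QueryState.PENDING;

  // Allowed transitions, per the Foreman javadoc above.
  boolean moveTo(QueryState next) {
    boolean legal =
        (state == QueryState.PENDING && next == QueryState.RUNNING) ||
        (state == QueryState.RUNNING &&
            (next == QueryState.COMPLETED || next == QueryState.CANCELED
                || next == QueryState.FAILED));
    if (legal) {
      state = next;
    }
    return legal;
  }
}
```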
Foreman's run() dispatches on the RunQuery type. For the SQL type, it parses the SQL statement into a logical plan via Calcite, generates a physical plan, and finally runs the physical plan:
```java
private void runSQL(final String sql) throws ExecutionSetupException {
  final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
  final Pointer<String> textPlan = new Pointer<>();
  final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
  queryManager.setPlanText(textPlan.value);
  runPhysicalPlan(plan);
}
```
Calcite's planner parses the SQL and produces a SqlNode tree; different SqlNode kinds are then handled by different handlers:
```java
public class DrillSqlWorker {
  private final Planner planner;       // both planners are Calcite's; they produce the logical plan
  private final HepPlanner hepPlanner;
  private final QueryContext context;

  public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
    SqlNode sqlNode = planner.parse(sql); // parse the SQL statement into a SqlNode parse tree ①
    AbstractSqlHandler handler;
    SqlHandlerConfig config = new SqlHandlerConfig(hepPlanner, planner, context);
    switch (sqlNode.getKind()) {
    case EXPLAIN:
      handler = new ExplainHandler(config);
      break;
    case SET_OPTION:
      handler = new SetOptionHandler(context);
      break;
    case OTHER:
      if (sqlNode instanceof SqlCreateTable) {
        handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
        break;
      }
      if (sqlNode instanceof DrillSqlCall) {
        handler = ((DrillSqlCall) sqlNode).getSqlHandler(config);
        break;
      }
    default:
      handler = new DefaultSqlHandler(config, textPlan);
    }
    return handler.getPlan(sqlNode);
  }
}
```
The Drillbit that receives the query from a client or application becomes the Foreman for the query and drives the entire query.
A parser in the Foreman parses the SQL[①], applying custom rules[②] to convert specific SQL operators into a specific logical operator syntax that Drill understands.
This collection of logical operators forms a logical plan. The logical plan describes the work required to generate the query results and defines what data sources and operations to apply.
In other words, the parser in the Foreman parses the SQL and applies custom rules to turn SQL operators (Calcite nodes) into the logical operators Drill understands (Drill's DrillRel nodes).
The converted logical operators together form a logical plan. Note that the sqlNode = planner.parse(sql) above corresponds to the SQL operators; the conversion to DrillRel nodes happens in the handler's getPlan.
Calcite's programming API mainly consists of: Operator, Rule, RelationExpression, and SqlNode.
When Calcite's planner parses the SQL, besides Calcite's own rules, Drill also attaches extra rules via getRules. They are set up in DrillSqlWorker's constructor.
The rules cover physical planning, logical planning, and conversion. The logical rules include the basic rules and user-configurable rules; the physical rules include the physical planning rules and the storage plugins' rules. For example, the hive plugin has its own SQL conversion rules.
```java
public DrillSqlWorker(QueryContext context) {
  FrameworkConfig config = Frameworks.newConfigBuilder()
      ...
      .ruleSets(getRules(context)) // the rules Drill attaches ②
      ...
      .build();
  this.planner = Frameworks.getPlanner(config);
}

private RuleSet[] getRules(QueryContext context) {
  StoragePluginRegistry storagePluginRegistry = context.getStorage();
  RuleSet drillLogicalRules = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getDrillBasicRules(context),
      DrillRuleSets.getJoinPermRules(context),
      DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
  RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getPhysicalRules(context),
      storagePluginRegistry.getStoragePluginRuleSet());
  // Following is used in LOPT join OPT.
  RuleSet logicalConvertRules = DrillRuleSets.mergedRuleSets(
      DrillRuleSets.getDrillBasicRules(context),
      DrillRuleSets.getDrillUserConfigurableLogicalRules(context));
  RuleSet[] allRules = new RuleSet[] {drillLogicalRules, drillPhysicalMem, logicalConvertRules};
  return allRules;
}
```
The basic rules of the logical plan are generic ones that don't need to wait for the physical-planning stage; generic rules should be applied as early as possible:
```java
// Get an immutable list of rules that will always be used when running logical planning.
public static RuleSet getDrillBasicRules(QueryContext context) {
  DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add(
      // Add support for Distinct Union (by using Union-All followed by Distinct)
      UnionToDistinctRule.INSTANCE,
      // Add support for WHERE-style joins.
      DrillFilterJoinRules.DRILL_FILTER_ON_JOIN,
      DrillFilterJoinRules.DRILL_JOIN,
```
Here is an example of a WHERE-style join rewrite (from http://blog.aliyun.com/733):

```sql
SELECT * FROM A JOIN B ON A.ID=B.ID WHERE A.AGE>10 AND B.AGE>5
```

Predicate Push Down: whenever a JOIN is involved, the user is quite likely to apply a WHERE after it. We can then analyze, in relational-algebra terms, whether the WHERE conditions can be moved ahead of the JOIN, shrinking the JOIN's input and improving efficiency. Here A.AGE>10 references only table A and B.AGE>5 references only table B, so both predicates can be pushed below the join onto their respective inputs.
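To make the win concrete, here is a toy sketch, plain Java with made-up data and nothing Drill- or Calcite-specific, that counts how many pairs a naive nested-loop join has to examine with and without pushing the two predicates below the join:

```java
import java.util.stream.IntStream;

public class PushdownDemo {
  public static void main(String[] args) {
    // Toy data: 1000 rows per table, ages cycling 0..99 (made up for illustration).
    int[] agesA = IntStream.range(0, 1000).map(i -> i % 100).toArray();
    int[] agesB = IntStream.range(0, 1000).map(i -> i % 100).toArray();

    // WHERE evaluated after the join: the join sees every pair.
    long pairsNoPushdown = (long) agesA.length * agesB.length;

    // Predicates pushed below the join: each input is filtered first.
    long aKept = IntStream.of(agesA).filter(age -> age > 10).count(); // A.AGE > 10 -> 890 rows
    long bKept = IntStream.of(agesB).filter(age -> age > 5).count();  // B.AGE > 5  -> 940 rows
    long pairsPushdown = aKept * bKept;

    System.out.println(pairsNoPushdown); // 1000000
    System.out.println(pairsPushdown);   // 836600
  }
}
```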
So what do Drill's FilterJoin rules look like?
```java
public class DrillFilterJoinRules {
  /** Predicate that always returns true for any filter in OUTER join, and only true
   * for EQUAL or IS_DISTINCT_FROM over RexInputRef in INNER join. With this predicate,
   * the filter expression that returns true will be kept in the JOIN OP.
   * Example: INNER JOIN,  L.C1 = R.C2 and L.C3 + 100 = R.C4 + 100 will be kept in JOIN.
   *                       L.C5 < R.C6 will be pulled up into Filter above JOIN.
   *          OUTER JOIN,  keep any filter in JOIN.
   */
  public static final FilterJoinRule.Predicate EQUAL_IS_DISTINCT_FROM = new FilterJoinRule.Predicate() {
    public boolean apply(Join join, JoinRelType joinType, RexNode exp) {
      // In OUTER join, we could not pull up the filter. All we can do is keep the filter
      // with JOIN, and then decide whether the filter could be pushed down into LEFT/RIGHT.
      if (joinType != JoinRelType.INNER) {
        return true;
      }
      List<RexNode> tmpLeftKeys = Lists.newArrayList();
      List<RexNode> tmpRightKeys = Lists.newArrayList();
      List<RelDataTypeField> sysFields = Lists.newArrayList();
      RexNode remaining = RelOptUtil.splitJoinCondition(sysFields, join.getLeft(), join.getRight(),
          exp, tmpLeftKeys, tmpRightKeys, null, null);
      return remaining.isAlwaysTrue();
    }
  };

  /** Rule that pushes predicates from a Filter into the Join below them. */
  public static final FilterJoinRule DRILL_FILTER_ON_JOIN =
      new FilterJoinRule.FilterIntoJoinRule(true,
          RelFactories.DEFAULT_FILTER_FACTORY, RelFactories.DEFAULT_PROJECT_FACTORY,
          EQUAL_IS_DISTINCT_FROM);
```
It helps to understand some Calcite concepts first; otherwise this can be hard to follow.
See http://blog.csdn.net/yunlong34574/article/details/46375733 for a walkthrough of the optiq-javaben project's source code.
Then read this post on query push-down optimization: https://datapsyche.wordpress.com/2014/08/06/optiq-query-push-down-conc...
The following example is taken from the Optiq author's Apache Calcite Overview slides.
Two tables are joined and then filtered by a WHERE condition. Without the rule, filtering can only happen after the join completes:
With FilterJoinRule, the Filter is moved ahead of the Join and applied right after the scans, which reduces the amount of data entering the join:
So how do you define a rule? call.rels is an array of relational expressions; when onMatch is invoked here, rels = [Join, Filter, Scan].
To get the Join and the Filter out of call.rels, we use the array indices: rel(0) is the Join, rel(1) is the Filter.
Finally, call.transformTo(newJoin) replaces the original relational expression with the new one.
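Putting those pieces together: a rule is a subclass of Calcite's RelOptRule, the operand tree passed to the constructor determines what lands in call.rels, and onMatch builds the replacement. A skeletal, hedged sketch matching the [Join, Filter, Scan] shape above; the class name and the rewrite body are illustrative, not a real Drill or Calcite rule:

```java
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.TableScan;

// Hypothetical rule: matches a Join whose input is a Filter on a TableScan.
public class MyFilterScanJoinRule extends RelOptRule {
  public static final MyFilterScanJoinRule INSTANCE = new MyFilterScanJoinRule();

  private MyFilterScanJoinRule() {
    // The operand tree decides what ends up in call.rels: [Join, Filter, Scan].
    super(operand(Join.class,
          operand(Filter.class,
          operand(TableScan.class, none()))));
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    Join join = call.rel(0);    // index into the matched operand tree
    Filter filter = call.rel(1);
    // ... build a rewritten expression from join/filter here ...
    RelNode newJoin = join;     // placeholder: a real rule constructs a new tree
    call.transformTo(newJoin);  // register the equivalent (ideally cheaper) expression
  }
}
```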
Run the following SQL statements: the first has no WHERE, the second adds a WHERE filter, and in the third the WHERE compares two columns against each other:
```sql
select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NATIONKEY > 10 and regions.R_NAME = 'AMERICA'

select * FROM dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/nation.parquet` nations
join dfs.`/home/hadoop/soft/apache-drill-1.0.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY
where nations.N_NAME < regions.R_NAME
```
Below are the corresponding physical-plan visualizations. In figure 1 there is a Project between the Scan and the JOIN:
In figure 2, although the WHERE filter comes after the join in the query, after optimization it runs before the join: the join happens only after the filter.
Figure 3 is not so lucky: its predicate references columns from both sides, so filtering can only happen after the join.
There are many more rules, all defined in DrillRuleSets.
The SqlNode argument of getPlan is the result of Calcite's parsing above: a SQL parse tree (don't let the name "Node" suggest a single node).
It still consists only of SQL operators that Calcite understands; we have to convert it into the logical operators Drill understands, i.e. DrillRel:
```java
public PhysicalPlan getPlan(SqlNode sqlNode)
    throws ValidationException, RelConversionException, IOException, ForemanSetupException {
  SqlNode rewrittenSqlNode = rewrite(sqlNode);
  TypedSqlNode validatedTypedSqlNode = validateNode(rewrittenSqlNode);
  SqlNode validated = validatedTypedSqlNode.getSqlNode();
  RelDataType validatedRowType = validatedTypedSqlNode.getType();

  RelNode rel = convertToRel(validated);
  rel = preprocessNode(rel);
  log("Optiq Logical", rel);

  DrillRel drel = convertToDrel(rel, validatedRowType);
  log("Drill Logical", drel);

  Prel prel = convertToPrel(drel);
  log("Drill Physical", prel);

  PhysicalOperator pop = convertToPop(prel);
  PhysicalPlan plan = convertToPlan(pop);
  log("Drill Plan", plan);
  return plan;
}
```
Relational Expression (Rel)
As mentioned in the query walkthrough: an execution plan always contains a Screen operator, used to block and wait for the returned data. The DrillRel returned here is the logical plan.
SqlNode and RelNode are Calcite nodes; DrillRel is Drill's relational-expression node, with a Screen wrapped around the outermost node for screen output.
```java
protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) {
  // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
  DrillRel topPreservedNameProj = addRenamedProject((DrillRel) convertedRelNode, validatedRowType);
  return new DrillScreenRel(topPreservedNameProj.getCluster(),
      topPreservedNameProj.getTraitSet(), topPreservedNameProj);
}
```
Below are the Screen node and one of the DrillRel constructors. The input parameter specifies the Screen's input: the Screen node wraps the original node and becomes the new root node.
```java
public class DrillScreenRel extends DrillScreenRelBase implements DrillRel {
  public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
    super(DRILL_LOGICAL, cluster, traitSet, input);
  }

  public LogicalOperator implement(DrillImplementor implementor) {
    LogicalOperator childOp = implementor.visitChild(this, 0, getInput());
    return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build();
  }
}
```
Class hierarchy: DrillScreenRel >> DrillRel >> DrillRelNode >> RelNode
DrillRel is the relational expression of the logical plan. Subclasses must implement the implement method, which returns a logical operator.
```java
// Relational expression that is implemented in Drill.
public interface DrillRel extends DrillRelNode {
  // Calling convention for relational expressions that are "implemented" by generating Drill logical plans
  public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);

  LogicalOperator implement(DrillImplementor implementor);
}
```
DrillImplementor: Context for converting a tree of DrillRel nodes into a Drill logical plan
Next, the logical plan is converted into a physical plan, turning the DrillRel into a Prel; only after that comes Drill's Plan. Note that Drill's physical plan and the final Plan differ slightly.
```java
protected Prel convertToPrel(RelNode drel) {
  Prel phyRelNode = (Prel) planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel);

  /* The order of the following transformations is important */
  /*
   * 0.) For a select * from join query, we need to insert a project on top of scan and a top
   * project just under the screen operator. The project on top of scan will rename from * to T1*,
   * while the top project will rename T1* to *, before it outputs the final result. Only the top
   * project will allow duplicate columns, since the user could "explicitly" ask for duplicate
   * columns (select *, col, *). The rest of the projects will remove the duplicate columns when
   * we generate POP in json format.
   */
  phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode); // * is star, and this column should be converted
}
```
The conversion is fairly involved, and the order of the transformations matters. Take the first one: for a select * from ... join query, two Projects have to be inserted,
one on top of the scans (bottom) and one just under the screen (top). For example, this SQL statement:
```sql
select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/nation.parquet` nations
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
  on nations.N_REGIONKEY = regions.R_REGIONKEY;
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| N_NATIONKEY  | N_NAME          | N_REGIONKEY  | N_COMMENT             | R_REGIONKEY  | R_NAME       | R_COMMENT             |
+--------------+-----------------+--------------+-----------------------+--------------+--------------+-----------------------+
| 0            | ALGERIA         | 0            | haggle. carefully f   | 0            | AFRICA       | lar deposits. blithe  |
| 1            | ARGENTINA       | 1            | al foxes promise sly  | 1            | AMERICA      | hs use ironic, even   |
```
The physical plan:
```
00-00    Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {62.5 rows, 402.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2432
00-01      ⑤ ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2431
00-02        ④ Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2430
00-03          ③ HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 25.0, cumulative cost = {60.0 rows, 400.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 2429
00-05            ① Project(T0¦¦*=[$0], N_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY N_REGIONKEY): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2426
00-07              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/nation.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, N_REGIONKEY]): rowcount = 25.0, cumulative cost = {25.0 rows, 50.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2425
00-04            ② Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2428
00-06              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 2427
```
The corresponding visualization:
The $0, $1, ... in the physical plan stand for the aliased (as) columns. In a join, both sides may have identical column names, so Projects are also added to rename columns and avoid name clashes:
```sql
① select T0.* as $0, T0.N_REGIONKEY as $1 from nations T0
② select T1.* as $0, T1.R_REGIONKEY as $1 from regions T1
③ select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 from
    (select T0.$0 as $0, T0.$1 as $1 from nations) T0
    join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
    on T0.$1 = T1.$3
④ select $0 as $0, $2 as $1 from (
    select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 from
      (select T0.$0 as $0, T0.$1 as $1 from nations) T0
      join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
      on T0.$1 = T1.$3
  )
⑤ select $0 as *, $1 as *0 from (
    select $0 as $0, $2 as $1 from (
      select T0.$0 as $0, T0.$1 as $1, T1.$0 as $2, T1.$1 as $3 from
        (select T0.$0 as $0, T0.$1 as $1 from nations) T0
        join (select T1.$0 as $2, T1.$1 as $3 from regions) T1
        on T0.$1 = T1.$3
    )
  )

select *, *0 from ...
```
The StarColumn rule above is a bit involved, so let's look at the join column-conflict rule instead. It corresponds to ③ JOIN above: all columns get renamed ($0, $1, $2, $3), and the join is then performed on $1 and $3.
```java
/*
 * 1.) Join might cause naming conflicts from its left and right child.
 * In such case, we have to insert Project to rename the conflicting names.
 */
phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
```
The comment speaks of the join's left and right child. Note what "child" means here: the join is the root, and the left and right tables are its left and right child nodes.
To prevent name clashes, Projects are inserted; this matches the visualized plan we saw above one-to-one.
Now think about it: are the Projects inserted by this join step the ones at ① and ②, or the one at ④?
I believe it's ④, because ① and ② were already produced by the first transformation, StarColumnConverter.
insertRenameProject has the given phyRelNode call its accept method, passing in the JoinPrelRenameVisitor instance:
```java
public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
  private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor();

  public static Prel insertRenameProject(Prel prel) {
    return prel.accept(INSTANCE, null);
  }
}
```
Through these layered rule applications, a Prel goes in and a Prel comes out; in other words, each time a rule is applied, the current latest node has to be passed in. Prel also implements the DrillRelNode interface.
DrillRelNode combined with the Visitor gives this layered feel: first the operator rules are registered, forming a graph, and then, while visiting each operator of the DAG, the rules are applied.
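If the double dispatch is hard to see, here is a stripped-down sketch of the same shape, with toy interfaces that are not Drill's: the node's accept calls back into the visitor, which is exactly how prel.accept(INSTANCE, null) ends up in visitJoin below.

```java
interface Node { <T> T accept(Visitor<T> v); }
interface Visitor<T> { T visitJoin(JoinNode join); T visitScan(ScanNode scan); }

class JoinNode implements Node {
  public <T> T accept(Visitor<T> v) { return v.visitJoin(this); } // dispatch #2: on the visitor
}
class ScanNode implements Node {
  public <T> T accept(Visitor<T> v) { return v.visitScan(this); }
}

class NamePrinter implements Visitor<String> {
  public String visitJoin(JoinNode join) { return "join"; }
  public String visitScan(ScanNode scan) { return "scan"; }
}

// Usage: dispatch #1 picks the node's accept, dispatch #2 picks the visit method.
// Node n = new JoinNode();
// System.out.println(n.accept(new NamePrinter())); // prints: join
```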
Suppose the Prel passed to JoinPrelRenameVisitor.insertRenameProject above is a JoinPrel:
```java
public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
    return logicalVisitor.visitJoin(this, value);
  }

  public Iterator<Prel> iterator() {
    return PrelUtil.iter(getLeft(), getRight());
  }
}
```
The logicalVisitor argument of accept() is obviously the JoinPrelRenameVisitor, and this is the current object, the JoinPrel.
So JoinPrelRenameVisitor's visitJoin method gets invoked; see, we are back in the Visitor again.
```java
public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
    List<RelNode> children = Lists.newArrayList();
    for (Prel child : prel) {
      child = child.accept(this, null);
      children.add(child);
    }
    final int leftCount = children.get(0).getRowType().getFieldCount();

    List<RelNode> reNamedChildren = Lists.newArrayList();
    RelNode left = prel.getJoinInput(0, children.get(0));
    RelNode right = prel.getJoinInput(leftCount, children.get(1));
    reNamedChildren.add(left);
    reNamedChildren.add(right);

    return (Prel) prel.copy(prel.getTraitSet(), reNamedChildren);
  }
}
```
JoinPrel is iterable (it implements the iterator method), so the for loop walks its inputs: the left and right tables of the join.
JoinPrel's getJoinInput takes an offset and a RelNode. The offset is the index at which this input's columns start within the join's output (which contains all columns of both tables).
If we join a table with itself, identical column names appear, and the right-hand table's columns get renamed:
```sql
select * from dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` region1
join dfs.`/usr/install/apache-drill-1.1.0/sample-data/region.parquet` regions
  on region1.R_REGIONKEY = regions.R_REGIONKEY;
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| R_REGIONKEY  | R_NAME       | R_COMMENT             | R_REGIONKEY0  | R_NAME0      | R_COMMENT0            |
+--------------+--------------+-----------------------+---------------+--------------+-----------------------+
| 0            | AFRICA       | lar deposits. blithe  | 0             | AFRICA       | lar deposits. blithe  |
```
getJoinInput is called twice, with a different offset and input each time, so the two results are necessarily different.
```java
// Check to make sure that the fields of the inputs are the same as the output field names.
// If not, insert a project renaming them.
public RelNode getJoinInput(int offset, RelNode input) {
  final List<String> fields = getRowType().getFieldNames();
  final List<String> inputFields = input.getRowType().getFieldNames();
  final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
  if (!outputFields.equals(inputFields)) {
    // Ensure that input field names are the same as output field names.
    // If there are duplicate field names on left and right, fields will get lost.
    // In such case, we need to insert a rename Project on top of the input.
    return rename(input, input.getRowType().getFieldList(), outputFields);
  } else {
    return input;
  }
}
```
Under what circumstances does the if branch fire? Suppose both tables have the columns A, B, C.
The left table cannot have duplicate names on its own, but relative to the left, all three right-side columns are duplicates. Consider the call getJoinInput(3, rightNode):
inputFields = [A, B, C], while fields is the join's output row type. When Calcite derives a join's row type it uniquifies duplicate names, so fields = [A, B, C, A0, B0, C0] and outputFields = fields.subList(3, 6) = [A0, B0, C0]. That does not equal inputFields, so a rename Project is inserted on top of the right input.
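A tiny standalone sketch of that check, with field names hard-coded to mimic the region self-join; only the List logic is real, nothing here is Drill code:

```java
import java.util.Arrays;
import java.util.List;

public class JoinRenameCheck {
  public static void main(String[] args) {
    // The join's output row type: Calcite has already uniquified the duplicates.
    List<String> fields = Arrays.asList(
        "R_REGIONKEY", "R_NAME", "R_COMMENT",      // left input
        "R_REGIONKEY0", "R_NAME0", "R_COMMENT0");  // right input, renamed

    // The right input's own field names, before renaming.
    List<String> inputFields = Arrays.asList("R_REGIONKEY", "R_NAME", "R_COMMENT");

    int offset = 3; // right input starts after the left input's 3 fields
    List<String> outputFields = fields.subList(offset, offset + inputFields.size());

    // Mismatch -> getJoinInput would insert a rename Project on this input.
    System.out.println(!outputFields.equals(inputFields)); // prints: true
  }
}
```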
Compare the visualized tree of the self-join and you can see it: a Project has been added at 00-04:
```
00-00    Screen : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.5 rows, 120.5 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1299
00-01      ProjectAllowDup(*=[$0], *0=[$1]) : rowType = RecordType(ANY *, ANY *0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1298
00-02        Project(T0¦¦*=[$0], T1¦¦*=[$2]) : rowType = RecordType(ANY T0¦¦*, ANY T1¦¦*): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1297
00-03          HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY, ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {20.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 88.0 memory}, id = 1296
00-04            Project(T1¦¦*=[$0], R_REGIONKEY0=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY0): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1295
00-06              Project(T1¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T1¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1294
00-08                Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1293
00-05            Project(T0¦¦*=[$0], R_REGIONKEY=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY R_REGIONKEY): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1292
00-07              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet]], selectionRoot=file:/usr/install/apache-drill-1.1.0/sample-data/region.parquet, numFiles=1, columns=[`*`]]]) : rowType = (DrillRecordRow[*, R_REGIONKEY]): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1291
```