在查詢一節中說過: 有了物理計劃,全部的統計信息,最優端點,Foreman中的Parallellizer會將物理計劃轉換爲多個fragments
將物理計劃轉換爲fragments是在Foreman中, 就是在runPhysicalPlan的第一步getQueryWorkUnit中node
private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); // 物理計劃的根節點物理操做符爲Screen final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); // ①遞歸調用樹入口,從上到下調用每一個操做符的accept方法 final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); // 並行, 用來設置fragments的並行度 final QueryWorkUnit queryWorkUnit = parallelizer.getFragments( queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo()); return queryWorkUnit; }
在debug一節,咱們知道DrillRel drel, Prel prel, PhysicalOperator pop, PhysicalPlan plan各個變量的值.
上面經過PhysicalPlan得到的rootOperator就是PhysicalOperator pop根節點, 即rootOperator=Screen
.apache
rootOperator.accept後返回的是rootFragment. 而後經過root又開始遞歸遍歷了(跟debug一節rootLOP.accept同樣).
下面是MakeFragmentsVisitor訪問器當訪問到的是一個操做符時, 首先將當前操做符加入到Fragment中, 而後遍歷其孩子節點.json
public Fragment visitOp(PhysicalOperator op, Fragment value) throws ForemanSetupException{ value = ensureBuilder(value); value.addOperator(op); for (PhysicalOperator child : op) { child.accept(this, value); } return value; }
下面是Screen->..->Scan的遞歸調用示例樹:設計模式
rootOperator.accept(v,null)-->Screen.accept(visitor, null) |--MakeFragmentsVisitor.visitOp |--value=new Fragment |--value.addOperator(Screen) |--for child : [EasyGroupScan]--child.accept(visitor, value) 咱們省略了中間的一些節點,假設Screen的下一個節點是Scan |--EasyGroupScan.accept(visitor, value) |--MakeFragmentsVisitor.visitOp |--value.addOperator(EasyGroupScan) |--EasyGroupScan has no child |--return value
遞歸調用樹有點相似於設計模式中的訪問者模式(Visitor Pattern).session
返回值Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
由於參數是null, 因此第一次調用rootOperator.accept的時候就建立了新的Fragment. 接下來child.accept,
由於把value : Fragment傳入, 因此不會再構造Fragment了(除非出現Exchange的時候纔會new一個新的Fragment).
注意在每次遞歸調用child.accept以前, 把當前的物理操做符加入到Fragment中.
也就是說物理計劃組成的DAG圖的每一個物理操做符都會加入到Fragment中, 這個Fragment並不表明DAG圖中的某個節點(好比根節點), 而是包含了全部的操做符.
這和前面的DrillRel, Prel, PhysicalOperator, PhysicalPlan不同:它們的值是DAG圖的第一個節點,而後經過input或者child嵌套包含其餘節點.app
固然Fragment中要有根物理操做符, 這樣把根拎出來, 其餘全部的操做符也都能找到了.分佈式
public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { private PhysicalOperator root; private ExchangeFragmentPair sendingExchange; private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList(); // Set the given operator as root operator of this fragment. If root operator is already set, then this method call is a no-op. public void addOperator(PhysicalOperator o) { if (root == null) { root = o; } } public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{ if (sendingExchange != null) { throw new ForemanSetupException("Fragment was trying to add a second SendExchange. "); } addOperator(e); sendingExchange = new ExchangeFragmentPair(e, sendingToFragment); } public void addReceiveExchange(Exchange e, Fragment fragment) { this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment)); } }
MakeFragmentsVisitor若是訪問到的是一個Exchange操做符, Exchange會和Fragment組成一個ExchangeFragmentPair.ide
public Fragment visitExchange(Exchange exchange, Fragment value) throws ForemanSetupException { Fragment next = getNextBuilder(); // 老是會新建一個新的Fragment:next value.addReceiveExchange(exchange, next); // 將Exchange操做符和新的Fragment組成一個ExchangeFragmentPair, 添加到原來Fragment的list中 next.addSendExchange(exchange, value); // Exchange操做符和原來的Fragment也會組成一個ExchangeFragmentPair,不過用於發送 exchange.getChild().accept(this, next); // Exchange下面的孩子節點, 用的Fragment是新的那一個 return value; // 可是咱們最後返回的, 仍然是第一次新建的那一個Fragment }
調用這個方法時, value必定不爲空: The simple fragmenter was called without a FragmentBuilder value.
This will only happen if the initial call to SimpleFragmenter is by a Exchange node.
This should never happen since an Exchange node should never be the root node of a plan
一個Exchange節點永遠不能是一個計劃的根節點. Exchange前是一個Major Fragment, Exchange後也是一個Major Fragment.函數
至於爲何先是Receiver,而後是Sender, 咱們先看下官網中的概念,以及舉個帶有Exchange的例子:oop
http://drill.apache.org/docs/drill-query-execution/
A parallelizer in the Foreman transforms the physical plan into multiple phases, called major and minor fragments.
These fragments create a multi-level execution tree that rewrites the query and executes it in
parallel against the configured data sources, sending the results back to the client or application.並行化會將物理計劃分紅多個階段. 何時須要並行? 任務是能夠分解的時候, 任務之間沒有關聯, 好比Hadoop的MapReduce就是可並行化的.
這些階段叫作major或者minor fragmens. 它們組成了一個多層的執行樹, 重寫查詢, 而且可以並行地在數據源上執行.以傳統DAG圖的方式, 只有前面的節點處理完後,後面的節點纔會繼續運行. 而用並行化的方式,每一個節點運行完一部分數據,後面的節點就能夠接着這些數據進行計算.
Drill separates major fragments by an exchange operator. An exchange is a change in data location and/or parallelization of the physical plan.
An exchange is composed of a sender and a receiver to allow data to move between nodesDrill用交換操做符來分隔major fragments. 一個交換操做符是數據位置的交換, 或者物理計劃並行度的變動.
一個交換操做符由一個發送器和一個接收器組成, 以運行數據在不一樣節點之間進行移動.Major fragments do not actually perform any query tasks. Each major fragment is divided into one or multiple minor fragments
that actually execute the operations required to complete the query and return results back to the client.Major Fragments不執行任何的查詢任務, 每一個major fragments會分紅一個或多個minor fragments.
Minor Fragments會執行完成這個查詢須要的操做, 而且返回結果給客戶端.
那麼何時會產生Major Fragments: 讀取的是HDFS上的文件時(count(*)無條件查詢即便是hdfs文件也沒有exchange).
- select count(*) from hdfs.`/user/hive/warehouse/test.db/koudai` + select count(*) from hdfs.`/user/hive/warehouse/test.db/koudai` where sequence_id like '%12%' + select t.event_result_map.map from hdfs.`/user/hive/warehouse/test.db/koudai` t where t.sequence_id='1433300095954-25887486' + select t.sequence_id from hdfs.`/user/hive/warehouse/test.db/koudai` t limit 1 + select * from hdfs.`/user/hive/warehouse/test.db/koudai` limit 1 - select * from cp.`employee.json` limit 1 - select * from dfs.`/Users/zhengqh/data/hive_alltypes.parquet` limit 1
爲何無條件的count()查詢沒有exchange. 觀察Operator,發現count查詢底層的scan是DIRECT_SUB_SCAN,
而parquet的其餘查詢(帶條件的count,where,limit,)用的是PARQUET_ROW_GROUP_SCAN. 後面的cp和本地查詢則沒有Exchange.
下圖中白色的UnionExchange分隔了兩個Major Fragments. totalFragments的個數指的是全部的minor framgnet.
對比DAG圖和Operator Profiles. 能夠看到Exchange對應的Operator是Receiver和Sender.
注意左側的UnionExchange在右側中被分紅了Receiver和Sender.
第一個Major framgnet在UnionExchange的上方, 即Screen, 只有一個mior Fragment.
第二個Major framgnet包括了多個操做符, 有2個minor Fragments.
以上圖中的Screen->UnionExchange->Project->...->Scan的順序分析下UnionExchange:
在訪問根操做符visitOp(Screen,null)時, MakeFragmentsVisitor會新建一個Fragment, 設置Fragment的root=Screen.
接着由於Screen的Child是UninonExchange,調用的是MakeFragmentsVisitor的visitExchange(UnionExchange,Fragment value).
第二個參數Fragment value是訪問Screen時建立的第一個Fragment, 第一個Fragment value必定不爲空, 由於不容許根節點是Exchange.
由於Exchange是用來分隔Major Fragment的, 因此在Exchange以前和以後都要有一個Major Fragment,以前就是第一個Fragment了.
重點看下visitExchange的下面的邏輯, 理清到底第一個Fragment value和下一個Fragment next分別添加的是什麼組件.
value.addReceiveExchange(exchange, next); // first add Receiver from next next.addSendExchange(exchange, value); // next add Sender to first
注意咱們的DAG圖從上到下,第一個節點是Screen,最下面的節點是Scan, 因此上面的是做爲接收數據的一方,下面的是發送數據的一方.
而first Fragment即上面的value, 是在上方的,那麼就是做爲接收方Receiver的.
並且最後返回給客戶端的也是上層的,客戶端只須要知道和Screen相關的那個Fragment,即返回值是value.
第一個Fragment value添加一個Receive Exchange, 只是把新建的ExchangeFragmentPair加入到value的List receivingExchangePairs中.
而第二個Fragment next咱們已經知道了在visitExchange時建立了一個新的Fragment. 對於每個全新的Fragment, 都要設置root節點操做符.
實際上觀察前面的DAG圖和Operator Profiles,你會發現UnionExchange的UNORDERED_RECEIVER的編號是00-xx-01,所以是屬於第一個Fragment的.
而SINGLE_SENDER的編號是01-xx-00, 則是屬於第二個Fragment. 所以第二個Fragment的root就是SINGLE_SENDER.
上面兩個value和next互相添加對方, 其實是爲了在上下文中都能找到對方. 不然若是隻是value添加了next. 則在next時就沒法找到value的.
Fragment | root | sendingExchange | receivingExchangePairs | Explain | Role |
---|---|---|---|---|---|
value | Screen | × | ExchangeFragmentPair(e,next) | value的接收者是next | Reciever |
next | SINGLE_SENDER | ExchangeFragmentPair(e,value) | × | next要發送給value | Sender |
看看UnionExchange的幾個相關方法:
public class UnionExchange extends AbstractExchange{ // Ephemeral info for generating execution fragments. 這幾個變量是AbstractExchange中的,爲了閱讀的方便放在這裏 protected int senderMajorFragmentId; protected int receiverMajorFragmentId; protected List<DrillbitEndpoint> senderLocations; protected List<DrillbitEndpoint> receiverLocations; public void setupSenders(List<DrillbitEndpoint> senderLocations) { this.senderLocations = senderLocations; } protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { Preconditions.checkArgument(receiverLocations.size() == 1, "Union Exchange only supports a single receiver endpoint."); super.setupReceivers(receiverLocations); } public Sender getSender(int minorFragmentId, PhysicalOperator child) { return new SingleSender(receiverMajorFragmentId, child, receiverLocations.get(0)); } public Receiver getReceiver(int minorFragmentId) { return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false); } }
上面getSender和getReceiver的第一個參數是minorFragmentId. new一個Sender或者Receiver都要知道對方的MajorFragmentId.
好比SingleSender要知道Receiver的MajorFragmentId,以及接收者的一個Location. UnorderReceiver要知道Sender的MajorId,以及全部發送者的Locations.
SingleSender: Sender that pushes all data to a single destination node.
發送者會發送全部的數據到一個目標節點,那麼固然要指定這個目標節點了.
這個目標節點應該是跟上表中的sendingExchange變量相關的, 能夠看到這一行的root=SINGLE_SENDER. 固然目標節點指的應該是Drillbit級別,而不是Operator了.
public class SingleSender extends AbstractSender { /** * Create a SingleSender which sends data to fragment identified by given MajorFragmentId and MinorFragmentId, and running at given endpoint * * @param oppositeMajorFragmentId MajorFragmentId of the receiver fragment. * @param oppositeMinorFragmentId MinorFragmentId of the receiver fragment. * @param child Child operator * @param destination Drillbit endpoint where the receiver fragment is running. */ @JsonCreator public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("receiver-minor-fragment") int oppositeMinorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) { super(oppositeMajorFragmentId, child, Collections.singletonList(new MinorFragmentEndpoint(oppositeMinorFragmentId, destination))); }
MinorFragmentEndpoint represents fragment's MinorFragmentId and Drillbit endpoint to which the fragment is assigned for execution.
DrillbitEndpoint是運行'Drillbit'服務的節點(集羣的計算節點). MinorFragmentEndpoint是fragment要執行在哪一個Drillbit節點,更細粒度(Container?).
對於Reciever而言, 它能夠有多個Sender.
public class UnorderedReceiver extends AbstractReceiver{ @JsonCreator public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, @JsonProperty("senders") List<MinorFragmentEndpoint> senders, @JsonProperty("spooling") boolean spooling) { super(oppositeMajorFragmentId, senders, spooling); }
下面解釋下FragmentLeaf這個接口下都有哪些實現類.
FragmentLeaf是一個Fragment的葉子節點, Fragment和DAG圖的葉子節點是有點差異呢的. 由於一個DAG圖會包括多個Fragment.
1.接收者是一個Fragment的葉子, 由於Exchange會分隔Fragment. Fragment的上方是接收者,是上面一個Fragment的葉子節點.
2.整個DAG圖的葉子節點一般是Scan,是組成DAG最下面的那個Fragment的葉子節點.
Fragment的Root是一個Fragment的根節點
1.發送者是一個Fragment的根節點, 即Exchange分隔的下面一個Fragment的根節點,而Fragment下發是一個Sender.
2.整個DAG圖的根節點一般是Screen.
在運行物理計劃的第一句是根據物理計劃獲得QueryWorkUnit:
final QueryWorkUnit work = getQueryWorkUnit(plan); final List<PlanFragment> planFragments = work.getFragments(); final PlanFragment rootPlanFragment = work.getRootFragment();
查詢的工做單元包含了三個組件, 對於本地而言的根Fragment和根操做符. 這裏的本地指的是Foreman.
public class QueryWorkUnit { private final PlanFragment rootFragment; // for local private final FragmentRoot rootOperator; // for local private final List<PlanFragment> fragments; // Major+Minor Fragments
而PlanFragment既是Plan又是Fragment. 前面咱們知道Fragment由Exchange分紅了多個Major Fragment.
在遍歷物理操做符時, 會將物理操做符加入到對應的Fragment中.
必須上Protobuf這道菜了. 對於理解不一樣組件之間的關係是有做用的. 其實前面RPC部分也是用到了protobuf.
PlanFragment的protobuf定義在BitControl.proto中. FragmentHandle在ExecutionProtos.proto中
message PlanFragment { optional FragmentHandle handle = 1; optional float network_cost = 4; optional float cpu_cost = 5; optional float disk_cost = 6; optional float memory_cost = 7; optional string fragment_json = 8; optional bool leaf_fragment = 9; optional DrillbitEndpoint assignment = 10; optional DrillbitEndpoint foreman = 11; optional int64 mem_initial = 12 [default = 20000000]; // 20 megs optional int64 mem_max = 13 [default = 2000000000]; // 20 gigs optional exec.shared.UserCredentials credentials = 14; optional string options_json = 15; optional QueryContextInformation context = 16; repeated Collector collector = 17; } message FragmentHandle { optional exec.shared.QueryId query_id = 1; optional int32 major_fragment_id = 2; optional int32 minor_fragment_id = 3; }
在日誌一節, 其中Root Fragment(rootFragment對象)打印的信息以下, 能夠看到正好對應了上面的PlanFragment的協議格式:
handle { → FragmentHandle query_id { part1: 3053657859282349058 part2: -8863752500417580646 } major_fragment_id: 0 minor_fragment_id: 0 } fragment_json: "{ → fragment_json ... }" leaf_fragment: true assignment { → DrillbitEndpoint address: "localhost" user_port: 31010 control_port: 31011 data_port: 31012 } foreman { → DrillbitEndpoint address: "localhost" user_port: 31010 control_port: 31011 data_port: 31012 } context { query_start_time: 1436498522273 time_zone: 299 default_schema_name: "" }
計算fragments要根據查詢的上下文QueryContext,以及DrillbitContext.
queryContext.getCurrentEndpoint()表示Foreman節點, queryContext.getActiveEndpoints()表示參與計算的其餘節點.
咱們重點看下獲取活動的Endpoints是怎麼作得, 由於Drill是分佈式的計算引擎,添加計算節點可以讓計算能力提升.
那麼它是怎麼實現的, 經過ZK的Watcher機制, 若是有節點增長進來,獲取可用的計算節點時就是動態實時的.
queryContext.getActiveEndpoints() | |-->drillbitContext.getBits() | |-->ClusterCoordinator.getAvailableEndpoints() | |<--ZKClusterCoordinator.endpoints | |<--updateEndpoints()
咱們知道了經過ZK實時獲取動態的計算節點, 可是任務是怎麼分配到計算節點上的. 咱們能不能自定義轉發規則??
由SimpleParallelizer得到Fragments, 參數activeEndpoints就是上面從上下文中獲得的集羣中可用的Drillbit計算節點.
rootFragment是rootOperator返回的Fragment, 物理計劃的rootOperator通常是Screen. 這裏的Fragment指的是由Exchange分割的Major Fragment.
該方法根據提供的Fragment樹生成分配好的Fragments集合, 就是PlanFragment Protobuf對象集合, 會被分配到單獨的節點.
返回的Fragments(注意是複數形式), 則是Major+Minor級別的Fragment了. 而Minor Fragments能夠有多個, 是能夠並行處理的.
什麼是Fragment樹? 就是文檔中提到的將物理計劃轉成多個Fragments,這些Fragments組成了一棵樹.
final QueryWorkUnit queryWorkUnit = parallelizer.getFragments( queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo()); /** * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages to go beyond the global max width. * @param foremanNode The driving/foreman node for this query. (this node) 本次查詢的驅動節點/Foreman節點. * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. 要計劃本次查詢, 須要考慮包括在內的計算節點 * @param reader Tool used to read JSON plans 讀取JSON格式的物理計劃 * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. 物理計劃的根節點(對應的Fragment), 會並行處理. * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. */ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, UserSession session, QueryContextInformation queryContextInfo) { final PlanningSet planningSet = new PlanningSet(); initFragmentWrappers(rootFragment, planningSet); final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet); // Start parallelizing from leaf fragments for (Wrapper wrapper : leafFragments) { parallelizeFragment(wrapper, planningSet, activeEndpoints); } return generateWorkUnit(options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo); }
咱們知道rootFragment只是表明了DAG圖最頂上的那個Major Fragment, 在下面的迭代中,要給DAG圖中的每一個Major Fragment都添加到planningSet中.
// For every fragment, create a Wrapper in PlanningSet. public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) { planningSet.get(rootFragment); for(ExchangeFragmentPair fragmentPair : rootFragment) { initFragmentWrappers(fragmentPair.getNode(), planningSet); } }
下面咱們再給出Fragment的迭代方法iterator. for循環迭代的是receivingExchangePairs.
前面分析過上一個Fragment做爲接收者接收下一個Fragment發送的數據: value.addReceiveExchange(exchange, next);
那麼ExchangeFragmentPair的Node就是next, 即下一個Major Fragment, 而後繼續遞歸下去.
public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList(); public void addReceiveExchange(Exchange e, Fragment fragment) { this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment)); } public Iterator<ExchangeFragmentPair> iterator() { return this.receivingExchangePairs.iterator(); }
既然用到了ReceiveExchange, 下面立刻就用到了SendingExchange. 添加是在: next.addSendExchange(exchange, value);
下面用的不是ExchangeFragmentPair的Fragment了, 而是Fragment的Exchange. 這裏要作到的是設置MajorFragmentId.
由於由Exchange分割的Major Fragment, 它們的ID分別是00,01,02等等.
public class PlanningSet implements Iterable<Wrapper> { private final Map<Fragment, Wrapper> fragmentMap = Maps.newHashMap(); private int majorFragmentIdIndex = 0; public Wrapper get(Fragment node) { Wrapper wrapper = fragmentMap.get(node); if (wrapper == null) { int majorFragmentId = 0; // If there is a sending exchange, we need to number other than zero. if (node.getSendingExchange() != null) { // assign the upper 16 bits as the major fragment id. majorFragmentId = node.getSendingExchange().getChild().getOperatorId() >> 16; // if they are not assigned, that means we mostly likely have an externally generated plan. in this case, come up with a major fragmentid. if (majorFragmentId == 0) majorFragmentId = majorFragmentIdIndex; } wrapper = new Wrapper(node, majorFragmentId); // Wrapper由Fragment和major編號組成 fragmentMap.put(node, wrapper); majorFragmentIdIndex++; // 只有調用Major Fragment時, 每遇到新的Major, 索引編號+1 } return wrapper; // planningSet.get並無用返回值作什麼事情. 其實主要是放到Map中, 由迭代器訪問全部的Wrapper. } public Iterator<Wrapper> iterator() { return this.fragmentMap.values().iterator(); }
Wrapper: A wrapping class that allows us to add additional information to each fragment node for planning purposes
它的構造函數建立對象是由PlanningSet指定MajorFragment和MajorFragmentId. 它的其他屬性須要在下面中設置進來.
先來看下Exchange的並行依賴: 發送者和接收者是否相互依賴.
/** * Exchanges are fragment boundaries in physical operator tree. It is divided into two parts. First part is Sender * which becomes part of the sending fragment. Second part is Receiver which becomes part of the fragment that receives the data. * Exchange是物理操做符樹的Fragment邊界. 第一部分Sender,它是發送者Fragment的一部分, 第二部分Reciever是接收者Fragment的一部分. * Assignment dependency describes whether sender fragments depend on receiver fragment's endpoint assignment for * determining its parallelization and endpoint assignment and vice versa. * 分配依賴描述了發送者Fragment是否依賴於接收者Fragment的節點分配任務, 以便於決定並行度和如何分配工做到節點上. 反過來同樣. */ public enum ParallelizationDependency { SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving fragment for parallelization RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending fragment for parallelization (default value). }
根據PlanningSet構造依賴圖:
final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet); /** 根據Exchange的親密程序分割兩個fragments, 而且設置fragment的依賴關係. * Based on the affinity of the Exchange that separates two fragments, setup fragment dependencies. * @return Returns a list of leaf fragments in fragment dependency graph. */ private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. for(Wrapper currentFragmentWrapper : planningSet) { // PlanningSet包含了全部的Major Fragment組成的Wrapper,循環每個Wrapper ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); //每一個MajorFragment要發送的目標 if (sendingExchange != null) { // SendingExchange不爲空的, 好比next, 而不是DAG圖的第一個Fragment. 由於只有next纔是發送者 ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); // 依賴關係記錄在Exchange中, 而不是Fragment中 Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode()); // 目標節點, 實際上就是接收者了 // 根據依賴關係, 判斷要加到哪一個Wrapper中, 其實是哪一個Fragment中. 由於Wrapper由MajorFragment組成. if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) { // Receiver依賴Sender receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper); // Receiver的依賴關係圖中有當前Major Fragment } else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) { // Sender依賴Reciever currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper); // 當前節點恰好是Sender, 因此它依賴了接收者 } } } // 上面的添加Fragment依賴圖, 下面的Wrapper才能夠得到依賴圖, 來判斷是不是葉子節點. // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for parallelization info. // First assume all fragments are leaf fragments. Go through the fragments one by one and remove the fragment on which the current fragment depends on. final Set<Wrapper> roots = Sets.newHashSet(); for(Wrapper w : planningSet) { roots.add(w); // 全部的Major Fragment } for(Wrapper wrapper : planningSet) { final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); // 每一個Major Fragment的依賴圖 if (fragmentDependencies != null && fragmentDependencies.size() > 0) for(Wrapper dependency : fragmentDependencies) // 它的全部依賴者 if (roots.contains(dependency)) roots.remove(dependency); // 從roots中移除 } return roots; // 返回值是leaf fragments. }
上面的方法roots返回的是leaf fragments. 在這以前首先對每一個Major Fragments都設置了依賴圖. 而後把非葉子節點從全部的Major中刪除.
葉子節點的定義是: 沒有依賴其餘任何一個節點. 一旦一個節點有依賴某一個節點, 它就不是葉子節點了.
得到葉子Fragment後, 對每個葉子節點進行並行處理. 處理的時候先處理依賴的,而後才處理本身.因此也是遞歸的過程
// Start parallelizing from leaf fragments 從葉子節點開始並行處理 for (Wrapper wrapper : leafFragments) { parallelizeFragment(wrapper, planningSet, activeEndpoints); } // Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before parallelizing the given fragment. private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet, Collection<DrillbitEndpoint> activeEndpoints) { // First parallelize fragments on which this fragment depends on. final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { for(Wrapper dependency : fragmentDependencies) { parallelizeFragment(dependency, planningSet, activeEndpoints); } } Fragment fragment = fragmentWrapper.getNode(); // Step 1: Find stats. Stats include various factors including cost of physical operators, parallelizability of work in physical operator and affinity of physical operator to certain nodes. fragment.getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); // Step 2: Find the parallelization width of fragment List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints, parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth()); fragmentWrapper.assignEndpoints(assignedEndpoints); }
找到要分配的DrillBit後,就爲Fragment分配計算節點 . 一個Fragment的Sending只有最多一個,能夠有多個Receiver.
public void assignEndpoints(List<DrillbitEndpoint> assignedEndpoints) { endpoints.addAll(assignedEndpoints); // Set scan and store endpoints. AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore(); node.getRoot().accept(visitor, endpoints); // Set the endpoints for this (one at most) sending exchange. if (node.getSendingExchange() != null) { node.getSendingExchange().setupSenders(majorFragmentId, endpoints); } // Set the endpoints for each incoming exchange within this fragment. for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) { e.getExchange().setupReceivers(majorFragmentId, endpoints); } }
最後基於上面的工做, 生成WorkUnit, QueryWorkUnit只是封裝了rootOperator,rootFragment,fragments的對象. 注意下面是個雙層循環,
外層的是對每一個MajorFragment,內層則對每一個MinorFragment. 若是不是根節點,則把建立的PlanFragment加入到fragments中.
PlanFragment一個重要的對象是FragmentHandle,顧名思義是Fragment的處理類, 它只封裝了Major,Minor的FragmentID,以及查詢ID.
private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session, QueryContextInformation queryContextInfo) { List<PlanFragment> fragments = Lists.newArrayList(); PlanFragment rootFragment = null; FragmentRoot rootOperator = null; // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one. for (Wrapper wrapper : planningSet) { Fragment node = wrapper.getNode(); final PhysicalOperator physicalOperatorRoot = node.getRoot(); boolean isRootNode = rootNode == node; // a fragment is self driven if it doesn't rely on any other exchanges. boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0; // Create a minorFragment for each major fragment. for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) { IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); wrapper.resetAllocation(); PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode); FragmentRoot root = (FragmentRoot) op; FragmentHandle handle = FragmentHandle.newBuilder() // .setMajorFragmentId(wrapper.getMajorFragmentId()) // .setMinorFragmentId(minorFragmentId) // .setQueryId(queryId) // .build(); PlanFragment fragment = PlanFragment.newBuilder() // .setForeman(foremanNode) // .setFragmentJson(reader.writeJson(root)) // .setHandle(handle) // .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) // .setLeafFragment(isLeafFragment) // .setContext(queryContextInfo) .setMemInitial(wrapper.getInitialAllocation())// .setMemMax(wrapper.getMaxAllocation()) .setOptionsJson(reader.writeJson(options)) .setCredentials(session.getCredentials()) .addAllCollector(CountRequiredFragments.getCollectors(root)) .build(); if (isRootNode) { logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); rootFragment = fragment; rootOperator = root; } else { logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); fragments.add(fragment); } } } return new QueryWorkUnit(rootOperator, rootFragment, fragments); }
如今主線回到Foreman的runPhysicalPlan, 在提交Fragments執行前, 先添加了兩個監聽器到DrillbitContext對應的WorkBus和集羣協調器.
而後設置RootFragment和非RootFragment. 設置根節點須要QueryWorkUnit的rootFragment和rootOperator. 非根節點只須要planFragments.
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { final QueryWorkUnit work = getQueryWorkUnit(plan); final List<PlanFragment> planFragments = work.getFragments(); final PlanFragment rootPlanFragment = work.getRootFragment(); drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); logger.debug("Submitting fragments to run."); // set up the root fragment first so we'll have incoming buffers available. setupRootFragment(rootPlanFragment, work.getRootOperator()); setupNonRootFragments(planFragments); drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!? moveToState(QueryState.RUNNING, null); logger.debug("Fragments running."); }