Set the log level in logback.xml to debug:
<logger name="org.apache.drill" additivity="false">
  <level value="debug" />
  <appender-ref ref="FILE" />
</logger>
<logger name="query.logger" additivity="false">
  <level value="debug" />
  <appender-ref ref="QUERY" />
</logger>
<root>
  <level value="debug" />
  <appender-ref ref="STDOUT" />
</root>
Use embedded (single-node) mode rather than cluster mode, and start drill-embedded.
Besides watching the output of drill-embedded itself, also watch the sqlline.log file under the log directory:
2015-07-10 11:11:41,636 [main] DEBUG o.apache.drill.exec.server.Drillbit - Construction started.
2015-07-10 11:11:42,481 [main] INFO  o.apache.drill.exec.server.Drillbit - Construction completed (845 ms).
2015-07-10 11:11:42,481 [main] DEBUG o.apache.drill.exec.server.Drillbit - Startup begun.
2015-07-10 11:11:42,481 [main] DEBUG o.a.d.e.c.l.LocalClusterCoordinator - Local Cluster Coordinator started.
2015-07-10 11:11:42,607 [main] DEBUG o.a.drill.exec.rpc.user.UserServer - Server of type UserServer started on port 31010.
2015-07-10 11:11:42,650 [main] DEBUG o.a.d.exec.rpc.control.ControlServer - Server of type ControlServer started on port 31011.
2015-07-10 11:11:42,688 [main] DEBUG o.a.drill.exec.rpc.data.DataServer - Server of type DataServer started on port 31012.
2015-07-10 11:11:42,924 [main] DEBUG o.a.drill.common.util.PathScanner - Classpath scanning took 60ms
2015-07-10 11:11:42,924 [main] DEBUG o.a.d.e.p.base.PhysicalOperatorUtil - Adding Physical Operator sub types: .................
2015-07-10 11:11:43,047 [main] DEBUG org.apache.drill.common.JSONOptions - Creating Deserializer.
2015-07-10 11:11:43,146 [main] DEBUG o.a.d.e.p.i.OperatorCreatorRegistry - Adding Operator Creator map:..............
2015-07-10 11:11:43,385 [main] DEBUG o.a.d.e.e.f.FunctionImplementationRegistry - Generating function registry.
2015-07-10 11:11:48,643 [main] DEBUG o.a.d.e.s.h.HBaseStoragePluginConfig - Initializing HBase StoragePlugin configuration with zookeeper quorum 'localhost', port '2181'.
2015-07-10 11:11:48,977 [main] DEBUG o.a.d.e.c.l.LocalClusterCoordinator - Endpoint registered address: "localhost" user_port: 31010 control_port: 31011 data_port: 31012 .
2015-07-10 11:11:51,700 [main] INFO  o.apache.drill.exec.server.Drillbit - Startup completed (9218 ms).
2015-07-10 11:11:51,741 [main] DEBUG o.a.drill.exec.client.DrillClient - Connecting to server localhost:31010
As you can see, running drill-embedded from the command line connects to the local Drill server.
Execute a SQL statement in sqlline; the call ultimately lands in the run method of a FragmentExecutor thread:
0: jdbc:drill:zk=local> select count(*) from cp.`employee.json`;
11:22:02.300 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as:zhengqh (auth:SIMPLE) from:org.apache.drill.exec.util.ImpersonationUtil.createFileSystem(ImpersonationUtil.java:141)
11:22:02.390 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as:zhengqh (auth:SIMPLE) from:org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:255)
+---------+
| EXPR$0  |
+---------+
| 1155    |
+---------+
1 row selected (0.221 seconds)
Let's walk through sqlline.log section by section.
First the query is registered: the SQL statement typed into sqlline is assigned a query id. You can find this query job at http://localhost:8047/profiles.
2015-07-10 11:22:02,257 [main] DEBUG o.a.d.j.impl.DrillStatementRegistry - Adding to open-statements registry: org.apache.drill.jdbc.impl.DrillStatementImpl@4eb2bb3d
2015-07-10 11:22:02,258 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Query listener created.
2015-07-10 11:22:02,259 [UserServer-1] DEBUG o.a.drill.exec.rpc.user.UserServer - Received query to run. Returning query handle.
2015-07-10 11:22:02,273 [UserServer-1] DEBUG o.a.d.exec.memory.TopLevelAllocator - New child allocator with initial reservation 1048576
2015-07-10 11:22:02,274 [UserServer-1] DEBUG o.a.drill.exec.rpc.user.UserServer - Sending response with Sender 575856913
2015-07-10 11:22:02,275 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query ID: 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,276 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - Received QueryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a successfully. Adding results listener org.apache.drill.jdbc.impl.DrillResultSetImpl$ResultsListener@6f3634b1.
Drill uses Optiq to generate the logical plan. A SQL statement's logical/physical plan reads bottom-up, e.g. TableScan -> Project -> Aggregate below.
For SELECT COUNT(column) FROM table, the logical plan actually runs in this order:
FROM table (TableScan) -> SELECT column (Project) -> COUNT (Aggregate)
2015-07-10 11:22:02,315 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Optiq Logical :
LogicalAggregate(group=[{}], EXPR$0=[COUNT()]): rowcount = 10.0, cumulative cost = {211.25 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 211
  LogicalProject($f0=[0]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 209
    EnumerableTableScan(table=[[cp, employee.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 205
The Optiq logical plan is then converted into Drill's logical plan, and from that into Drill's physical plan: the Drill Logical -> Drill Physical step.
2015-07-10 11:22:02,320 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - Took 0 ms to build endpoint map
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - FileWork group (classpath:/employee.json,0) max bytes 474631
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - Took 2 ms to set endpoint bytes
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.d.e.s.schedule.BlockMapBuilder - Get block maps: Executed 1 out of 1 using 1 threads. Time: 2ms total, 2.240000ms avg, 2ms max.
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.d.e.s.schedule.BlockMapBuilder - Get block maps: Executed 1 out of 1 using 1 threads. Earliest start: 1.000000 μs, Latest start: 1.000000 μs, Average start: 1.000000 μs .
2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Work: [File: classpath:/employee.json start: 0 length: 474630] Endpoint: localhost Bytes: 474630
2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Endpoint localhost has affinity 1.0
2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Took 0 ms to get operator affinity
2015-07-10 11:22:02,329 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - VolCalciteRel :
2015-07-10 11:22:02,331 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - HepCalciteRel :
2015-07-10 11:22:02,333 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Logical :
DrillScreenRel: rowcount = 1.0, cumulative cost = {927.1 rows, 1853.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 239
  DrillAggregateRel(group=[{}], EXPR$0=[COUNT()]): rowcount = 1.0, cumulative cost = {927.0 rows, 1853.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 236
    DrillProjectRel($f0=[0]): rowcount = 463.0, cumulative cost = {926.0 rows, 1852.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 234
      DrillScanRel(table=[[cp, employee.json]], groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1, columns=[`*`], files=[classpath:/employee.json]]]): rowcount = 463.0, cumulative cost = {463.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 222
2015-07-10 11:22:02,368 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Physical :
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1389.1 rows, 7408.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 317
00-01      StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1389.0 rows, 7408.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 316
00-02        Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 463.0, cumulative cost = {926.0 rows, 1852.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 315
00-03          Scan(groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1, columns=[`*`], files=[classpath:/employee.json]]]) : rowType = RecordType(): rowcount = 463.0, cumulative cost = {463.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 314
Visit http://localhost:8047/profiles/2a60c5a4-e01a-ac02-84fd-a01023a1319a to view this job's physical plan.
Read bottom-up, the physical plan is Scan -> Project -> StreamAgg, which makes sense because the physical plan is derived from the logical plan.
The plan is then printed in detailed JSON. Each entry in graph has a few fields: pop is the operator type, @id is the operator's number, and child references the @id of the operator's input (the next level down in the visualized plan tree).
In graph, fs-scan scans the file system, here reading all columns (`*`); project maps fields, and $f0 is effectively the first field in columns;
streaming-aggregate computes the expression count(1); finally screen emits the output.
Let's first look at the visualized plan tree in the web UI for an intuitive picture.
Take Scan 00-03 as an example: 00 is the major fragment id, and 03 corresponds to fs-scan's @id = 3.
Different pops carry different metadata below; project, for instance, needs expressions, since selecting a column can involve computing over it.
2015-07-10 11:22:02,370 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Plan : {
  "head" : {
    "version" : 1,
    "generator" : { "type" : "DefaultSqlHandler", "info" : "" },
    "type" : "APACHE_DRILL_PHYSICAL",
    "options" : [ ],
    "queue" : 0,
    "resultMode" : "EXEC"
  },
  "graph" : [ {
    "pop" : "fs-scan",
    "@id" : 3,
    "userName" : "zhengqh",
    "files" : [ "classpath:/employee.json" ],
    ...
    "columns" : [ "`*`" ],
    "selectionRoot" : "classpath:/employee.json"
  }, {
    "pop" : "project",
    "@id" : 2,
    "exprs" : [ { "ref" : "`$f0`", "expr" : "0" } ],
    "child" : 3
  }, {
    "pop" : "streaming-aggregate",
    "@id" : 1,
    "child" : 2,
    "keys" : [ ],
    "exprs" : [ { "ref" : "`EXPR$0`", "expr" : "count(1) " } ]
  }, {
    "pop" : "screen",
    "@id" : 0,
    "child" : 1
  } ]
}
Drill's execution engine arranges the plan into a fragment tree; below is the root fragment.
Cross-checking with the web UI: run locally, this query generates only one fragment.
The root fragment's major and minor ids are both 0. Unlike the flat graph above, the fragment_json below is nested.
Why is the graph of the visualized plan flat, while the root fragment is nested?
Think of it this way: if the structure in a picture were nested, we would have to draw streaming-aggregate inside the Screen box, then project inside streaming-aggregate, and so on; that would no longer be a diagram. A plan diagram is a DAG (directed acyclic graph).
A flat tree, by contrast, can only run straight down like the figure above, with no branches; nesting is what introduces branches. The sketch below makes the two representations concrete.
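To connect the two forms, here is a minimal, hypothetical Java sketch (FlatOp and TreeOp are invented for illustration, not Drill classes) that resolves the flat graph's child id references into a nested tree shaped like fragment_json:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flat plan entry: "child" holds the @id of the input operator, as in the JSON graph above.
record FlatOp(int id, String pop, Integer childId) {}

// Hypothetical nested node, shaped like fragment_json (each operator embeds its child).
record TreeOp(String pop, TreeOp child) {}

public class PlanNesting {
  // Resolve flat child references into a nested tree, starting from the root operator.
  static TreeOp nest(Map<Integer, FlatOp> byId, int id) {
    FlatOp op = byId.get(id);
    TreeOp child = (op.childId() == null) ? null : nest(byId, op.childId());
    return new TreeOp(op.pop(), child);
  }

  public static void main(String[] args) {
    Map<Integer, FlatOp> byId = new HashMap<>();
    byId.put(3, new FlatOp(3, "fs-scan", null));
    byId.put(2, new FlatOp(2, "project", 3));
    byId.put(1, new FlatOp(1, "streaming-aggregate", 2));
    byId.put(0, new FlatOp(0, "screen", 1));
    // Prints screen{streaming-aggregate{project{fs-scan}}} -- the nesting seen in fragment_json.
    System.out.println(nest(byId, 0));
  }
}
```

(A real plan node may have several children, which is exactly where branches come from.)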
2015-07-10 11:22:02,372 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AssignmentCreator - Took 0 ms to assign 1 work units to 1 fragments
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.f.SimpleParallelizer - Root fragment:
handle {
  query_id { part1: 3053657859282349058 part2: -8863752500417580646 }
  major_fragment_id: 0
  minor_fragment_id: 0
}
fragment_json: "{
  "pop" : "screen",
  "@id" : 0,
  "child" : {
    "pop" : "streaming-aggregate",
    "@id" : 1,
    "child" : {
      "pop" : "project",
      "@id" : 2,
      "exprs" : [ { "ref" : "`$f0`", "expr" : "0" } ],
      "child" : {
        "pop" : "fs-sub-scan",
        "@id" : 3,
        "userName" : "zhengqh",
        "files" : [ { "start" : 0, "length" : 474630, "path" : "classpath:/employee.json" } ],
        "columns" : [ "`*`" ],
        "selectionRoot" : "classpath:/employee.json"
      },
      "initialAllocation" : 1000000,
      "maxAllocation" : 10000000000,
      "cost" : 463.0
    },
    "keys" : [ ],
    "exprs" : [ { "ref" : "`EXPR$0`", "expr" : "count(1) " } ]
  }
}"
leaf_fragment: true
assignment { address: "localhost" user_port: 31010 control_port: 31011 data_port: 31012 }
foreman { address: "localhost" user_port: 31010 control_port: 31011 data_port: 31012 }
mem_initial: 3000000
mem_max: 30000000000
credentials { user_name: "anonymous" }
options_json: "[ ]"
context { query_start_time: 1436498522273 time_zone: 299 default_schema_name: "" }
At the same level as fragment_json there is also foreman, the node that accepted the client's query; since this is embedded mode, it is localhost.
The initial memory mem_initial and maximum memory mem_max end up in the FragmentContext.
After the fragments are submitted to the Foreman, the query starts running: the Foreman's state moves from PENDING to RUNNING, and the FragmentExecutor's from AWAITING_ALLOCATION to RUNNING, as the "State change requested" lines in the log below show. A small sketch of such a guarded state machine follows.
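As a minimal sketch of that kind of guarded state machine, with states and messages mirroring the log lines (the enum and class here are invented for illustration; this is not Drill's Foreman code):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical foreman states, mirroring those visible in the logs.
enum ForemanState { PENDING, RUNNING, COMPLETED, FAILED }

class ForemanStateMachine {
  private ForemanState state = ForemanState.PENDING;

  synchronized void moveTo(ForemanState next) {
    // Only these transitions are legal; anything else is rejected.
    Set<ForemanState> allowed = switch (state) {
      case PENDING -> EnumSet.of(ForemanState.RUNNING, ForemanState.FAILED);
      case RUNNING -> EnumSet.of(ForemanState.COMPLETED, ForemanState.FAILED);
      default      -> EnumSet.noneOf(ForemanState.class);
    };
    if (!allowed.contains(next)) {
      throw new IllegalStateException(state + " --> " + next + " is not allowed");
    }
    System.out.println("State change requested. " + state + " --> " + next);
    state = next;
  }
}
```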
An important concept here is the RecordBatch tree that ImplCreator builds.
We already saw the tree formed by the root fragment above; record batches form a tree as well.
What is a RecordBatch? As the name says, a batch of records. So why a tree? Because each physical operator gets a RecordBatch implementation wired on top of its child's batches, so the operator tree maps onto a RecordBatch tree (note the "Took 7 ms to create RecordBatch tree" line in the log below).
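Here is a minimal sketch of the idea under simplified, hypothetical types (Drill's real ImplCreator is a visitor over PhysicalOperator with registered per-operator creators): the operator tree is walked bottom-up, and every operator gets a batch that pulls from its children's batches.

```java
import java.util.List;

// Hypothetical simplified types, for illustration only.
interface PhysOp { List<PhysOp> children(); }
interface RecBatch { /* schema, value vectors, next() ... */ }

class ImplCreatorSketch {
  // Build the RecordBatch tree bottom-up: create the child batches first,
  // then wrap them in the batch implementation for the current operator.
  RecBatch getRecordBatch(PhysOp op) {
    List<RecBatch> childBatches = op.children().stream()
        .map(this::getRecordBatch)
        .toList();
    return createBatchFor(op, childBatches); // e.g. a project gets a ProjectRecordBatch
  }

  RecBatch createBatchFor(PhysOp op, List<RecBatch> children) {
    // In Drill this dispatch goes through registered operator creators;
    // here it is just a placeholder.
    return new RecBatch() {};
  }
}
```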
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.exec.rpc.control.WorkEventBus - Adding fragment status listener for queryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.work.foreman.Foreman - Submitting fragments to run.
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.ops.FragmentContext - Getting initial memory allocation of 3000000
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.ops.FragmentContext - Fragment max allocation: 30000000000
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.exec.memory.TopLevelAllocator - New child allocator with initial reservation 3000000
2015-07-10 11:22:02,379 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.work.batch.IncomingBuffers - Came up with a list of 0 required fragments. Fragments {}
2015-07-10 11:22:02,380 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.drill.exec.work.foreman.Foreman - State change requested. PENDING --> RUNNING
2015-07-10 11:22:02,381 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.work.foreman.Foreman - Fragments running.
2015-07-10 11:22:02,381 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,387 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,387 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.physical.impl.ImplCreator - Took 7 ms to create RecordBatch tree
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0: State change requested from AWAITING_ALLOCATION --> RUNNING for
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.f.AbstractStatusReporter - State changed for 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0. New state: RUNNING
The fragment's status is managed by the QueryManager. Within a status, operator_profile describes one operator, carrying fields such as input_profile, the operator id, the operator type, and so on.
What is a profile? The web page http://localhost:8047/profiles is precisely where Drill collects the runtime profiles of query jobs,
including the Query Profile, Fragment Profiles, Operator Profiles, and the Full JSON Profile.
2015-07-10 11:22:02,389 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.f.NonRootStatusReporter - Sending status change message message to remote node: profile {
  state: RUNNING
  minor_fragment_id: 0
  operator_profile {
    input_profile { records: 0 batches: 0 schemas: 0 }
    operator_id: 3
    operator_type: 29
    setup_nanos: 0
    process_nanos: 4651000
    peak_local_memory_allocated: 0
    wait_nanos: 3000
  }
  ...
2015-07-10 11:22:02,390 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.fragment.FragmentExecutor - Starting fragment 0:0 on localhost:31010
2015-07-10 11:22:02,392 [BitServer-4] DEBUG o.a.d.exec.work.foreman.QueryManager - New fragment status was provided to QueryManager of profile {
The RecordBatch mentioned above shows up below through several of its implementation classes: ProjectRecordBatch and StreamingAggBatch.
2015-07-10 11:22:02,392 [BitServer-4] DEBUG o.a.d.exec.rpc.control.ControlServer - Sending response with Sender 762133699
2015-07-10 11:22:02,408 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.p.ProjectRecordBatch - Added eval for project expression.
2015-07-10 11:22:02,410 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Creating new aggregator.
2015-07-10 11:22:02,413 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Next outcome of OK_NEW_SCHEMA
2015-07-10 11:22:02,413 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Creating new aggregator.
+++++++++++++batch1+++++++++++++
2015-07-10 11:22:02,414 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - batchArrived: queryId = part1: 3053657859282349058 part2: -8863752500417580646
2015-07-10 11:22:02,415 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query data batch #1: QueryResultBatch [header=query_id {
2015-07-10 11:22:02,415 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 955795882
2015-07-10 11:22:02,416 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Dequeued query data batch #1: QueryResultBatch [header=query_id {
batchArrived above means a batch of records has arrived, so what follows is processing that data.
For each batch the order is always the same: first "Received query data batch", then "Sending response with Sender", and finally "Dequeued query data batch". Clearly, query data batches pass in and out of a queue; a minimal sketch of the pattern follows.
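A minimal sketch of this producer/consumer pattern under stated assumptions (the types here are hypothetical; the real logic lives in QueryResultHandler and DrillResultSetImpl$ResultsListener): the RPC thread (Client-1) receives a batch, enqueues it, and acks the server, while the application thread (main) blocks on the queue and dequeues batches.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for Drill's QueryResultBatch.
record DataBatch(int sequenceNo) {}

class ResultsListenerSketch {
  private final BlockingQueue<DataBatch> queue = new LinkedBlockingQueue<>();

  // Runs on the RPC thread ("Client-1"): Received -> enqueue -> Sending response (ack).
  void batchArrived(DataBatch batch) {
    System.out.println("Received query data batch #" + batch.sequenceNo());
    queue.add(batch);
    System.out.println("Sending response (ack) for batch #" + batch.sequenceNo());
  }

  // Runs on the application thread ("main"): blocks until a batch can be dequeued.
  DataBatch nextBatch() throws InterruptedException {
    DataBatch batch = queue.take();
    System.out.println("Dequeued query data batch #" + batch.sequenceNo());
    return batch;
  }
}
```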
+++++++++++++batch2+++++++++++++
2015-07-10 11:22:02,419 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Aggregator response RETURN_OUTCOME, records 1
2015-07-10 11:22:02,419 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Aggregator response CLEANUP_AND_RETURN, records 1
2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - batchArrived: queryId = part1: 3053657859282349058 part2: -8863752500417580646
2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query data batch #2: QueryResultBatch [header=query_id {
2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 1000578767
2015-07-10 11:22:02,422 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Dequeued query data batch #2: QueryResultBatch [header=query_id {
Once the computation completes, the contexts are closed and the FragmentExecutor's state moves from RUNNING to FINISHED; a profile is printed again.
Here we finally see 1155, the result computed by this query (records: 1155 in the input_profile).
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.store.dfs.easy.EasySubScan
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.Project
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.StreamingAggregate
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.Screen
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.apache.drill.exec.memory.Accountor - Fragment 0:0 accountor being closed
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0: State change requested from RUNNING --> FINISHED for
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.f.AbstractStatusReporter - State changed for 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0. New state: FINISHED
2015-07-10 11:22:02,424 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.f.NonRootStatusReporter - Sending status change message message to remote node: profile {
  state: FINISHED
  minor_fragment_id: 0
  operator_profile {
    input_profile { records: 1155 batches: 1 schemas: 1 }
    operator_id: 3
    operator_type: 29
    setup_nanos: 0
    process_nanos: 22301000
    peak_local_memory_allocated: 4608
    wait_nanos: 133000
  }
  ...
Finally the Foreman shuts down too, its state moving from RUNNING to COMPLETED. By the time resultArrived is logged, the result has already been printed in sqlline.
What remains is removing resources, unregistering listeners, and the like, mirroring the allocation and registration at the very beginning.
2015-07-10 11:22:02,427 [BitServer-4] INFO  o.a.drill.exec.work.foreman.Foreman - State change requested. RUNNING --> COMPLETED
2015-07-10 11:22:02,427 [BitServer-4] INFO  o.a.drill.exec.work.foreman.Foreman - foreman cleaning up.
2015-07-10 11:22:02,429 [BitServer-4] DEBUG o.a.d.exec.rpc.control.WorkEventBus - Removing fragment status listener for queryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,430 [BitServer-4] DEBUG o.apache.drill.exec.memory.Accountor - Fragment 0:0 accountor being closed
2015-07-10 11:22:02,446 [BitServer-4] DEBUG o.a.d.exec.rpc.control.ControlServer - Sending response with Sender 739019148
2015-07-10 11:22:02,447 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - resultArrived: queryState: COMPLETED, queryId = part1: 3053657859282349058 part2: -8863752500417580646
2015-07-10 11:22:02,448 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query completion: COMPLETED.
2015-07-10 11:22:02,448 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 264929084
2015-07-10 11:22:02,479 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Query listener closing.
2015-07-10 11:22:02,479 [main] DEBUG o.a.d.j.impl.DrillStatementRegistry - Removing from open-statements registry: org.apache.drill.jdbc.impl.DrillStatementImpl@4eb2bb3d
http://drill.apache.org/docs/drill-plan-syntax/
https://docs.google.com/document/d/1QTL8warUYS2KjldQrGUse7zp8eA72VKtLO...
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1971728
We have already seen this figure in the Architecture doc; the DesignDoc has a rougher version of it.
In short, the overall flow is: query statement -> parser -> logical plan -> optimizer -> physical plan -> execution engine.
A fairly detailed document on the logical plan is linked above as well (the second link; it may require a proxy to access).
Take the Scan operator from Logical Plan Operators as an example:
The Scan operator outputs a stream of records. The "storageengine" argument must refer by name to a storage engine defined in the engines clause of the logical plan. The "selection" argument accepts a JSON object that is used by the data source itself to limit the amount of data actually retrieved. The format and content of this object is specific to the actual input source being used. Examples might include an HBase table name, a MongoDB collection, an HDFS path or a partition of a Hive table. Data sources will use the selection argument in an implementation-specific way. The provided "ref" argument ensures that all records within the scanned source are held in the provided namespace.
{
  @id†: <opref>,
  op: "scan",
  storageengine: <string>,
  selection*: <json>,
  ref: <name>
}
In other words: the Scan operator outputs a stream of records. The storageengine argument must refer to a storage engine declared in the logical plan's engines clause.
The selection argument takes a JSON object that the data source itself uses to limit how much data is actually retrieved.
The format and content of that object depend on the actual data source: an HBase table name, a MongoDB collection, an HDFS path, or a partition of a Hive table, for example.
Talk is cheap. Show me the code.
The package org.apache.drill.common.logical.data contains many of the operators mentioned above, such as Scan, Join, Project, and Union.
@JsonTypeName("scan") public class Scan extends SourceOperator { private final String storageEngine; private final JSONOptions selection; @JsonCreator public Scan(@JsonProperty("storageengine") String storageEngine, @JsonProperty("selection") JSONOptions selection) { super(); this.storageEngine = storageEngine; this.selection = selection; } @JsonProperty("storageengine") public String getStorageEngine() { return storageEngine; } public JSONOptions getSelection() { return selection; } @Override public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E { return logicalVisitor.visitScan(this, value); } }
LogicalVisitor is the visitor over logical operators. Per its Javadoc, it is a "Visitor class designed for traversal of an operator tree" and the "basis for a number of operator manipulations, including fragmentation and materialization".
The Scan operator itself is simple; it extends SourceOperator: "An operator that produces data without any parents" (a zero-input operator). A toy version of the visitor pattern follows.
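To show how accept dispatches, here is a deliberately reduced toy version of the pattern (Drill's real LogicalVisitor declares one visit method per operator type; the Mini* types below are invented for illustration). The visitor counts Scan operators by walking the tree:

```java
// Toy operator hierarchy: each node dispatches to the matching visit method.
interface MiniOp { <T> T accept(MiniVisitor<T> v); }

record MiniScan(String storageEngine) implements MiniOp {
  public <T> T accept(MiniVisitor<T> v) { return v.visitScan(this); }
}

record MiniProject(MiniOp input) implements MiniOp {
  public <T> T accept(MiniVisitor<T> v) { return v.visitProject(this); }
}

interface MiniVisitor<T> {
  T visitScan(MiniScan scan);
  T visitProject(MiniProject project);
}

// A visitor that traverses the operator tree and counts Scans.
class ScanCounter implements MiniVisitor<Integer> {
  public Integer visitScan(MiniScan scan) { return 1; }
  public Integer visitProject(MiniProject project) { return project.input().accept(this); }
}
```

For example, new MiniProject(new MiniScan("dfs")).accept(new ScanCounter()) returns 1.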
Operators fall into several classes, and every operator declares which class it belongs to (sketched after this list):
0: produces data without depending on any other operator, i.e. a data source, such as Scan;
1: processes a single input source, such as Project, Order, Limit;
M: processes data from multiple input sources;
K: produces no output.
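As a rough sketch of how these arities could appear as base classes (only SourceOperator and SingleInputOperator are confirmed by the snippets in this post; the other names are illustrative stand-ins):

```java
import java.util.List;

interface OpSketch {}

// "0": produces data with no parents -- Scan extends SourceOperator below.
abstract class SourceOpSketch implements OpSketch {}

// "1": exactly one input -- Project extends SingleInputOperator below.
abstract class SingleInputOpSketch implements OpSketch {
  protected OpSketch input;
}

// "M": multiple inputs, e.g. a join or union.
abstract class MultiInputOpSketch implements OpSketch {
  protected List<OpSketch> inputs;
}

// "K": consumes input but emits nothing (cf. SinkOperator in LogicalPlan's graph later).
abstract class SinkOpSketch implements OpSketch {
  protected OpSketch input;
}
```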
We know a Project consists of a field reference plus an expression: in something like count(column), the column part is the ref (a FieldReference) and count(...) is the expr (a LogicalExpression).
@JsonTypeName("project") public class Project extends SingleInputOperator { private final NamedExpression[] selections; @JsonPropertyOrder({"ref", "expr"}) public class NamedExpression { private final LogicalExpression expr; private final FieldReference ref;
Under the org.apache.drill.common.logical package there is an important class, LogicalPlan. First, let's look at the plan's properties:
public class PlanProperties {
  public static enum PlanType { APACHE_DRILL_LOGICAL, APACHE_DRILL_PHYSICAL }

  public PlanType type;
  public int version;
  public Generator generator;
  public ResultMode resultMode;
  public JSONOptions options;
  public int queue;

  public static class Generator {
    public String type;
    public String info;

    public static enum ResultMode {
      EXEC, LOGICAL, PHYSICAL;
    }

    private Generator(@JsonProperty("type") String type, @JsonProperty("info") String info) {
      this.type = type;
      this.info = info;
    }
  }
This matches the head section printed in the Drill Plan earlier (the Drill Plan we saw there is actually a physical plan, not a logical one, but the head section is the same):
{ "head" : { "version" : 1, "generator" : { "type" : "DefaultSqlHandler", "info" : "" }, "type" : "APACHE_DRILL_PHYSICAL", "options" : [ ], "queue" : 0, "resultMode" : "EXEC" },
A LogicalPlan has three parts: head, storage, and query.
http://www.confusedcoders.com/bigdata/apache-drill/understanding-apach...
The query node is the actual query that we want to execute on Drill. The query itself is a collection of operations on the data.
@JsonPropertyOrder({ "head", "storage", "query" })
public class LogicalPlan {
  static final Logger logger = LoggerFactory.getLogger(LogicalPlan.class);

  private final PlanProperties properties;
  private final Map<String, StoragePluginConfig> storageEngineMap;
  private final Graph<LogicalOperator, SinkOperator, SourceOperator> graph;

  @JsonCreator
  public LogicalPlan(@JsonProperty("head") PlanProperties head,
                     @JsonProperty("storage") Map<String, StoragePluginConfig> storageEngineMap,
                     @JsonProperty("query") List<LogicalOperator> operators) {
    this.storageEngineMap = storageEngineMap != null ? storageEngineMap : new HashMap<String, StoragePluginConfig>();
    this.properties = head;
    this.graph = Graph.newGraph(operators, SinkOperator.class, SourceOperator.class);
  }
query is the collection of logical operators; together with the Sink and Source operator classes it forms graph, the dataflow execution graph of the logical plan.
LogicalPlanBuilder's build() creates the LogicalPlan object; this builder is the programmatic interface for assembling logical plans.
public class LogicalPlanBuilder {
  private PlanProperties planProperties;
  private ImmutableMap.Builder<String, StoragePluginConfig> storageEngines = ImmutableMap.builder();
  private ImmutableList.Builder<LogicalOperator> operators = ImmutableList.builder();

  public LogicalPlanBuilder addLogicalOperator(LogicalOperator operator) {
    this.operators.add(operator);
    return this;
  }

  public LogicalPlan build() {
    return new LogicalPlan(this.planProperties, this.storageEngines.build(), this.operators.build());
  }
}
Before calling build(), you assemble the complete graph through the corresponding addXXX() methods; each returns this, so calls can be chained.
This is the builder pattern: once build() is called, the returned object can no longer be modified, so all data must be filled in first. A hypothetical usage sketch follows.
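A minimal, hypothetical usage sketch (the imports and Scan arguments are assumptions; only addLogicalOperator and build appear in the snippet above, and a real plan would also need head properties, storage engine configs, and a sink operator to terminate the graph):

```java
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.LogicalPlanBuilder; // package assumed
import org.apache.drill.common.logical.data.Scan;

public class BuildPlanExample {
  public static void main(String[] args) {
    // Placeholder scan: engine name "dfs" and a null selection, for illustration only.
    Scan scan = new Scan("dfs", null);
    LogicalPlan plan = new LogicalPlanBuilder()
        .addLogicalOperator(scan)        // source operator produces the record stream
        // .addLogicalOperator(project)  // chain further operators here
        .build();                        // after build() the plan is immutable
  }
}
```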
How do we know that the Drill Plan printed in the log is a physical plan rather than a logical one? Start from the class that logs it, DefaultSqlHandler, and search for "Drill Plan":
public PhysicalPlan getPlan(SqlNode sqlNode) {
  final ConvertedRelNode convertedRelNode = validateAndConvert(sqlNode);
  final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
  final RelNode queryRelNode = convertedRelNode.getConvertedNode();
  log("Optiq Logical", queryRelNode, logger);

  DrillRel drel = convertToDrel(queryRelNode, validatedRowType);
  log("Drill Logical", drel, logger);

  Prel prel = convertToPrel(drel);
  log("Drill Physical", prel, logger);

  PhysicalOperator pop = convertToPop(prel);
  PhysicalPlan plan = convertToPlan(pop);
  log("Drill Plan", plan, logger);
  return plan;
}
1. In getPlan above, the plans are generated in order: Optiq logical plan --> Drill logical plan --> Drill physical plan.
2. Like the logical plan, the physical plan has many operators, all under the org.apache.drill.exec.physical package.
3. Optiq has since become Apache Calcite; an introductory tutorial: http://blog.csdn.net/yunlong34574/article/details/46375733
The getPlan call tree runs on the Foreman thread, invoked via the DrillSqlWorker that the Foreman creates.
Actually running the physical plan happens back in the Foreman's runPhysicalPlan:
private void runSQL(final String sql) throws ExecutionSetupException {
  final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
  final Pointer<String> textPlan = new Pointer<>();
  final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
  queryManager.setPlanText(textPlan.value);
  runPhysicalPlan(plan);
}