Apache Drill Source Code Analysis (1): Preparation, Logical Plan, Physical Plan



Drill Source Reading (1): Environment Setup and Log Inspection

Preparation

Change the log level in logback.xml to debug:

  <logger name="org.apache.drill" additivity="false">
    <level value="debug" />
    <appender-ref ref="FILE" />
  </logger>

  <logger name="query.logger" additivity="false">
    <level value="debug" />
    <appender-ref ref="QUERY" />
  </logger>

  <root>
    <level value="debug" />
    <appender-ref ref="STDOUT" />
  </root>

Use embedded (single-node) mode rather than cluster mode: start drill-embedded.

Besides watching the output of drill-embedded above, also watch the sqlline.log file under the log directory:

2015-07-10 11:11:41,636 [main] DEBUG o.apache.drill.exec.server.Drillbit - Construction started.
2015-07-10 11:11:42,481 [main] INFO  o.apache.drill.exec.server.Drillbit - Construction completed (845 ms).
2015-07-10 11:11:42,481 [main] DEBUG o.apache.drill.exec.server.Drillbit - Startup begun.
2015-07-10 11:11:42,481 [main] DEBUG o.a.d.e.c.l.LocalClusterCoordinator - Local Cluster Coordinator started.
2015-07-10 11:11:42,607 [main] DEBUG o.a.drill.exec.rpc.user.UserServer - Server of type UserServer started on port 31010.
2015-07-10 11:11:42,650 [main] DEBUG o.a.d.exec.rpc.control.ControlServer - Server of type ControlServer started on port 31011.
2015-07-10 11:11:42,688 [main] DEBUG o.a.drill.exec.rpc.data.DataServer - Server of type DataServer started on port 31012.
2015-07-10 11:11:42,924 [main] DEBUG o.a.drill.common.util.PathScanner - Classpath scanning took 60ms
2015-07-10 11:11:42,924 [main] DEBUG o.a.d.e.p.base.PhysicalOperatorUtil - Adding Physical Operator sub types: .................
2015-07-10 11:11:43,047 [main] DEBUG org.apache.drill.common.JSONOptions - Creating Deserializer.
2015-07-10 11:11:43,146 [main] DEBUG o.a.d.e.p.i.OperatorCreatorRegistry - Adding Operator Creator map:..............
2015-07-10 11:11:43,385 [main] DEBUG o.a.d.e.e.f.FunctionImplementationRegistry - Generating function registry.
2015-07-10 11:11:48,643 [main] DEBUG o.a.d.e.s.h.HBaseStoragePluginConfig - Initializing HBase StoragePlugin configuration with zookeeper quorum 'localhost', port '2181'.
2015-07-10 11:11:48,977 [main] DEBUG o.a.d.e.c.l.LocalClusterCoordinator - Endpoint registered address: "localhost"
user_port: 31010
control_port: 31011
data_port: 31012
.
2015-07-10 11:11:51,700 [main] INFO  o.apache.drill.exec.server.Drillbit - Startup completed (9218 ms).
2015-07-10 11:11:51,741 [main] DEBUG o.a.drill.exec.client.DrillClient - Connecting to server localhost:31010

You can see that running drill-embedded from the command line connects to the local Drill server.

Run a SQL statement in sqlline; the logs show that what ultimately executes is the run method of the FragmentExecutor thread:

0: jdbc:drill:zk=local> select count(*) from cp.`employee.json`;
11:22:02.300 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as:zhengqh (auth:SIMPLE) from:org.apache.drill.exec.util.ImpersonationUtil.createFileSystem(ImpersonationUtil.java:141)
11:22:02.390 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as:zhengqh (auth:SIMPLE) from:org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:255)
+---------+
| EXPR$0  |
+---------+
| 1155    |
+---------+
1 row selected (0.221 seconds)

Log Analysis

Let's walk through the sqlline.log output section by section.

First the query is registered: the SQL statement entered in sqlline is assigned a query-id. Visit http://localhost:8047/profiles to find this query job.

2015-07-10 11:22:02,257 [main] DEBUG o.a.d.j.impl.DrillStatementRegistry - Adding to open-statements registry: org.apache.drill.jdbc.impl.DrillStatementImpl@4eb2bb3d
2015-07-10 11:22:02,258 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Query listener created.
2015-07-10 11:22:02,259 [UserServer-1] DEBUG o.a.drill.exec.rpc.user.UserServer - Received query to run.  Returning query handle.
2015-07-10 11:22:02,273 [UserServer-1] DEBUG o.a.d.exec.memory.TopLevelAllocator - New child allocator with initial reservation 1048576
2015-07-10 11:22:02,274 [UserServer-1] DEBUG o.a.drill.exec.rpc.user.UserServer - Sending response with Sender 575856913
2015-07-10 11:22:02,275 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query ID: 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,276 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - Received QueryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a successfully. Adding results listener org.apache.drill.jdbc.impl.DrillResultSetImpl$ResultsListener@6f3634b1.

Optiq is used to generate the logical plan. A logical or physical plan printed for a SQL statement reads bottom-up, e.g. TableScan -> Project -> Aggregate below.
For SELECT COUNT(column) FROM table, the actual order of the logical plan is:
FROM table (TableScan) -> SELECT column (Project) -> COUNT (Aggregate)

2015-07-10 11:22:02,315 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Optiq Logical :
LogicalAggregate(group=[{}], EXPR$0=[COUNT()]): rowcount = 10.0, cumulative cost = {211.25 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 211
  LogicalProject($f0=[0]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 209
    EnumerableTableScan(table=[[cp, employee.json]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 205

The Optiq logical plan is then converted into a Drill logical plan, and from there into a Drill physical plan: the Drill Logical -> Drill Physical steps below.

2015-07-10 11:22:02,320 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - Took 0 ms to build endpoint map
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - FileWork group (classpath:/employee.json,0) max bytes 474631
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.BlockMapBuilder - Took 2 ms to set endpoint bytes
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.d.e.s.schedule.BlockMapBuilder - Get block maps: Executed 1 out of 1 using 1 threads. Time: 2ms total, 2.240000ms avg, 2ms max.
2015-07-10 11:22:02,323 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.d.e.s.schedule.BlockMapBuilder - Get block maps: Executed 1 out of 1 using 1 threads. Earliest start: 1.000000 μs, Latest start: 1.000000 μs, Average start: 1.000000 μs .

2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Work: [File: classpath:/employee.json start: 0 length: 474630] Endpoint: localhost Bytes: 474630
2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Endpoint localhost has affinity 1.0
2015-07-10 11:22:02,324 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AffinityCreator - Took 0 ms to get operator affinity
2015-07-10 11:22:02,329 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - VolCalciteRel :
2015-07-10 11:22:02,331 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - HepCalciteRel :
2015-07-10 11:22:02,333 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Logical :
DrillScreenRel: rowcount = 1.0, cumulative cost = {927.1 rows, 1853.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 239
  DrillAggregateRel(group=[{}], EXPR$0=[COUNT()]): rowcount = 1.0, cumulative cost = {927.0 rows, 1853.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 236
    DrillProjectRel($f0=[0]): rowcount = 463.0, cumulative cost = {926.0 rows, 1852.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 234
      DrillScanRel(table=[[cp, employee.json]], groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1, columns=[`*`], files=[classpath:/employee.json]]]): rowcount = 463.0, cumulative cost = {463.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 222

2015-07-10 11:22:02,368 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Physical :
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1389.1 rows, 7408.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 317
00-01      StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1389.0 rows, 7408.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 316
00-02        Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 463.0, cumulative cost = {926.0 rows, 1852.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 315
00-03          Scan(groupscan=[EasyGroupScan [selectionRoot=classpath:/employee.json, numFiles=1, columns=[`*`], files=[classpath:/employee.json]]]) : rowType = RecordType(): rowcount = 463.0, cumulative cost = {463.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 314

Visit http://localhost:8047/profiles/2a60c5a4-e01a-ac02-84fd-a01023a1319a to see this job's physical plan.
Reading bottom-up, the order is Scan -> Project -> StreamAgg, since the physical plan is derived from the logical plan.

Next a detailed JSON-format plan is printed. Each entry in graph has a few fields: pop is the operator type, @id is the operator's number, and child references the @id of the operator's child, i.e. the next level down in the visualized plan tree.
In this graph, fs-scan scans the file system, reading all columns (*); project maps fields, and $f0 is effectively the first field in columns;
streaming-aggregate computes the expression count(1); finally, screen outputs the result.

Let's first look at the visualized plan tree on the web page to get an intuitive picture.
Take Scan 00-03 as an example: 00 is the major fragment id, and 03 is fs-scan's @id=3.

Each pop below carries different metadata; project, for instance, needs expressions, because selecting a column can also involve computing on it.

2015-07-10 11:22:02,370 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - Drill Plan :
{
  "head" : {
    "version" : 1,
    "generator" : {
      "type" : "DefaultSqlHandler",
      "info" : ""
    },
    "type" : "APACHE_DRILL_PHYSICAL",
    "options" : [ ],
    "queue" : 0,
    "resultMode" : "EXEC"
  },
  "graph" : [ {
    "pop" : "fs-scan",
    "@id" : 3,
    "userName" : "zhengqh",
    "files" : [ "classpath:/employee.json" ],
    ...
    "columns" : [ "`*`" ],
    "selectionRoot" : "classpath:/employee.json",
  }, {
    "pop" : "project",
    "@id" : 2,
    "exprs" : [ {
      "ref" : "`$f0`",
      "expr" : "0"
    } ],
    "child" : 3
  }, {
    "pop" : "streaming-aggregate",
    "@id" : 1,
    "child" : 2,
    "keys" : [ ],
    "exprs" : [ {
      "ref" : "`EXPR$0`",
      "expr" : "count(1) "
    } ]
  }, {
    "pop" : "screen",
    "@id" : 0,
    "child" : 1
  } ]
}

Drill's execution engine assembles the physical plan into a fragment tree. Below is the Root Fragment.
Checking against the web UI, this query produced only one fragment when run locally.
The Root Fragment's major and minor ids are both 0. Unlike the graph above, the fragment_json below is nested.

Why is the graph of the visualized plan flat, while the Root Fragment is nested?
Think of it this way: if the graph structure were nested, streaming-aggregate would have to be drawn inside the Screen node,
project inside streaming-aggregate, and so on; that would no longer be a graph, and the plan graph is a DAG (directed acyclic graph).
A flat tree, on the other hand, can only run straight down like the figure above, with no branches. Nesting brings back the notion of branches.
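To make the flat-versus-nested point concrete, here is a small standalone sketch (FlatOp and GraphToTree are our own illustrative names, not Drill classes) that rebuilds the nested tree from the flat graph array by resolving each child id:

import java.util.HashMap;
import java.util.Map;

public class GraphToTree {
  static class FlatOp {
    final int id;           // the "@id" field
    final String pop;       // the operator type, e.g. "fs-scan"
    final Integer childId;  // the "child" field; null for a leaf like fs-scan
    FlatOp child;           // resolved reference, filled in below

    FlatOp(int id, String pop, Integer childId) {
      this.id = id;
      this.pop = pop;
      this.childId = childId;
    }
  }

  public static void main(String[] args) {
    // The flat "graph" array from the Drill Plan above, ids copied verbatim.
    FlatOp[] graph = {
        new FlatOp(3, "fs-scan", null),
        new FlatOp(2, "project", 3),
        new FlatOp(1, "streaming-aggregate", 2),
        new FlatOp(0, "screen", 1)
    };
    Map<Integer, FlatOp> byId = new HashMap<>();
    for (FlatOp op : graph) byId.put(op.id, op);
    for (FlatOp op : graph) {
      if (op.childId != null) op.child = byId.get(op.childId);
    }
    // Walking from screen (@id 0) down the resolved references reproduces the
    // nesting of fragment_json: screen, streaming-aggregate, project, fs-scan.
    for (FlatOp op = byId.get(0); op != null; op = op.child) {
      System.out.println(op.pop);
    }
  }
}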

2015-07-10 11:22:02,372 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.s.schedule.AssignmentCreator - Took 0 ms to assign 1 work units to 1 fragments
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.p.f.SimpleParallelizer - Root fragment:
 handle {
  query_id {
    part1: 3053657859282349058
    part2: -8863752500417580646
  }
  major_fragment_id: 0
  minor_fragment_id: 0
}
fragment_json: "{
  "pop" : "screen",
  "@id" : 0,
  "child" : {
    "pop" : "streaming-aggregate",
    "@id" : 1,
    "child" : {
      "pop" : "project",
      "@id" : 2,
      "exprs" : [ {
        "ref" : "`$f0`",
        "expr" : "0"
      } ],
      "child" : {
        "pop" : "fs-sub-scan",
        "@id" : 3,
        "userName" : "zhengqh",
        "files" : [ {
          "start" : 0,
          "length" : 474630,
          "path" : "classpath:/employee.json"
        } ],
        "columns" : [ "`*`" ],
        "selectionRoot" : "classpath:/employee.json"
      },
      "initialAllocation" : 1000000,
      "maxAllocation" : 10000000000,
      "cost" : 463.0
    },
    "keys" : [ ],
    "exprs" : [ {
      "ref" : "`EXPR$0`",
      "expr" : "count(1) "
    } ]
  }
}"
leaf_fragment: true
assignment {
  address: "localhost"
  user_port: 31010
  control_port: 31011
  data_port: 31012
}
foreman {
  address: "localhost"
  user_port: 31010
  control_port: 31011
  data_port: 31012
}
mem_initial: 3000000
mem_max: 30000000000
credentials {
  user_name: "anonymous"
}
options_json: "[ ]"
context {
  query_start_time: 1436498522273
  time_zone: 299
  default_schema_name: ""
}

At the same level as fragment_json there is also foreman, the node that accepted the client's query; since this is local mode, it is localhost.
The initial memory mem_initial and maximum memory mem_max are held in the FragmentContext.

Once the fragments are submitted to the Foreman, the query starts running: the Foreman's state goes from PENDING to RUNNING, and the FragmentExecutor's from AWAITING_ALLOCATION to RUNNING.
An important concept here is the RecordBatch tree created by ImplCreator.

We already met one tree above, formed by the Root Fragment; here record batches form a tree too.
A RecordBatch is, as the name says, a batch of records. So why a tree of them?
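Before the logs answer that, one way to picture it: Drill executes a fragment with a pull-based operator model, where each operator's RecordBatch wraps its child's RecordBatch, so the physical operator tree becomes a tree of RecordBatch instances, and calling next() on the root pulls data up from the leaves. A toy sketch of that shape (names invented for illustration; Drill's real interface is RecordBatch, whose next() returns outcomes like the OK_NEW_SCHEMA seen in the logs further below):

// Toy pull-based batch tree: next() on the root pulls data up from the leaf.
interface Batch {
  boolean next();  // advance to the next batch of records; false when exhausted
}

class ScanBatch implements Batch {  // leaf, in the spirit of fs-sub-scan
  private int remaining = 2;        // pretend the scan yields two batches
  public boolean next() { return remaining-- > 0; }
}

class AggBatch implements Batch {   // inner node, in the spirit of streaming-aggregate
  private final Batch child;
  private boolean done = false;
  AggBatch(Batch child) { this.child = child; }
  public boolean next() {
    if (done) return false;
    while (child.next()) { /* accumulate counts from each incoming batch */ }
    done = true;
    return true;                    // emit a single batch holding the aggregate
  }
}

class Demo {
  public static void main(String[] args) {
    Batch root = new AggBatch(new ScanBatch());
    while (root.next()) System.out.println("got an output batch");  // prints once
  }
}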

2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.exec.rpc.control.WorkEventBus - Adding fragment status listener for queryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.work.foreman.Foreman - Submitting fragments to run.
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.ops.FragmentContext - Getting initial memory allocation of 3000000
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.ops.FragmentContext - Fragment max allocation: 30000000000
2015-07-10 11:22:02,378 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.exec.memory.TopLevelAllocator - New child allocator with initial reservation 3000000
2015-07-10 11:22:02,379 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.d.e.work.batch.IncomingBuffers - Came up with a list of 0 required fragments.  Fragments {}
2015-07-10 11:22:02,380 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] INFO  o.a.drill.exec.work.foreman.Foreman - State change requested.  PENDING --> RUNNING
2015-07-10 11:22:02,381 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:foreman] DEBUG o.a.drill.exec.work.foreman.Foreman - Fragments running.
2015-07-10 11:22:02,381 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,387 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,387 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.memory.BufferAllocator - New child allocator with initial reservation 1000000
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.physical.impl.ImplCreator - Took 7 ms to create RecordBatch tree
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0: State change requested from AWAITING_ALLOCATION --> RUNNING for
2015-07-10 11:22:02,388 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.f.AbstractStatusReporter - State changed for 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0. New state: RUNNING

Fragment state is managed by the QueryManager. In its status messages, operator_profile holds an operator's runtime statistics, including fields such as input_profile, the operator id, the operator type, and so on.
As for what a profile is: the web page http://localhost:8047/profiles is where Drill collects runtime profiles of queries,
covering the Query Profile, Fragment Profiles, Operator Profiles, and the Full JSON Profile.

2015-07-10 11:22:02,389 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.f.NonRootStatusReporter - Sending status change message message to remote node: profile {
  state: RUNNING
  minor_fragment_id: 0
  operator_profile {
    input_profile {
      records: 0
      batches: 0
      schemas: 0
    }
    operator_id: 3
    operator_type: 29
    setup_nanos: 0
    process_nanos: 4651000
    peak_local_memory_allocated: 0
    wait_nanos: 3000
  }
  ...
2015-07-10 11:22:02,390 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.fragment.FragmentExecutor - Starting fragment 0:0 on localhost:31010
2015-07-10 11:22:02,392 [BitServer-4] DEBUG o.a.d.exec.work.foreman.QueryManager - New fragment status was provided to QueryManager of profile {

The RecordBatch mentioned above appears below through two of its implementation classes: ProjectRecordBatch and StreamingAggBatch.

2015-07-10 11:22:02,392 [BitServer-4] DEBUG o.a.d.exec.rpc.control.ControlServer - Sending response with Sender 762133699
2015-07-10 11:22:02,408 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.p.ProjectRecordBatch - Added eval for project expression.
2015-07-10 11:22:02,410 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Creating new aggregator.
2015-07-10 11:22:02,413 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Next outcome of OK_NEW_SCHEMA
2015-07-10 11:22:02,413 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Creating new aggregator.

+++++++++++++batch1+++++++++++++
2015-07-10 11:22:02,414 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - batchArrived: queryId = part1: 3053657859282349058 part2: -8863752500417580646
2015-07-10 11:22:02,415 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query data batch #1: QueryResultBatch [header=query_id {
2015-07-10 11:22:02,415 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 955795882
2015-07-10 11:22:02,416 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Dequeued query data batch #1: QueryResultBatch [header=query_id {

batchArrived above means a record batch has come in; what follows is processing that data.
For each batch the pattern is always: Received query data batch, then Sending response with Sender,
and finally Dequeued query data batch. Clearly, query data batches move in and out of a queue.
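A minimal sketch of that producer/consumer pattern (our own illustration, not Drill's ResultsListener code): the RPC thread, Client-1 in the log, enqueues each arriving batch and acknowledges it, while the main thread blocks on the queue and dequeues:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ResultsQueueSketch {
  private final BlockingQueue<Object> batchQueue = new LinkedBlockingQueue<>();

  // Runs on the RPC thread when a batch arrives ("Received query data batch #n").
  public void batchArrived(Object batch) {
    batchQueue.offer(batch);
    // ... then acknowledge to the server ("Sending response with Sender ...")
  }

  // Runs on the consumer thread ("Dequeued query data batch #n").
  public Object nextBatch() throws InterruptedException {
    return batchQueue.take();  // blocks until a batch is available
  }
}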

+++++++++++++batch2+++++++++++++
2015-07-10 11:22:02,419 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Aggregator response RETURN_OUTCOME, records 1
2015-07-10 11:22:02,419 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.p.i.a.StreamingAggBatch - Aggregator response CLEANUP_AND_RETURN, records 1

2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - batchArrived: queryId = part1: 3053657859282349058 part2: -8863752500417580646
2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query data batch #2: QueryResultBatch [header=query_id {
2015-07-10 11:22:02,421 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 1000578767
2015-07-10 11:22:02,422 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Dequeued query data batch #2: QueryResultBatch [header=query_id {

Computation completes, the contexts are closed, and the FragmentExecutor's state goes from RUNNING to FINISHED. A profile is printed here as well,
and in it we finally see 1155, the result this query computed.

2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.store.dfs.easy.EasySubScan
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.Project
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.StreamingAggregate
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.exec.ops.OperatorContextImpl - Closing context for org.apache.drill.exec.physical.config.Screen
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.apache.drill.exec.memory.Accountor - Fragment 0:0  accountor being closed
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0: State change requested from RUNNING --> FINISHED for
2015-07-10 11:22:02,423 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] INFO  o.a.d.e.w.f.AbstractStatusReporter - State changed for 2a60c5a4-e01a-ac02-84fd-a01023a1319a:0:0. New state: FINISHED
2015-07-10 11:22:02,424 [2a60c5a4-e01a-ac02-84fd-a01023a1319a:frag:0:0] DEBUG o.a.d.e.w.f.NonRootStatusReporter - Sending status change message message to remote node: profile {
  state: FINISHED
  minor_fragment_id: 0
  operator_profile {
    input_profile {
      records: 1155
      batches: 1
      schemas: 1
    }
    operator_id: 3
    operator_type: 29
    setup_nanos: 0
    process_nanos: 22301000
    peak_local_memory_allocated: 4608
    wait_nanos: 133000
  }
  ...

Finally the Foreman also shuts down, its state moving from RUNNING to COMPLETED. By the time resultArrived is printed, the result has already been shown in sqlline.
What remains is removing and deregistering resources, mirroring the allocation and registration at the start.

2015-07-10 11:22:02,427 [BitServer-4] INFO  o.a.drill.exec.work.foreman.Foreman - State change requested.  RUNNING --> COMPLETED
2015-07-10 11:22:02,427 [BitServer-4] INFO  o.a.drill.exec.work.foreman.Foreman - foreman cleaning up.
2015-07-10 11:22:02,429 [BitServer-4] DEBUG o.a.d.exec.rpc.control.WorkEventBus - Removing fragment status listener for queryId 2a60c5a4-e01a-ac02-84fd-a01023a1319a.
2015-07-10 11:22:02,430 [BitServer-4] DEBUG o.apache.drill.exec.memory.Accountor - Fragment 0:0  accountor being closed
2015-07-10 11:22:02,446 [BitServer-4] DEBUG o.a.d.exec.rpc.control.ControlServer - Sending response with Sender 739019148
2015-07-10 11:22:02,447 [Client-1] DEBUG o.a.d.e.rpc.user.QueryResultHandler - resultArrived: queryState: COMPLETED, queryId = part1: 3053657859282349058
part2: -8863752500417580646

2015-07-10 11:22:02,448 [Client-1] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Received query completion: COMPLETED.
2015-07-10 11:22:02,448 [Client-1] DEBUG o.a.drill.exec.rpc.user.UserClient - Sending response with Sender 264929084
2015-07-10 11:22:02,479 [main] DEBUG o.a.d.j.i.DrillResultSetImpl$ResultsListener - [#3] Query listener closing.
2015-07-10 11:22:02,479 [main] DEBUG o.a.d.j.impl.DrillStatementRegistry - Removing from open-statements registry: org.apache.drill.jdbc.impl.DrillStatementImpl@4eb2bb3d

Logical Plan

http://drill.apache.org/docs/drill-plan-syntax/
https://docs.google.com/document/d/1QTL8warUYS2KjldQrGUse7zp8eA72VKtLO...
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1971728

We have seen the relevant figure under Architecture, and the DesignDoc has a rougher version of it.

Overall the pipeline is: query statement -- parser -- logical plan -- optimizer -- physical plan -- execution engine.

A fairly detailed document on the logical plan is linked above (the second link; it is on Google Docs, so a proxy may be needed).

Take the Scan operator from the Logical Plan Operators as an example:

The Scan operator outputs a stream of records. The "storageengine" argument must refer by name to a storage engine defined in the engines clause of the logical plan. The "selection" argument accepts a JSON object that is used by the data source itself to limit the amount of data actually retrieved. The format and content of this object is specific to the actual input source being used. Examples might include an HBase table name, a MongoDB collection, an HDFS path or a partition of a Hive table. Data sources will use the selection argument in an implementation-specific way. The provided "ref" argument ensures that all records within the scanned source are held in the provided namespace.
{ @id†: <opref>, op: "scan",
  storageengine: <string>,
  selection*: <json>,
  ref: <name>
}

Scan操做算子會輸出記錄流. 參數storageengine必須引用邏輯計劃中定義的engine聲明.
selection參數接收JSON對象,會被數據源使用,用於限制接收到的數據的數量.
這個JSON對象的格式和內容和實際的數據源有關.好比HBase的表名,MongoDB的集合,HDFS的路徑,或者Hive表的一個分區.

Talk is cheap. Show me the code.

The package org.apache.drill.common.logical.data contains many of the operators mentioned above, such as Scan, Join, Project, and Union.

@JsonTypeName("scan")
public class Scan extends SourceOperator {
  private final String storageEngine;
  private final JSONOptions selection;

  @JsonCreator
  public Scan(@JsonProperty("storageengine") String storageEngine, @JsonProperty("selection") JSONOptions selection) {
    super();
    this.storageEngine = storageEngine;
    this.selection = selection;
  }

  @JsonProperty("storageengine")
  public String getStorageEngine() { return storageEngine; }

  public JSONOptions getSelection() {  return selection; }

  @Override
  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
      return logicalVisitor.visitScan(this, value);
  }
}

LogicalVisitor is the visitor over logical operators: a visitor class designed for traversing an operator tree,
and the basis for a number of operator manipulations, including fragmentation and materialization.
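A toy illustration of the double dispatch involved (heavily simplified: Drill's LogicalVisitor is generic over return type T, context X, and exception E, as Scan.accept above shows, and declares one visitXxx method per operator; the classes below are invented for illustration):

// Each operator's accept() calls back the matching visitXxx method, so a
// visitor can walk an operator tree without instanceof checks.
interface Visitor<T> {
  T visitScan(ScanOp scan);
  T visitProject(ProjectOp project);
}

abstract class Op {
  abstract <T> T accept(Visitor<T> v);
}

class ScanOp extends Op {
  <T> T accept(Visitor<T> v) { return v.visitScan(this); }
}

class ProjectOp extends Op {
  final Op child = new ScanOp();  // pretend this project sits above a scan
  <T> T accept(Visitor<T> v) { return v.visitProject(this); }
}

class PlanPrinter implements Visitor<String> {
  public String visitScan(ScanOp s) { return "scan"; }
  public String visitProject(ProjectOp p) { return "project(" + p.child.accept(this) + ")"; }
}

Here new ProjectOp().accept(new PlanPrinter()) returns "project(scan)"; Drill's fragmentation and materialization passes are visitors built on this same dispatch idea.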

Scan操做比較簡單, 它繼承的是SourceOperator:An operator that produces data without any parents. (zero input operator)

Operators fall into several categories, and every operator declares its type. The current categories are:
0: produces data without depending on any other operator, i.e. a data source, such as Scan;
1: processes a single input source, such as Project, Order, or Limit;
M: processes data from multiple input sources;
K: produces no output (a sink).

We know a Project consists of field references and expressions; in the plan above, for example, EXPR$0 = count(1): EXPR$0 is the ref (a field reference) and count(1) is the expr (an expression).

@JsonTypeName("project")
public class Project extends SingleInputOperator {
  private final NamedExpression[] selections;

@JsonPropertyOrder({"ref", "expr"})
public class NamedExpression {
  private final LogicalExpression expr;
  private final FieldReference ref;

The package org.apache.drill.common.logical contains an important class, LogicalPlan. First, let's see what properties a plan has.

public class PlanProperties {
  public static enum PlanType {APACHE_DRILL_LOGICAL, APACHE_DRILL_PHYSICAL}

  public PlanType type;
  public int version;
  public Generator generator;
  public ResultMode resultMode;
  public JSONOptions options;
  public int queue;

  public static class Generator {
    public String type;
    public String info;

    public static enum ResultMode {
      EXEC, LOGICAL, PHYSICAL;
    }

    private Generator(@JsonProperty("type") String type, @JsonProperty("info") String info) {
      this.type = type;
      this.info = info;
    }
  }
}

This corresponds to the head section of the Drill Plan output earlier (the Drill Plan we saw there is actually a physical plan, not a logical one, but the head section looks the same):

{
  "head" : {
    "version" : 1,
    "generator" : {
      "type" : "DefaultSqlHandler",
      "info" : ""
    },
    "type" : "APACHE_DRILL_PHYSICAL",
    "options" : [ ],
    "queue" : 0,
    "resultMode" : "EXEC"
  },

A LogicalPlan consists of three parts: head, storage, and query.

http://www.confusedcoders.com/bigdata/apache-drill/understanding-apach...

The query node is the actual query that we want to execute on Drill. The query itself is a collection of operations on the data.
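Putting the three parts together, a minimal hand-written logical plan document might look like this (a sketch with illustrative values only: the dfs storage config keys, the selection format, and the sink operator's arguments are engine-specific examples, not canonical syntax):

{
  "head" : {
    "type" : "APACHE_DRILL_LOGICAL",
    "version" : 1,
    "generator" : { "type" : "manual", "info" : "" },
    "resultMode" : "LOGICAL"
  },
  "storage" : {
    "dfs" : { "type" : "file", "connection" : "file:///" }
  },
  "query" : [
    { "@id" : 1, "op" : "scan", "storageengine" : "dfs",
      "selection" : { "files" : [ "/tmp/employee.json" ] }, "ref" : "emp" },
    { "@id" : 2, "op" : "store", "storageengine" : "dfs",
      "child" : 1, "target" : { "file" : "/tmp/out.json" } }
  ]
}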

@JsonPropertyOrder({ "head", "storage", "query" })
public class LogicalPlan {
  static final Logger logger = LoggerFactory.getLogger(LogicalPlan.class);

  private final PlanProperties properties;
  private final Map<String, StoragePluginConfig> storageEngineMap;
  private final Graph<LogicalOperator, SinkOperator, SourceOperator> graph;


  @JsonCreator
  public LogicalPlan(@JsonProperty("head") PlanProperties head,
      @JsonProperty("storage") Map<String, StoragePluginConfig> storageEngineMap,
      @JsonProperty("query") List<LogicalOperator> operators) {
    this.storageEngineMap = storageEngineMap != null ? storageEngineMap : new HashMap<String, StoragePluginConfig>();
    this.properties = head;
    this.graph = Graph.newGraph(operators, SinkOperator.class, SourceOperator.class);
  }

query is the collection of logical operators; together with the Sink and Source operators they form graph, an execution graph of the logical plan's dataflow.
LogicalPlan objects are created by the builder's build(); this Builder provides a programmatic interface for assembling logical plans.

public class LogicalPlanBuilder {
  private PlanProperties planProperties;
  private ImmutableMap.Builder<String, StoragePluginConfig> storageEngines = ImmutableMap.builder();
  private ImmutableList.Builder<LogicalOperator> operators = ImmutableList.builder();

  public LogicalPlanBuilder addLogicalOperator(LogicalOperator operator) {
    this.operators.add(operator);
    return this;
  }
  public LogicalPlan build() {
    return new LogicalPlan(this.planProperties, this.storageEngines.build(), this.operators.build());
  }
}

Before build(), you assemble the complete graph by calling the appropriate addXXX() methods; each method returns this, so callers can chain the calls.
This is the builder pattern: once build() is called, the returned object can no longer be modified, so all data must be filled in before build(), as the sketch below shows.
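For illustration, a sketch of that chaining using only the methods shown above (import paths are inferred from the packages mentioned in this post; the JSONOptions selection is left as a null placeholder since its construction isn't covered here):

import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.LogicalPlanBuilder;
import org.apache.drill.common.logical.data.Scan;

public class BuildPlanSketch {
  public static LogicalPlan buildPlan() {
    JSONOptions selection = null;  // placeholder for an engine-specific selection
    return new LogicalPlanBuilder()
        .addLogicalOperator(new Scan("dfs", selection))  // returns this, so calls chain
        .build();  // after build(), the returned plan is immutable
  }
}

A real plan would also need its planProperties and storage engines set, plus a sink operator at the root; this fragment only shows the chaining mechanics.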

Physical Plan

How do we know the Drill Plan printed in the log earlier is a physical plan rather than a logical one? Start from the class that printed it, DefaultSqlHandler, and search for "Drill Plan":

public PhysicalPlan getPlan(SqlNode sqlNode) {
    final ConvertedRelNode convertedRelNode = validateAndConvert(sqlNode);
    final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
    final RelNode queryRelNode = convertedRelNode.getConvertedNode();

    log("Optiq Logical", queryRelNode, logger);
    DrillRel drel = convertToDrel(queryRelNode, validatedRowType);

    log("Drill Logical", drel, logger);
    Prel prel = convertToPrel(drel);
    log("Drill Physical", prel, logger);
    PhysicalOperator pop = convertToPop(prel);
    PhysicalPlan plan = convertToPlan(pop);
    log("Drill Plan", plan, logger);
    return plan;
  }

1. In getPlan above, the plans are generated in order: Optiq logical plan --> Drill logical plan --> Drill physical plan.
2. Like the logical plan, the physical plan has many operators; they live under the org.apache.drill.exec.physical package.
3. Optiq has since become Apache Calcite; an introductory tutorial: http://blog.csdn.net/yunlong34574/article/details/46375733

The getPlan call tree runs on the Foreman thread, invoked via the DrillSqlWorker that the Foreman creates.

Actually running the physical plan happens back in the Foreman's runPhysicalPlan:

private void runSQL(final String sql) throws ExecutionSetupException {
    final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
    final Pointer<String> textPlan = new Pointer<>();
    final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
    queryManager.setPlanText(textPlan.value);
    runPhysicalPlan(plan);
  }