在先前的 TiDB 源碼閱讀系列文章(四) 中,咱們介紹了 Insert 語句,想必你們已經瞭解了 TiDB 是如何寫入數據,本篇文章介紹一下 Select 語句是如何執行。相比 Insert,Select 語句的執行流程會更復雜,本篇文章會第一次進入優化器、Coprocessor 模塊進行介紹。
表結構沿用上篇文章的:html
CREATE TABLE t { id VARCHAR(31), name VARCHAR(50), age int, key id_idx (id) };
Select
語句只會講解最簡單的狀況:全表掃描+過濾,暫時不考慮索引等複雜狀況,更復雜的狀況會在後續章節中介紹。語句爲:mysql
SELECT name FROM t WHERE age > 10;
相比 Insert 的處理流程,Select 的處理流程中有 3 個明顯的不一樣:git
Insert 是比較簡單語句,在查詢計劃這塊並不能作什麼事情(對於 Insert into Select 語句這種,實際上只對 Select 進行優化),而 Select 語句可能會無比複雜,不一樣的查詢計劃之間性能天差地別,須要很是仔細的進行優化。github
Insert 語句只涉及對 Key-Value 的 Set 操做,Select 語句可能要查詢大量的數據,若是經過 KV 接口操做存儲引擎,會過於低效,必需要經過計算下推的方式,將計算邏輯發送到存儲節點,就近進行處理。算法
Insert 語句只須要返回是否成功以及插入了多少行便可,而 Select 語句須要返回結果集。sql
本篇文章會重點說明這些不一樣的地方,而相同的步驟會盡可能化簡。數據庫
Select 語句的語法解析規則在 這裏。相比 Insert 語句,要複雜不少,你們能夠對着 MySQL 文檔 看一下具體的解析實現。須要特別注意的是 From 字段,這裏可能會很是複雜,其語法定義是遞歸的。express
最終語句被解析成 ast.SelectStmt 結構:併發
type SelectStmt struct { dmlNode resultSetNode // SelectStmtOpts wraps around select hints and switches. *SelectStmtOpts // Distinct represents whether the select has distinct option. Distinct bool // From is the from clause of the query. From *TableRefsClause // Where is the where clause in select statement. Where ExprNode // Fields is the select expression list. Fields *FieldList // GroupBy is the group by expression list. GroupBy *GroupByClause // Having is the having condition. Having *HavingClause // OrderBy is the ordering expression list. OrderBy *OrderByClause // Limit is the limit clause. Limit *Limit // LockTp is the lock type LockTp SelectLockType // TableHints represents the level Optimizer Hint TableHints [](#)*TableOptimizerHint }
對於本文所提到的語句 SELECT name FROM t WHERE age > 10;
name 會被解析爲 Fields,WHERE age > 10
被解析爲 Where 字段,FROM t
被解析爲 From 字段。app
在 planBuilder.buildSelect() 方法中,咱們能夠看到 ast.SelectStmt 是如何轉換成一個 plan 樹,最終的結果是一個 LogicalPlan,每個語法元素都被轉換成一個邏輯查詢計劃單元,例如 WHERE c > 10
會被處理爲一個 plan.LogicalSelection 的結構:
if sel.Where != nil { p = b.buildSelection(p, sel.Where, nil) if b.err != nil return nil } }
具體的結構以下:
// LogicalSelection represents a where or having predicate. type LogicalSelection struct { baseLogicalPlan // Originally the WHERE or ON condition is parsed into a single expression, // but after we converted to CNF(Conjunctive normal form), it can be // split into a list of AND conditions. Conditions []expression.Expression }
其中最重要的就是這個 Conditions 字段,表明了 Where 語句須要計算的表達式,這個表達式求值結果爲 True 的時候,代表這一行符合條件。
其餘字段的 AST 轉 LogicalPlan 讀者能夠執行研究一下,通過個這個 buildSelect() 函數後,AST 變成一個 Plan 的樹狀結構樹,下一步會在這個結構上進行優化。
讓咱們回到 plan.Optimize() 函數,Select 語句獲得的 Plan 是一個 LogicalPlan,因此 這裏 能夠進入 doOptimize 這個函數,這個函數比較短,其內容以下:
func doOptimize(flag uint64, logic LogicalPlan) (PhysicalPlan, error) { logic, err := logicalOptimize(flag, logic) if err != nil { return nil, errors.Trace(err) } if !AllowCartesianProduct && existsCartesianProduct(logic) { return nil, errors.Trace(ErrCartesianProductUnsupported) } physical, err := dagPhysicalOptimize(logic) if err != nil { return nil, errors.Trace(err) } finalPlan := eliminatePhysicalProjection(physical) return finalPlan, nil }
你們能夠關注來兩個步驟:logicalOptimize 和 dagPhysicalOptimize,分別表明邏輯優化和物理優化,這兩種優化的基本概念和區別本文不會描述,請你們自行研究(這個是數據庫的基礎知識)。下面分別介紹一下這兩個函數作了什麼事情。
邏輯優化由一系列優化規則組成,對於這些規則會按順序不斷應用到傳入的 LogicalPlan Tree 中,見 logicalOptimize() 函數:
func logicalOptimize(flag uint64, logic LogicalPlan) (LogicalPlan, error) { var err error for i, rule := range optRuleList { // The order of flags is same as the order of optRule in the list. // We use a bitmask to record which opt rules should be used. If the i-th bit is 1, it means we should // apply i-th optimizing rule. if flag&(1<<uint(i)) == 0 { continue } logic, err = rule.optimize(logic) if err != nil { return nil, errors.Trace(err) } } return logic, errors.Trace(err) }
目前 TiDB 已經支持下列優化規則:
var optRuleList = []logicalOptRule{ &columnPruner{}, &maxMinEliminator{}, &projectionEliminater{}, &buildKeySolver{}, &decorrelateSolver{}, &ppdSolver{}, &aggregationOptimizer{}, &pushDownTopNOptimizer{}, }
這些規則並不會考慮數據的分佈,直接無腦的操做 Plan 樹,由於大多數規則應用以後,必定會獲得更好的 Plan(不過上面有一個規則並不必定會更好,讀者能夠想一下是哪一個)。
這裏選一個規則介紹一下,其餘優化規則請讀者自行研究或者是等到後續文章。
columnPruner(列裁剪) 規則,會將不須要的列裁剪掉,考慮這個 SQL: select c from t;
對於 from t
這個全表掃描算子(也多是索引掃描)來講,只須要對外返回 c 這一列的數據便可,這裏就是經過列裁剪這個規則實現,整個 Plan 樹從樹根到葉子節點遞歸調用這個規則,每層節點只保留上面節點所須要的列便可。
通過邏輯優化,咱們能夠獲得這樣一個查詢計劃:
其中 FROM t
變成了 DataSource 算子,WHERE age > 10
變成了 Selection 算子,這裏留一個思考題,SELECT name
中的列選擇去哪裏了?
在物理優化階段,會考慮數據的分佈,決定如何選擇物理算子,好比對於 FROM t WHERE age > 10
這個語句,假設在 age 字段上有索引,須要考慮是經過 TableScan + Filter 的方式快仍是經過 IndexScan 的方式比較快,這個選擇取決於統計信息,也就是 age > 10 這個條件究竟能過濾掉多少數據。
咱們看一下 dagPhysicalOptimize 這個函數:
func dagPhysicalOptimize(logic LogicalPlan) (PhysicalPlan, error) { logic.preparePossibleProperties() logic.deriveStats() t, err := logic.convert2PhysicalPlan(&requiredProp{taskTp: rootTaskType, expectedCnt: math.MaxFloat64}) if err != nil { return nil, errors.Trace(err) } p := t.plan() p.ResolveIndices() return p, nil }
這裏的 convert2PhysicalPlan 會遞歸調用下層節點的 convert2PhysicalPlan 方法,生成物理算子而且估算其代價,而後從中選擇代價最小的方案,這兩個函數比較重要:
// convert2PhysicalPlan implements LogicalPlan interface. func (p *baseLogicalPlan) convert2PhysicalPlan(prop *requiredProp) (t task, err error) { // Look up the task with this prop in the task map. // It's used to reduce double counting. t = p.getTask(prop) if t != nil { return t, nil } t = invalidTask if prop.taskTp != rootTaskType { // Currently all plan cannot totally push down. p.storeTask(prop, t) return t, nil } for _, pp := range p.self.genPhysPlansByReqProp(prop) { t, err = p.getBestTask(t, pp) if err != nil { return nil, errors.Trace(err) } } p.storeTask(prop, t) return t, nil } func (p *baseLogicalPlan) getBestTask(bestTask task, pp PhysicalPlan) (task, error) { tasks := make([]task, 0, len(p.children)) for i, child := range p.children { childTask, err := child.convert2PhysicalPlan(pp.getChildReqProps(i)) if err != nil { return nil, errors.Trace(err) } tasks = append(tasks, childTask) } resultTask := pp.attach2Task(tasks...) if resultTask.cost() < bestTask.cost() { bestTask = resultTask } return bestTask, nil }
上面兩個方法的返回值都是一個叫 task 的結構,而不是物理計劃,這裏引入一個概念,叫 Task
,TiDB 的優化器會將 PhysicalPlan 打包成爲 Task。Task 的定義在 task.go 中,咱們看一下注釋:
// task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. // A task may be CopTask, RootTask, MPPTask or a ParallelTask. type task interface { count() float64 addCost(cost float64) cost() float64 copy() task plan() PhysicalPlan invalid() bool }
在 TiDB 中,Task 的定義是能在單個節點上不依賴於和其餘節點進行數據交換便可進行的一些列操做,目前只實現了兩種 Task:
若是瞭解過 TiDB 的 Explain 結果,那麼能夠看到每一個 Operator 都會標明屬於哪一種 Task,好比下面這個例子:
整個流程是一個樹形動態規劃的算法,你們有興趣能夠跟一下相關的代碼自行研究或者等待後續的文章。
通過整個優化過程,咱們已經獲得一個物理查詢計劃,這個 SELECT name FROM t WHERE age > 10;
語句可以指定出來的查詢計劃大概是這樣子的:
讀者可能會比較奇怪,爲何只剩下這樣一個這一個物理算子?WHERR age > 10
哪裏去了?實際上 age > 10 這個過濾條件被合併進了 PhysicalTableScan,由於 age > 10
這個表達式能夠下推到 TiKV 上進行計算,因此會把 TableScan 和 Filter 這樣兩個操做合在一塊兒。哪些表達式會被下推到 TiKV 上的 Coprocessor 模塊進行計算呢?對於這個 Query 是在下面 這個地方 進行識別:
// PredicatePushDown implements LogicalPlan PredicatePushDown interface. func (ds *DataSource) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) { _, ds.pushedDownConds, predicates = expression.ExpressionsToPB(ds.ctx.GetSessionVars().StmtCtx, predicates, ds.ctx.GetClient()) return predicates, ds }
在 expression.ExpressionsToPB
這個方法中,會把能下推 TiKV 上的表達式識別出來(TiKV 尚未實現全部的表達式,特別是內建函數只實現了一部分),放到 DataSource.pushedDownConds 字段中。接下來咱們看一下 DataSource 是如何轉成 PhysicalTableScan,見 DataSource.convertToTableScan() 方法。這個方法會構建出 PhysicalTableScan,而且調用 addPushDownSelection() 方法,將一個 PhysicalSelection 加到 PhysicalTableScan 之上,一塊兒放進 copTask 中。
這個查詢計劃是一個很是簡單計劃,不過咱們能夠用這個計劃來講明 TiDB 是如何執行查詢操做。
一個查詢計劃如何變成一個可執行的結構以及如何驅動這個結構執行查詢已經在前面的兩篇文章中作了描述,這裏再也不敷述,這一節我會重點介紹如何具體的執行過程以及 TiDB 的分佈式執行框架。
Coprocessor 這個概念是從 HBase 中借鑑而來,簡單來講是一段注入在存儲引擎中的計算邏輯,等待 SQL 層發來的計算請求(序列化後的物理執行計劃),處理本地數據並返回計算結果。在 TiDB 中,計算是以 Region 爲單位進行,SQ
L 層會分析出要處理的數據的 Key Range,再將這些 Key Range 根據 PD 中拿到的 Region 信息劃分紅若干個 Key Range,最後將這些請求發往對應的 Region。
SQL 層會將多個 Region 返回的結果進行彙總,在通過所需的 Operator 處理,生成最終的結果集。
請求的分發與彙總會有不少複雜的處理邏輯,好比出錯重試、獲取路由信息、控制併發度以及結果返回順序,爲了不這些複雜的邏輯與 SQL 層耦合在一塊兒,TiDB 抽象了一個統一的分佈式查詢接口,稱爲 DistSQL API,位於 distsql 這個包中。
其中最重要的方法是 SelectDAG 這個函數:
// SelectDAG sends a DAG request, returns SelectResult. // In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional. func SelectDAG(goCtx goctx.Context, ctx context.Context, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) { // kvReq 中包含了計算所涉及的數據的 KeyRanges // 這裏經過 TiKV Client 向 TiKV 集羣發送計算請求 resp := ctx.GetClient().Send(goCtx, kvReq) if resp == nil { err := errors.New("client returns nil response") return nil, errors.Trace(err) } if kvReq.Streaming { return &streamResult{ resp: resp, rowLen: len(fieldTypes), fieldTypes: fieldTypes, ctx: ctx, }, nil } // 這裏將結果進行了封裝 return &selectResult{ label: "dag", resp: resp, results: make(chan newResultWithErr, kvReq.Concurrency), closed: make(chan struct{}), rowLen: len(fieldTypes), fieldTypes: fieldTypes, ctx: ctx, }, nil }
TiKV Client 中的具體邏輯咱們暫時跳過,這裏只關注 SQL 層拿到了這個 selectResult
後如何讀取數據,下面這個接口是關鍵。
// SelectResult is an iterator of coprocessor partial results. type SelectResult interface { // NextRaw gets the next raw result. NextRaw(goctx.Context) ([]byte, error) // NextChunk reads the data into chunk. NextChunk(goctx.Context, *chunk.Chunk) error // Close closes the iterator. Close() error // Fetch fetches partial results from client. // The caller should call SetFields() before call Fetch(). Fetch(goctx.Context) // ScanKeys gets the total scan row count. ScanKeys() int64
selectResult 實現了 SelectResult 這個接口,表明了一次查詢的全部結果的抽象,計算是以 Region 爲單位進行,因此這裏所有結果會包含全部涉及到的 Region 的結果。調用 Chunk 方法能夠讀到一個 Chunk 的數據,經過不斷調用 NextChunk 方法,直到 Chunk 的 NumRows 返回 0 就能拿到全部結果。NextChunk 的實現會不斷獲取每一個 Region 返回的 SelectResponse,把結果寫入 Chunk。
能推送到 TiKV 上的計算請求目前有 TableScan、IndexScan、Selection、TopN、Limit、PartialAggregation 這樣幾個,其餘更復雜的算子,仍是須要在單個 tidb-server 上進行處理。因此整個計算是一個多 tikv-server 並行處理 + 單個 tidb-server 進行彙總的模式。
Select 語句的處理過程當中最複雜的地方有兩點,一個是查詢優化,一個是如何分佈式地執行,這兩部分後續都會有文章來更進一步介紹。下一篇文章會脫離具體的 SQL 邏輯,介紹一下如何看懂某一個特定的模塊。
做者:申礫