由於豐巢自去年年末開始在推送平臺上嘗試了TiDB,最近又要將承接豐巢全部交易的支付平臺切到TiDB上。我本人一直沒有抽出時間對TiDB的源碼進行學習,最近準備開始一系列的學習和分享。因爲我本人沒有數據庫相關的經驗,本着學習的心態和你們一塊兒探討,歡迎高手隨時指正。總結一下本次學習分享的目的:java
言歸正傳,說一下本文的產生緣由:去年咱們在推送平臺上使用TiDB的過程當中,就發現老版本的TiDB是沒法經過外部手段kill調用慢查詢的,而慢查詢的危害對於數據庫來講會有致命的風險,後來pingcap公司在2.1版本(具體的版本參見TiDB的說明)中增長了show processlist和kill tidb命令,可是由於TiDB自己是無狀態的,這兩個命令屬於單機命令,在使用的過程當中,你們仍是要提早作好準備,要直連到具體的TiDB的server上纔可以使用,不要經過nginx等服務進行轉發請求,到時不但不能解決問題,還有可能帶來意外的風險。今天第一章,咱們先來看一下show processlist這個比較簡單的命令的源碼,下一章,咱們再分析kill tidb這個命令。mysql
上面的列表中展現了當前TiDB正在處理每一個鏈接的sql語句詳情。linux
在我分析源碼以前,我問了本身本次分析源碼要搞清楚的兩個問題,在這裏和你們分享一下:nginx
首先,啓動TiDB server.代碼在tidb-server/main.go裏面,主要方法是:runServer方法golang
func runServer() { err := svr.Run() }
再來看一下:server/server.go源碼:sql
func (s *Server) Run() error { for { conn, err := s.listener.Accept() go s.onConn(conn) } }
重點代碼是監聽端口,並建立鏈接,啓動另外一協程去服務新來的鏈接,接下來再看看server.go中的onConn方法:數據庫
func (s *Server) onConn(c net.Conn) { conn := s.newConn(c) conn.Run() }
其中,s.newConn方法會將net. Conn鏈接包裝成clientConn鏈接,並分配在這個TiDB server下惟一的connectionID,此connectionID爲原子變量,每次新鏈接自增長1,咱們先記住這個id,後面分析的時候會用到它。咱們來看看server/conn.go下的Run方法:session
func (cc *clientConn) Run() { for { data, err := cc.readPacket() cc.dispatch(data) } }
Run方法主要就是不斷的輪訓讀取clientConn中的內容,並將它交給dispatch方法進行下面的分析及返回結果操做,至此關於接收show processlist命令部分已經分析完畢,固然其它的sql語句也是通過這個過程進入到dispatch方法中的。架構
接着分析dispatch方法在處理show processlist命令的流程:框架
func (cc *clientConn) dispatch(data []byte) error { switch cmd { case mysql.ComQuery: return cc.handleQuery(ctx1, hack.String(data)) } }
show processlist命令屬於mysql.ComQuery,所以流程會走到handleQuery方法裏面,咱們來看一下:
func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { rs, err := cc.ctx.Execute(ctx, sql) err = cc.writeResultset(ctx, rs[0], false, 0, 0) }
handleQuery中處理show processlist命令的重點代碼就是上面的兩行,咱們先來看一下server/driver_tidb.go中的Execute方法:
rsList, err := tc.session.Execute(ctx, sql)
Execute中的重點就是調用session/session.go中的Execute方法:
func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { s.PrepareTxnCtx(ctx) stmtNodes, warns, err := s.ParseSQL(ctx, sql, charsetInfo, collation) compiler := executor.Compiler{Ctx: s} for _, stmtNode := range stmtNodes { recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets); } }
上面的execute方法中會對sql語句進行處理及制定執行計劃,處理完成後調用executeStatement方法,executeStatement中的重點方法是runStmt:
recordSet, err := runStmt(ctx, s, stmt)
咱們再來看看session/tidb.go中的runStmt方法:
func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) (sqlexec.RecordSet, error) { rs, err = s.Exec(ctx) err = finishStmt(ctx, sctx, se, sessVars, err) }
繼續來分析executor/adapter中的(a *ExecStmt) Exec方法,同樣採起劃重點的方式:
func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { e, err := a.buildExecutor(sctx) e.Open(ctx) var pi processinfoSetter if raw, ok := sctx.(processinfoSetter); ok { pi = raw sql := a.OriginText() if simple, ok := a.Plan.(*plannercore.Simple); ok && simple.Statement != nil { if ss, ok := simple.Statement.(ast.SensitiveStmtNode); ok { // Use SecureText to avoid leak password information. sql = ss.SecureText() } } // Update processinfo, ShowProcess() will use it. pi.SetProcessInfo(sql) //fmt.Println(sql) a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode) } return &recordSet{ executor: e, stmt: a, processinfo: pi, txnStartTS: txnStartTS, }, nil }
(a *ExecStmt) Exec方法中raw, ok := sctx.(processinfoSetter)這段邏輯就是把當前鏈接正在執行的語句存儲到processinfo裏面取,關於這部分細節比較簡單,在這裏不展開來分析。咱們先來看看buildExecutor中作了什麼事情?
b := newExecutorBuilder(ctx, a.InfoSchema) e := b.build(a.Plan)
重點要來了,在executor/builder.go中的build方法作了啥事?
case *plannercore.Show: return b.buildShow(v)
build方法會根據不一樣的語句類型來構建不一樣的Executor並返回,show processlist命令會匹配到plannercore.Show類型,咱們看看buildShow方法的實現:
e := &ShowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), Tp: v.Tp, DBName: model.NewCIStr(v.DBName), Table: v.Table, Column: v.Column, User: v.User, Flag: v.Flag, Full: v.Full, GlobalScope: v.GlobalScope, is: b.is, } if len(v.Conditions) == 0 { return e } sel := &SelectionExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), e), filters: v.Conditions, } return sel
由於v.Conditions爲0,因此返回類型爲ShowExec的Executor,咱們接下來再剛纔的Exec方法中的e.Open方法,其實就是ShowExec的Open方法,ShowExec位於executor/show.go文件中,咱們查找後發現ShowExec中沒有Open方法,我當時是被搞蒙了,後來發現這是go的一個語言特性,它使用的是baseExecutor的Open方法:
func (e *baseExecutor) Open(ctx context.Context) error { for _, child := range e.children { err := child.Open(ctx) if err != nil { return errors.Trace(err) } } return nil }
上面的方法會遍歷baseExecutor中的children的Executor,而後調用它們的Open方法,可是由於ShowExec在建立它的baseExecutor的時候,沒有任何的children,因此在show processlist這個操做過程當中,Open方法至關於啥也沒幹,可是你們在分析其它語句時,這個Open方法是一個很重要的方法。咱們再來看剛纔Exec中的最後的return塊裏面,返回了包裝executor、processinfo等信息的recordSet類型。至此關於show processlist命令如何包裝成Executor並和processinfo等信息做爲recordSet類型的返回值返回給上層函數分析完畢。
接下來咱們再來看handleQuery中的writeResultset方法:
err = cc.writeResultset(ctx, rs[0], false, 0, 0)
在server/conn.go中的writeResultset主要的邏輯就是下面的邏輯:
err = cc.writeChunks(ctx, rs, binary, serverStatus)
咱們繼續來分析writeChunks中的重要部分:
func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) error { for { err := rs.Next(ctx, chk) } }
writeChunks裏面主要就是循環調用rs.Next的方法,直到知足條件爲止,rs的類型其實是server/driver_tidb.go下的tidbResultSet類型,咱們來看一下它的Next方法:
func (trs *tidbResultSet) Next(ctx context.Context, chk *chunk.Chunk) error { return trs.recordSet.Next(ctx, chk) }
tidbResultSet的Next方法主要是調用了executor/adapter.go中的recordSet類型的Next方法,咱們來看看這個Next方法:
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error { err := a.executor.Next(ctx, chk) }
recordSet方法的重點就是調用它的executor的Next方法,咱們在上一個小節 結尾處分析出recordSet的executor就是以前生成的ShowExec(可算是找到它了,我已經累暈)。那麼,咱們接着分析它的Next方法:
e.fetchAll()
ShowExec中的Next方法的主要邏輯就是調用它的fetchAll方法,接着往下看:
case ast.ShowProcessList: return e.fetchShowProcessList()
由於匹配到了這個case,因此會調用它的fetchShowProcessList方法:
func (e *ShowExec) fetchShowProcessList() error { sm := e.ctx.GetSessionManager() pl := sm.ShowProcessList() }
上面的sm類型的server/server.go中的Server類型,咱們來看看它的ShowProcessList方法:
func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo { s.rwlock.RLock() rs := make(map[uint64]util.ProcessInfo, len(s.clients)) for _, client := range s.clients { if atomic.LoadInt32(&client.status) == connStatusWaitShutdown { continue } pi := client.ctx.ShowProcess() rs[pi.ID] = pi } s.rwlock.RUnlock() return rs }
它主要是遍歷當前全部的客戶端,並獲取到全部客戶端的ShowProcess,其中的client.ctx類型爲server.TiDBContext,咱們來看看它的ShowProcess:
func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() }
邏輯比較簡單,就是調用類型爲session.session的ShowProcess方法,接着往下看:
func (s *session) ShowProcess() util.ProcessInfo { var pi util.ProcessInfo tmp := s.processInfo.Load() if tmp != nil { pi = tmp.(util.ProcessInfo) pi.Mem = s.GetSessionVars().StmtCtx.MemTracker.BytesConsumed() } return pi }
session的ShowProcess方法會從內存中加載當前session的processInfo信息。至此咱們分析show processlist命令的源碼分析完畢,關於每一個鏈接如何設置自身的processinfo信息,邏輯也比較簡單,你們有興趣能夠本身去研究一下。
咱們能夠回答一下開頭提出的兩個問題:
經過上面的分析,咱們還能夠總結如下的特色:
最後,我想和你們分享一下,我本身在源碼閱讀裏面用到的一些方法和技巧,大的方面會有兩種方法:
上面的兩種方法,會伴隨你們在源碼閱讀的各個階段,可是有了這兩種方法仍是遠遠不夠的,我再分享一下個人相關技巧: