寫這個東西也只是由於想簡單掌握下 TiDB 的源碼,同事給了一些閱讀思路,很贊。mysql
有些地方若是理解的有問題還請批評教育,對 Go 語言理解的比較有限。linux
若是不當心誤導了讀者,請見諒sql
TiDB 模塊是使用 Go 語言開發的,使用 GoLand 編譯器就能夠了。數據庫
JetBrains出品緩存
閱讀源碼,要尋找好的切入點,咱們選擇 main.go[1] 做爲閱讀源碼的入口。服務器
tidb-server/main.go
這裏的 main 函數能夠 debug ,也是 TiDB 啓動的開頭。網絡
稍微簡化一下session
func main() { registerStores() registerMetrics() config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig) if config.GetGlobalConfig().OOMUseTmpStorage { config.GetGlobalConfig().UpdateTempStoragePath() initializeTempDir() } setCPUAffinity() setupLog() setupTracing() setupBinlogClient() setupMetrics() createStoreAndDomain() createServer() runServer() }
能夠看出,啓動流程作的每一個步驟都按照函數封裝好了,大體瞭解一下都作什麼dom
// 註冊store func registerStores() { err := kvstore.Register("tikv", tikv.Driver{}) //註冊TiKV tikv.NewGCHandlerFunc = gcworker.NewGCWorker //爲TiKV生成GCworker err = kvstore.Register("mocktikv", mockstore.MockDriver{}) //註冊默認存儲引擎MockTiKV } //共註冊100+ prometheus監控項,這裏只表一項 func RegisterMetrics() { prometheus.MustRegister(AutoAnalyzeCounter) } // get全局config func InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) { cfg := GetGlobalConfig() StoreGlobalConfig(cfg) }
這個判斷分佈式
if config.GetGlobalConfig().OOMUseTmpStorage { config.GetGlobalConfig().UpdateTempStoragePath() initializeTempDir() }
設置是否在單條 SQL 語句的內存使用超出 mem-quota-query [2] 限制時爲某些算子啓用臨時磁盤。故若爲 true ,則初始化 TempStoragePath 。
// 設置CPU親和性 func setCPUAffinity() { / err := linux.SetAffinity(cpu) runtime.GOMAXPROCS(len(cpu)) } //配置系統log func setupLog() { err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 這裏配置格式、文件名、slowlog等 } //註冊分佈式系統追蹤鏈 jaeger func setupTracing() { tracingCfg := cfg.OpenTracing.ToTracingConfig() tracingCfg.ServiceName = "TiDB" tracer, _, err := tracingCfg.NewTracer() opentracing.SetGlobalTracer(tracer) } // 設置binlog信息 func setupBinlogClient() { if !cfg.Binlog.Enable { //若binlog.enable=false,則不開啓binlog return } if cfg.Binlog.IgnoreError { //若爲true,則忽略binlog報錯 binloginfo.SetIgnoreError(true) } if len(cfg.Binlog.BinlogSocket) == 0 { //配置binlog輸出網絡地址 ... } binloginfo.SetPumpsClient(client) //配置binlog信息到pump客戶端 } // 配置監控 func setupMetrics() { runtime.SetMutexProfileFraction(10)// 對鎖調用的跟蹤 systimeErrHandler := func() { // 表示TiDB的進程是否仍然存在。 // 若10分鐘內tidb_monitor_keep_alive_total // 次數<100,TiDB可能退出,此時會報警 metrics.TimeJumpBackCounter.Inc() } callBackCount := 0 sucessCallBack := func() { callBackCount++ if callBackCount >= 5 { callBackCount = 0 metrics.KeepAliveCounter.Inc() // KeepAlive監控 } } } // 啓動了一些重要的後臺進程 func createStoreAndDomain() { fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) storage, err = kvstore.New(fullPath) dom, err = session.BootstrapSession(storage)} } // 建立TiDB server func createServer() { driver := server.NewTiDBDriver(storage) svr, err = server.NewServer(cfg, driver) svr.SetDomain(dom) } //啓動服務 runServer()
能夠看到, runServer() 是啓動TiDB流程中的的最後一步。
因此咱們跳轉到 server.Run() 中,
這裏有不少接受請求時的異常處理邏輯,在此不表。簡化一下大概有四步,以下:
if s.cfg.Status.ReportStatus { s.startStatusHTTP() //配置路由信息 } for{ conn, err := s.listener.Accept()// 監聽客戶端請求 clientConn := s.newConn(conn)// 建立connection go s.onConn(clientConn)// 使用connection處理請求 }
首先配置了關於 TiDB 組件的不少路由信息,有興趣的能夠看看。
server 不斷監聽網絡請求,出現新的客戶端請求就建立一個新的 connection ,使用一個新 goroutine 來持續爲它提供服務。
後續就是經過 go s.onConn(clientConn) 處理客戶端請求,咱們來一探究竟接下來的流程,簡化下代碼,大體以下
func (s *Server) onConn(conn *clientConn) { ctx := logutil.WithConnID(context.Background(), conn.connectionID) if err := conn.handshake(ctx); err != nil { if plugin.IsEnable(plugin.Audit) && conn.ctx != nil { conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo() }) } } connectedTime := time.Now() conn.Run(ctx) }
將建鏈 info 寫入到 session 中,統計一下鏈路創建成功的時間,成功後,經過 conn.Run(ctx) 處理客戶端請求。
func (cc *clientConn) Run(ctx context.Context) { for { waitTimeout := cc.getSessionVarsWaitTimeout(ctx) cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second) data, err := cc.readPacket() if err = cc.dispatch(ctx, data); err != nil { ... } } }
簡化後,處理邏輯大體是,
不停的經過 cc.readPacket() 讀取客戶端發來的網絡包。若是這個期間發生了等待網絡包超時的現象,則 close connection 。
讀取 data 成功後,將它傳入 cc.dispatch(ctx,data) ,這也是處理 SQL 請求的入口了。
當 SQL 來到 很合理 ,顧名思義,是對不一樣的 MySQL 協議進行調度,
server/conn.go
Then.
func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.lastPacket = data cmd := data[0] data = data[1:] dataStr := string(hack.String(data)) }
客戶端請求MySQL協議報文格式
TiDB 也要根據這個格式進行解析, data[0] 就是 data 的第一個 byte ,其他的是命令。
MySQL 請求報文的命令列表
0x00 COM_SLEEP 內部線程狀態 0x01 COM_QUIT 關閉鏈接 0x02 COM_INIT_DB 切換數據庫 0x03 COM_QUERY SQL查詢請求 0x04 COM_FIELD_LIST 獲取數據表字段信息 0x05 COM_CREATE_DB 建立數據庫 0x06 COM_DROP_DB 刪除數據庫 0x07 COM_REFRESH 清除緩存 0x08 COM_SHUTDOWN 中止服務器 0x09 COM_STATISTICS 獲取服務器統計信息 0x0A COM_PROCESS_INFO 獲取當前鏈接的列表 0x0B COM_CONNECT 內部線程狀態 0x0C COM_PROCESS_KILL 中斷某個鏈接 0x0D COM_DEBUG 保存服務器調試信息 0x0E COM_PING 測試連通性 0x0F COM_TIME 內部線程狀態 0x10 COM_DELAYED_INSERT 內部線程狀態 0x11 COM_CHANGE_USER 從新登錄 0x12 COM_BINLOG_DUMP 獲取二進制日誌信息 0x13 COM_TABLE_DUMP 獲取數據表結構信息 0x14 COM_CONNECT_OUT 內部線程狀態 0x15 COM_REGISTER_SLAVE 從服務器向主服務器進行註冊 0x16 COM_STMT_PREPARE 預處理SQL語句 0x17 COM_STMT_EXECUTE 執行預處理語句 0x18 COM_STMT_SEND_LONG_DATA 發送BLOB類型的數據 0x19 COM_STMT_CLOSE 銷燬預處理語句 0x1A COM_STMT_RESET 清除預處理語句參數緩存 0x1B COM_SET_OPTION 設置語句選項 0x1C COM_STMT_FETCH 獲取預處理語句的執行結果
咱們看一下 dispatch 接下來的部分,也就是目前 TiDB 實現的部分 MySQL 協議了
switch cmd { case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset, mysql.ComSetOption, mysql.ComChangeUser: cc.ctx.SetProcessInfo("", t, cmd, 0) case mysql.ComInitDB: cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0) } switch cmd { case mysql.ComSleep: case mysql.ComQuit: case mysql.ComQuery: case mysql.ComPing: case mysql.ComInitDB: case mysql.ComFieldList: case mysql.ComStmtPrepare: case mysql.ComStmtExecute: case mysql.ComStmtFetch: case mysql.ComStmtClose: case mysql.ComStmtSendLongData: case mysql.ComStmtReset: case mysql.ComSetOption: case mysql.ComChangeUser default: // other not support
目前 TiDB 的絕大部分 SQL 語句都是走的 ComQuery ,感興趣的同窗能夠自行 debug 一下。
case mysql.ComQuery: if len(data) > 0 && data[len(data)-1] == 0 { data = data[:len(data)-1] dataStr = string(hack.String(data)) } return cc.handleQuery(ctx, dataStr)
因而咱們仔細看看這個 case , 要去看下 cc.handleQuery(ctx , dataStr) 。
func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { stmts, err := cc.ctx.Execute(ctx, sql) }
以後就要進入解析、優化 SQL 的部分 ,咱們仍是要先簡單理解下 Lex & Yacc。