TiDB源碼閱讀筆記(三) TiDB 的在線 DDL

在線 DDL 始終是數據庫使用上的痛點。以前的工做中,有不少數據中心同事作 DDL 變動都很頭疼,也吐槽過,謹慎選擇時間點進行。即使如此,面對複雜龐雜的應用系統和各種定時運維腳本,DDL 操做依然可能干擾到業務正常運行。git

TiDB 的在線 DDL 是根據 Google F1 的在線異步 schema 變動算法實現github

F1 中 schema 以特殊的 kv 對存儲於 Spanner 中,同時每一個 F1 服務器在運行過程當中自身也維護一份拷貝。爲了保證同一時刻最多隻有 2 份 schema 生效,F1 約定了長度爲數分鐘的 schema 租約,全部 F1 服務器在租約到期後都要從新加載 schema 。若是節點沒法從新完成續租,它將會自動終止服務並等待被集羣管理設施重啓。

簡單來講,TiDB 的在線DDL和 MySQL 相比,主要有這些區別算法

MySQL 的數據和表結構是緊耦合的,想動表結構,勢必會牽扯到數據。TiDB 的數據和表結構是分割的,操做數據時會比對錶結構,經過兩個 version 來應對不一樣的 DML 語句。

詳細的,能夠參考這篇文章 👍數據庫

https://github.com/zimulala/builddatabase/blob/master/f1/schema-change.md​github.comapi

先介紹幾個比較重要的概念服務器

Job: 每一個單獨的 DDL 操做可看作一個 job。在一個 DDL 操做開始時,會將此操做封裝成一個 job 並存放到 job queue,等此操做完成時,會將此 job 從 job queue 刪除,並在存入 history job queue,便於查看歷史 job。
Worker:每一個節點都有一個 worker 用來處理 job。
Owner:整個系統只有一個節點的 worker 能當選 owner 角色,每一個節點均可能當選這個角色,當選 owner 後 worker 纔有處理 job 的權利。owner 這個角色是有任期的,owner 的信息會存儲在 KV 層中。worker按期獲取 KV 層中的 owner 信息,若是其中 ownerID 爲空,或者當前的 owner 超過了任期,則 worker 能夠嘗試更新 KV 層中的 owner 信息(設置 ownerID 爲自身的 workerID),若是更新成功,則該 worker 成爲 owner。在租期內這個用來確保整個系統同一時間只有一個節點在處理 schema 變動。

總結一下,每一個 TiDB 上有一個 Worker 線程,DDL 語句會封裝爲一個 Job ,由 Worker 進行處理。Worker 分爲 Owner 和 非 Owner,每一個集羣同時只能有一個 Owner,只有它能夠處理隊列中的 Job 。咱們先去源碼中看看 Worker 的樣子session


Worker

TiDB源碼閱讀(一) TiDB的入口 中,咱們提到了 main 函數中的 createStoreAndDomain 方法,這個方法初始化了一些重要的後臺進程,其中就包括 Worker,啓動流程基本是以下的路數app

func createStoreAndDomain() {
    dom, err = session.BootstrapSession(storage)
}
// 來到 BootstrapSession ,方法比較長,咱們只看關聯到的地方,建立 session 
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
      se, err := createSession(store)
}
// 查看 createSession(store),再進一層就是 createSessionWithOpt
func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
    dom, err := domap.Get(store)
}
// 進Get
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
   d = domain.NewDomain(store, ddlLease, statisticLease, factory)
}

來到 domain.go ,看 init 函數less

do.ddl = ddl.NewDDL(
        ctx,
        ddl.WithEtcdClient(do.etcdClient),
        ddl.WithStore(do.store),
        ddl.WithInfoHandle(do.infoHandle),
        ddl.WithHook(callback),
        ddl.WithLease(ddlLease),
        ddl.WithResourcePool(sysCtxPool),
)

經過 ddl.NewDDL 進入 ddl.go,開始 start Worker運維

d.start(ctx, opt.ResourcePool)

這裏有兩種 Worker ,一種專門負責 index 類型的 DDL ,一種負責其餘的

if RunWorker {
    d.workers[generalWorker] = newWorker(generalWorker, d.store, d.sessPool, d.delRangeMgr)
    d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, d.sessPool, d.delRangeMgr)
    for _, worker := range d.workers {
        w := worker
        go tidbutil.WithRecovery(
        func() { w.start(d.ddlCtx) },// 啓動 worker
    }

這樣 Worker 就啓動了,啓動 TiDB 也能看到這個日誌打印:

[ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 1, tp general"]
[ddl_worker.go:130] ["[ddl] start DDL worker"] [worker="worker 2, tp add index"]

Worker 啓動後,就開始輪詢處理隊列的 Job ⬇️

logutil.Logger(w.logCtx).Info("[ddl] start DDL worker")

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
checkTime := chooseLeaseTime(2*d.lease, 1*time.Second)
ticker := time.NewTicker(checkTime)
defer ticker.Stop()
for {
    err := w.handleDDLJobQueue(d) // 處理Job的流程,文章後半部分介紹
}

TiDB 接收 DDL語句

從 parser.y 開始,以 alter table add columns 語句爲例展開,先找AlterTableStmt

AlterTableStmt:
    "ALTER" IgnoreOptional "TABLE" TableName AlterTableSpecListOpt AlterTablePartitionOpt
    {
        specs := $5.([]*ast.AlterTableSpec)
        if $6 != nil {
            specs = append(specs, $6.(*ast.AlterTableSpec))
        }
        $$ = &ast.AlterTableStmt{
            Table: $4.(*ast.TableName),
            Specs: specs,
        }
    }

//看AlterTableSpecListOpt下
|    "ADD" ColumnKeywordOpt IfNotExists ColumnDef ColumnPosition
    {
        $$ = &ast.AlterTableSpec{
            IfNotExists: $3.(bool),
            Tp:          ast.AlterTableAddColumns,
            NewColumns:  []*ast.ColumnDef{$4.(*ast.ColumnDef)},
            Position:    $5.(*ast.ColumnPosition),
        }
    }
Token
IgnoreOptional
TableName
AlterTableSpecListOpt
ColumnKeywordOpt
IfNotExists
ColumnDef 
ColumnPosition

主要看下 ast.AlterTableAddColumns ,來到 ddl_api.go

func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
     case ast.AlterTableAddColumns:
    err = d.AddColumn(ctx, ident, spec)
}

先進行了一系列的校驗:

  1. 若是添加 auto increment、primary key,unique key 屬性字段,返回不支持
  2. 檢查字段屬性
  3. 表中字段是否過多(剛來時聽過 table 最多支持 512 個字段,就在這裏,寫死的TableColumnCountLimit = uint32(512)
  4. 字段是否已經存在
  5. 字段長度是否超過最大值
  6. 等等等

太長不看,來到這裏

job := &model.Job{
        SchemaID:   schema.ID,
        TableID:    t.Meta().ID,
        SchemaName: schema.Name.L,
        Type:       model.ActionAddColumn,
        BinlogInfo: &model.HistoryInfo{},
        Args:       []interface{}{col, spec.Position, 0},
    }
err = d.doDDLJob(ctx, job)

進入 d.doDDLJob(ctx, job)

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
    // 獲取 DDL SQL
    job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
    // 賦給 task
    task := &limitJobTask{job, make(chan error)}
    // 傳入 limitJobCh ,我理解就是隊列,畢竟下一句都 true 了
    d.limitJobCh <- task
    ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
}
// 通知 worker ,worker 分爲 addIdxWorker 和 generalWorker
d.asyncNotifyWorker(job.Type)

後面的那段代碼我理解就是 for true 來查 Job 的狀態,若失敗就報錯,若成功就 log 。這就是 TiDB 接收 DDL 語句後的大體流程。


Worker 處理 Job

如今主要的任務就回到了 ddl_worker.go ,也就是上文提到的 handleDDLJobQueue 函數中,這裏開始循環處理隊列中的 Job ,這個方法有點繞,須要循環好幾回,由於每一個 Job 整個流程有好幾種狀態,根據不一樣狀態作不一樣處理,這裏我簡單點說,有興趣的能夠 debug

func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
    once := true 
    for {
    waitTime := 2 * d.lease // 2個租約時間 1min 30s
        //開啓事務,每次循環都會 commit
    err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
        if !d.isOwner() {// 不是 owner 角色,就什麼都不作
            return nil
        }
        job, err = w.getFirstDDLJob(t) // 獲取隊列中頭部 Job
        if job == nil || err != nil { // 沒有則返回
            return errors.Trace(err)
        }
                if once {// Job的第一次循環都會走這個,但只是處理異常狀況,
                         // 在正常流程中,w.waitSchemaSynced 直接 return 到第二輪循環
                w.waitSchemaSynced(d, job, waitTime)
            once = false
            return nil
        }
                // 第二輪由於狀態問題,不會走到這個分支
        if job.IsDone() || job.IsRollbackDone() {
            err = w.finishDDLJob(t, job)
        }
                // 我懷疑這個地方是操做 KV 層的,由於沒找到實現,若是說錯了請各位指正
        d.mu.RLock()
        d.mu.hook.OnJobRunBefore(job)
        d.mu.RUnlock()

        tidbutil.WithRecovery(func() {
                        // runDDLJob 一看就是重要函數,下面會說,主要是更新5種狀態
            schemaVer, runJobErr = w.runDDLJob(d, t, job)
        }
                // 若是cancel了 就 finish
        if job.IsCancelled() {
            err = w.finishDDLJob(t, job)
        }
                // 更新 Job
        err = w.updateDDLJob(t, job, runJobErr != nil)

        d.mu.RLock()
        d.mu.hook.OnJobUpdated(job)
        d.mu.RUnlock()
    }
}
w.runDDLJob
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
    timeStart := time.Now()
    if job.IsFinished() {
        return
    }
    if job.IsCancelling() {
        return convertJob2RollbackJob(w, d, t, job)
    }
    if !job.IsRollingback() && !job.IsCancelling() {
        job.State = model.JobStateRunning
    }
        
        // 上面是不一樣狀態的處理
        // 這裏根據不一樣的 type 走不一樣的函數,我們就看 onAddColumn 
    switch job.Type {
    case model.ActionAddColumn:
        ver, err = onAddColumn(d, t, job)
    default:
        job.State = model.JobStateCancelled
        err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
    }
    if err != nil {
        //這裏主要異常狀況
    }
    return
}
onAddColumn
func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
    // 若是 job 是回滾狀態, 就走 drop colunm
    if job.IsRollingback() {
        ver, err = onDropColumn(t, job)
    }
        // checkAddColumn 這裏處理了幾種狀況,好比字段信息已經存在且是 public,就cancel
    tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
    if columnInfo == nil {
        // 根據 after first 語法 建立 ColumnInfo
    columnInfo, offset, err = createColumnInfo(tblInfo, col, pos)
    }
        // 五種狀態 none -> delete only -> write only -> reorganization -> public
    originalState := columnInfo.State
    switch columnInfo.State {
    case model.StateNone:
        // none -> delete only
        ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateDeleteOnly:
        // delete only -> write only
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateWriteOnly:
        // write only -> reorganization
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
    case model.StateWriteReorganization:
        // reorganization -> public
        adjustColumnInfoInAddColumn(tblInfo, offset)
        columnInfo.State = model.StatePublic
        ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
        // Finish this job.
        job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
        asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
    default:
        err = ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
    }
}
FinishTableJob
startTime := time.Now()
err = t.AddHistoryDDLJob(job, updateRawArgs)

最後加入 historyDDLJob ,供查詢歷史 DDL 操做。再作些清理工做,整個 DDL 流程就差很少走完了。

其餘 TiDB 的同步

還記得 domai.go 吧,仍是 init 函數,在這裏 TiDB 的非 owner 會隔一個 lease 時間去同步 ver ,大概看一下

init@domain.go
if ddlLease > 0 {
   do.wg.Add(1)
   // Local store needs to get the change information for every DDL state in each session.
   go do.loadSchemaInLoop(ddlLease)
}
do.loadSchemaInLoop

這裏是具體過程了,循環 reload,

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
    ticker := time.NewTicker(lease / 2)
    for {
        select {
        case <-ticker.C:
            err := do.Reload()
        case _, ok := <-syncer.GlobalVersionCh():
            err := do.Reload()
        case <-syncer.Done():
            do.SchemaValidator.Stop()
            err := do.mustRestartSyncer()
            exitLoop := do.mustReload()
        case <-do.exit:
            return
        }
    }
}

這裏大體邏輯有這些:

  1. 獲取 version 並同步,若 reload 時間超過 lease/2 ,則可能報錯。
  2. 集羣同一時刻最多隻能有兩個版本的 schema ,若同步過程,某個 TiDB 的版本爲 1 ,其餘的已是 3 了( 間隔了個2 )則這個 TiDB 中止服務。
  3. 若與 PD 斷鏈,中止服務。
    • *

感受整個流程涉及的東西也很多,有些地方仍是要多看幾遍領悟,以後少不了訂正這篇了😓

相關文章
相關標籤/搜索