手擼golang GO與微服務 Saga模式之4

緣起

最近閱讀<<Go微服務實戰>> (劉金亮, 2021.1)
本系列筆記擬採用golang練習之

Saga模式

  • saga模式將分佈式長事務切分爲一系列獨立短事務
  • 每一個短事務都可經過補償動作進行撤銷
  • 事務動作和補償動作都是冪等的, 允許重複執行而不會有副作用
Saga由一系列的子事務「Ti」組成,
每一個Ti都有對應的補償「Ci」,
當Ti出現問題時Ci用於處理Ti執行帶來的問題。

能夠經過下面的兩個公式理解Saga模式。
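$$T = T_1 T_2 \cdots T_n$$
$$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1 \quad (0 \le j < n)$$
前者表示全部子事務均成功提交; 後者表示 $T_{j+1}$ 失敗後, 依逆序執行已提交子事務的補償。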

Saga模式的核心理念是避免使用長期持有鎖(如14.2.2節介紹的兩階段提交)的長事務,
而應該將事務切分爲一組按序依次提交的短事務,
Saga模式滿足ACD(原子性、一致性、持久性)特徵。

摘自 <<Go微服務實戰>> 劉金亮, 2021.1

目標

  • 爲實現saga模式的分佈式事務, 先擼一個pub/sub事務消息隊列服務
  • 事務消息隊列服務的功能性要求

    • 消息不會丟失: 消息的持久化
    • 消息的唯一性: 要求每一個消息有全局ID和子事務ID
    • 確保投遞成功: 投遞隊列持久化, 投遞狀態持久化, 失敗重試

子目標(Day 4)

  • 完善投遞worker

    • 未處理消息: 標記, 並嘗試投遞
    • 已處理消息: 判斷是否超時, 並重試投遞
    • 投遞成功: 移動到成功投遞表
    • 投遞失敗: 重置標記, 下輪重試
  • 數據庫表相應的細節調整

    • delivery_queue: 去掉failed_count, 增加update_time時間戳
    • success_queue: 去掉sub_id, 改爲client_id, 並增加create_time時間戳
    • failed_queue: 由於不允許失敗, 所以刪除失敗投遞表

tDeliveryWorker.go

  • 完善投遞worker

    • 未處理消息: 標記, 並嘗試投遞
    • 已處理消息: 判斷是否超時, 並重試投遞
    • 投遞成功: 移動到成功投遞表
    • 投遞失敗: 重置標記, 下輪重試
package delivery

import (
    "bytes"
    "encoding/json"
    "errors"
    "github.com/jmoiron/sqlx"
    "io/ioutil"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "time"
)

// tDeliveryWorker delivers the queued messages of a single subscriber.
// Everything it needs to know about that subscriber (client id, notify URL,
// expiry time) is read from info.
type tDeliveryWorker struct {
    info *tWorkerInfo // the subscriber this worker delivers to
}

// newDeliveryWorker creates a worker bound to info and immediately starts its
// delivery loop on a background goroutine. The loop ends on its own once the
// worker has expired (checked between iterations via isExpired).
func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
    worker := &tDeliveryWorker{info: info}
    go worker.beginMainLoop()
    return worker
}

// beginMainLoop is the worker's delivery loop. It runs until isExpired reports true.
//
// Each iteration looks at the oldest queued message for this client:
//   - status_flag 0 (undelivered): claim and deliver it, then loop again
//     immediately so a backlog drains without delay;
//   - status_flag 1 (in flight): reset it for retry once its delivery has
//     timed out (see handleDeliveringMsg);
//   - empty queue, read failure, or any other flag: nothing to do.
//
// In every case except the first, the loop sleeps before the next iteration.
// Without that sleep, an in-flight head message that has not yet timed out
// turns the loop into a busy spin that re-queries the database continuously
// until the timeout elapses.
func (me *tDeliveryWorker) beginMainLoop() {
    const idleInterval = time.Second

    for !me.isExpired() {
        ok, msg := me.peek()
        if ok {
            switch msg.StatusFlag {
            case 0:
                // undelivered message: work was done, look for the next one at once
                me.handleUndeliveredMsg(msg)
                continue

            case 1:
                // message currently being delivered
                me.handleDeliveringMsg(msg)
            }
        }

        time.Sleep(idleInterval)
    }
}


// isExpired reports whether the current wall-clock time, in nanoseconds since
// the Unix epoch, has reached the worker's info.ExpireTime.
func (me *tDeliveryWorker) isExpired() bool {
    now := time.Now().UnixNano()
    return me.info.ExpireTime <= now
}

// peek fetches the oldest (by create_time) delivery_queue record for this
// worker's client.
//
// It returns (true, msg) when a record was read. It returns (false, nil) when
// the queue is empty or the read failed. Failures other than "queue empty"
// are logged instead of being silently treated as an empty queue.
func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
    msg := &models.QueuedMsg{}
    e := database.DB(func(db *sqlx.DB) error {
        rows, err := db.Queryx(
            "select * from delivery_queue where client_id=? order by create_time asc limit 1",
            me.info.ClientID,
        )
        if err != nil {
            return err
        }
        // Always release the result set. Otherwise the cursor (and the
        // connection holding it) stays busy after we return.
        defer func() { _ = rows.Close() }()

        if !rows.Next() {
            // Next returns false both at end-of-rows and on iteration errors.
            if err := rows.Err(); err != nil {
                return err
            }
            return gEmptyRowsErr
        }
        return rows.StructScan(msg)
    })

    if e != nil {
        if !errors.Is(e, gEmptyRowsErr) {
            logger.Logf("tDeliveryWorker.peek, failed, id=%v, err=%s", me.info.ClientID, e.Error())
        }
        return false, nil
    }
    return true, msg
}


// handleUndeliveredMsg claims an undelivered message (status_flag 0 -> 1) and
// then tries to deliver it.
//
// The claim is an optimistic-lock update: it only matches the row if its
// status_flag and update_time are still the values peek read. If the claim
// fails (DB error or the row changed), the error is logged and nothing is
// delivered. On success, msg.UpdateTime is advanced to the new stamp so that
// afterDeliverySuccess's guard (update_time=? and status_flag=1) matches the
// claimed row.
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
    err := database.DB(func(db *sqlx.DB) error {
        now := time.Now().UnixNano()
        r, e := db.Exec(
            "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
            now,
            msg.ID,
            msg.UpdateTime,
        )
        if e != nil {
            return e
        }

        rows, e := r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        msg.UpdateTime = now
        return nil
    })

    if err != nil {
        // Log under this function's real name so the line can be traced back here.
        logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        return
    }

    if me.deliver(msg) {
        me.afterDeliverySuccess(msg)
        return
    }
    me.afterDeliveryFailed(msg)
}

// gDeliveryHTTPClient is the HTTP client used to push messages to subscribers.
// Unlike http.DefaultClient (which http.Post uses), it has a timeout, so an
// unresponsive subscriber cannot block this worker's loop forever.
var gDeliveryHTTPClient = &http.Client{Timeout: 10 * time.Second}

// deliver POSTs msg, encoded as a JSON models.TxMsg, to the subscriber's
// NotifyURL.
//
// It returns true only if the subscriber answered with a 2xx status and a
// JSON models.OkMsg whose OK field is true. Any other outcome (encode error,
// transport error or timeout, non-2xx status, unreadable or undecodable body,
// OK=false) is logged and returns false.
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
    t := &models.TxMsg{
        GlobalID:   msg.GlobalID,
        SubID:      msg.SubID,
        Topic:      msg.Topic,
        CreateTime: msg.CreateTime,
        Content:    msg.Content,
    }
    j, e := json.Marshal(t)
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
        return false
    }

    r, e := gDeliveryHTTPClient.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
        return false
    }
    defer r.Body.Close()

    if r.StatusCode < 200 || r.StatusCode > 299 {
        logger.Logf("tDeliveryWorker.deliver, failed http status, id=%v, msg=%s/%s, status=%d", me.info.ClientID, msg.GlobalID, msg.SubID, r.StatusCode)
        return false
    }

    rep, e := ioutil.ReadAll(r.Body)
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
        return false
    }

    m := &models.OkMsg{}
    if e = json.Unmarshal(rep, m); e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
        return false
    }

    if !m.OK {
        logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }
    return true
}

// handleDeliveringMsg looks at a message already marked as in flight
// (status_flag 1). If gDeliveryTimeoutNanos have passed since its update_time,
// it hands the message to afterDeliveryTimeout so it can be retried.
// Otherwise it does nothing.
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
    deadline := msg.UpdateTime + gDeliveryTimeoutNanos
    if time.Now().UnixNano() < deadline {
        // still within the delivery window
        return
    }
    me.afterDeliveryTimeout(msg)
}

// afterDeliverySuccess moves a successfully delivered message from
// delivery_queue to success_queue and logs the outcome.
//
// The delete and the insert must be atomic, so both statements run on the
// transaction handle tx. Statements issued on db would run on another pooled
// connection outside the transaction, so a failure between them could lose
// the record or leave it in both tables.
//
// The delete is guarded by update_time and status_flag=1, so it only matches
// the in-flight row this worker claimed. Otherwise gOneRowsErr aborts the
// transaction.
func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
    err := database.TX(func(_ *sqlx.DB, tx *sqlx.Tx) error {
        r, e := tx.Exec(
            "delete from delivery_queue where id=? and update_time=? and status_flag=1",
            msg.ID,
            msg.UpdateTime,
        )
        if e != nil {
            return e
        }

        rows, e := r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        r, e = tx.Exec(
            "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
            msg.ID,
            msg.ClientID,
            time.Now().UnixNano(),
        )
        if e != nil {
            return e
        }

        rows, e = r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        return nil
    })

    if err != nil {
        logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        return
    }
    logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}


// afterDeliveryFailed only records the failed attempt in the log.
// The row is left in delivery_queue with status_flag=1. Once
// gDeliveryTimeoutNanos have elapsed, handleDeliveringMsg/afterDeliveryTimeout
// reset it to 0 so that a later loop iteration delivers it again.
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
    clientID := me.info.ClientID
    logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", clientID, msg.GlobalID, msg.SubID)
}

// afterDeliveryTimeout resets a timed-out in-flight message back to
// undelivered (status_flag 1 -> 0) so the main loop will deliver it again,
// and logs the outcome.
//
// The update is guarded by status_flag=1 and the update_time read by peek. If
// the row no longer matches, gOneRowsErr is reported and nothing changes.
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
    resetFlag := func(db *sqlx.DB) error {
        result, err := db.Exec(
            "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
            msg.ID, msg.UpdateTime,
        )
        if err != nil {
            return err
        }

        affected, err := result.RowsAffected()
        switch {
        case err != nil:
            return err
        case affected != 1:
            return gOneRowsErr
        default:
            return nil
        }
    }

    if err := database.DB(resetFlag); err != nil {
        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        return
    }
    logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}

var (
    // gEmptyRowsErr signals that a query returned no rows (e.g. empty delivery queue).
    gEmptyRowsErr = errors.New("empty rows")
    // gOneRowsErr signals that a guarded update/delete/insert did not touch exactly one row.
    gOneRowsErr = errors.New("expecting one row affected")
    // gDeliveryTimeoutNanos is how long (in nanoseconds) an in-flight delivery
    // may stay unresolved before it is reset for retry: 10 seconds.
    gDeliveryTimeoutNanos = int64(10 * time.Second)
)

database.go

  • 數據庫表相應的細節調整

    • delivery_queue: 去掉failed_count, 增加update_time時間戳
    • success_queue: 去掉sub_id, 改爲client_id, 並增加create_time時間戳
    • failed_queue: 由於不允許失敗, 所以刪除失敗投遞表
package database

import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"

// Callback signatures accepted by DB and TX.
type (
    // DBFunc is run by DB against a freshly opened database handle.
    DBFunc func(db *sqlx.DB) error
    // TXFunc is run by TX inside a transaction; it receives both the database
    // handle and the transaction.
    TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
)

// init makes sure the schema exists before anything in this package is used.
// The service cannot work without its tables, so a failure here panics.
func init() {
    if err := DB(initDB); err != nil {
        panic(err)
    }
}

// initDB creates the message-queue tables if they do not already exist.
// Every statement is "create table if not exists", so schema changes here only
// take effect on a fresh mqs.db.
//
// Tables:
//   - subscriber:     subscribers/consumers and the URL they are notified at
//   - tx_msg:         transactional messages as received; sub_id is unique
//   - delivery_queue: pending deliveries, one row per (client_id, sub_id)
//   - success_queue:  record of successful deliveries
//
// There is no failed_queue: per the design notes, failed deliveries are
// retried rather than recorded as failures.
func initDB(db *sqlx.DB) error {
    stmts := []string{
        // subscribers/consumers
        `create table if not exists subscriber(
    id integer primary key autoincrement,
    client_id varchar(50) unique not null,
    topic varchar(100) not null,
    notify_url varchar(500) not null,
    expire_time integer
)`,

        // transactional messages
        `create table if not exists tx_msg (
    id integer primary key autoincrement,
    global_id varchar(50) not null,
    sub_id varchar(50) unique not null,
    sender_id varchar(50) not null,
    create_time integer not null,
    topic varchar(100) not null,
    content nvarchar(2048) not null
)`,

        // delivery queue: one row per subscriber per message. Uniqueness is on
        // (client_id, sub_id), not on sub_id alone; otherwise the same message
        // could be queued for only one subscriber of its topic.
        `create table if not exists delivery_queue (
    id integer primary key autoincrement,

    client_id varchar(50) not null,
    notify_url varchar(500) not null,

    msg_id integer not null,
    global_id varchar(50) not null,
    sub_id varchar(50) not null,
    sender_id varchar(50) not null,
    create_time integer not null,
    topic varchar(100) not null,
    content nvarchar(2048) not null,

    status_flag integer not null,
    update_time integer not null,

    unique (client_id, sub_id)
)`,

        // successfully delivered messages
        `create table if not exists success_queue (
    id integer primary key autoincrement,
    msg_id integer not null,
    client_id varchar(50) not null,
    create_time integer not null
)`,
    }

    for _, stmt := range stmts {
        if _, e := db.Exec(stmt); e != nil {
            return e
        }
    }
    return nil
}

// open returns a handle to the SQLite database file ./mqs.db
// (relative to the process working directory).
func open() (*sqlx.DB, error) {
    const driverName, dataSource = "sqlite3", "./mqs.db"
    return sqlx.Open(driverName, dataSource)
}

// DB opens the database, runs action with the handle, and closes the handle
// when action returns. It returns the error from open or from action.
func DB(action DBFunc) error {
    conn, err := open()
    if err != nil {
        return err
    }
    defer func() {
        // The close error is deliberately ignored; action's result is what callers need.
        _ = conn.Close()
    }()

    return action(conn)
}

// TX opens the database, begins a transaction, and runs action inside it.
//
// If action returns nil, the transaction is committed and the commit error
// (if any) is returned. If action returns an error or panics, the transaction
// is rolled back; for an error, action's error is what TX returns.
func TX(action TXFunc) error {
    return DB(func(db *sqlx.DB) error {
        tx, err := db.Beginx()
        if err != nil {
            return err
        }
        // Always roll back on the way out. After a successful Commit this is a
        // harmless no-op (it returns sql.ErrTxDone, which is ignored). On an
        // error or a panic in action, it releases the transaction.
        defer func() { _ = tx.Rollback() }()

        if err = action(db, tx); err != nil {
            // Return action's own error. Returning the (usually nil) result of
            // Rollback would make a failed transaction look successful.
            return err
        }
        return tx.Commit()
    })
}

(未完待續)

相關文章
相關標籤/搜索