Hand-rolling golang: GO and Microservices, Saga Pattern, Part 5

Background

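Recently I have been reading <<Go微服務實戰>> (Go Microservices in Practice) by 劉金亮 (Liu Jinliang), 2021.1.
This series of notes practices its ideas in golang.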

Saga pattern

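  • The saga pattern splits a long distributed transaction into a series of independent short transactions
  • Each short transaction can be undone by a compensating action
  • Both the transaction actions and the compensating actions are idempotent, so they can be executed repeatedly without side effects
A Saga consists of a series of sub-transactions "Ti",
each Ti has a corresponding compensation "Ci",
and when Ti runs into a problem, Ci is used to undo the effects of executing Ti.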

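The Saga pattern can be understood through the following two formulas:
T = T1 T2 … Tn  (every sub-transaction succeeds)
T = T1 T2 … Tj Cj … C2 C1  (T(j+1) fails, so the completed Tj … T1 are compensated in reverse order)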

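The core idea of the Saga pattern is to avoid long transactions that hold locks for a long time (such as the two-phase commit introduced in Section 14.2.2),
and instead to split a transaction into a group of short transactions that are committed one after another in order.
The Saga pattern satisfies ACD (atomicity, consistency, durability), but not isolation.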

Excerpted from <<Go微服務實戰>>, 劉金亮 (Liu Jinliang), 2021.1

Goals

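  • To implement saga-style distributed transactions, first hand-roll a pub/sub transactional message queue service
  • Functional requirements for the transactional message queue service:

    • No message loss: messages are persisted
    • Message uniqueness: every message must carry a global ID and a sub-transaction ID
    • Guaranteed delivery: the delivery queue and delivery status are persisted, and failed deliveries are retried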
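The uniqueness requirement can be illustrated with a composite key. This is only a sketch: it assumes GlobalID and SubID are strings (as the `%s` log formats in the worker code suggest), and uses an in-memory map in place of what a persistent queue would more likely enforce with a unique index on the two columns. `msgKey` and `dedup` are hypothetical names.

```go
package main

import "fmt"

// msgKey identifies a message by its saga-wide global ID plus its sub-transaction ID.
type msgKey struct {
	GlobalID string
	SubID    string
}

// dedup remembers every key it has accepted (hypothetical helper).
type dedup struct {
	seen map[msgKey]struct{}
}

func newDedup() *dedup { return &dedup{seen: make(map[msgKey]struct{})} }

// Accept returns true the first time a key is offered and false for any repeat.
func (d *dedup) Accept(k msgKey) bool {
	if _, ok := d.seen[k]; ok {
		return false
	}
	d.seen[k] = struct{}{}
	return true
}

func main() {
	d := newDedup()
	fmt.Println(d.Accept(msgKey{"g1", "s1"})) // true
	fmt.Println(d.Accept(msgKey{"g1", "s2"})) // true: same saga, different sub-transaction
	fmt.Println(d.Accept(msgKey{"g1", "s1"})) // false: duplicate
}
```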

Sub-goals (Day 5)

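  • Refactor and improve the message delivery mechanism

    • iMsgHeap: caches pending messages in a heap; the message with the smallest creation time is always delivered first
    • iMsgSource: interface for message sources. There are two sources: 1 - the database; 2 - the eventbus
    • iMsgHistoryRing: a ring buffer that records recently delivered messages, to prevent duplicate delivery
    • tConcurrentMsgHeap: a thread-safe message heap ordered by smallest CreateTime; implements iMsgHeap
    • tDBMsgSource: pulls pending messages from the database; implements iMsgSource
    • tLiveMsgSource: listens for delivery messages pushed in real time via the eventbus; implements iMsgSource
    • tMsgHistoryRing: a fixed-size ring of message history that caches recently delivered messages; implements iMsgHistoryRing
    • tDeliveryWorker:

      • on initialization, first loads pending messages from the database
      • uses iMsgHeap to cache pending messages and keep them ordered
      • uses the iMsgSource interface to receive delivery messages from both the db and the eventbus
      • uses iMsgHistoryRing to cache successfully delivered messages, preventing duplicate delivery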

iMsgHeap.go

Caches pending messages in a heap; the message with the smallest creation time is always delivered first.

package delivery

import "learning/gooop/saga/mqs/models"

type iMsgHeap interface {
    Size() int
    IsEmpty() bool
    IsNotEmpty() bool

    Push(msg *models.QueuedMsg)
    Peek() *models.QueuedMsg
    Pop() *models.QueuedMsg
}

iMsgSource.go

Defines the message source interface. There are two sources: 1 - the database; 2 - the eventbus.

package delivery

import "learning/gooop/saga/mqs/models"

type iMsgSource interface {
    MsgChan() <-chan *models.QueuedMsg
}

type tSourceExpireFunc func() bool

iMsgHistoryRing.go

Uses a ring buffer to record recently delivered messages, preventing duplicate delivery.

package delivery

import "learning/gooop/saga/mqs/models"

type iMsgHistoryRing interface {
    Push(msg *models.QueuedMsg)
    Has(id int) bool
}

tConcurrentMsgHeap.go

A message heap ordered by smallest CreateTime; implements iMsgHeap and is thread-safe.

package delivery

import (
    "learning/gooop/saga/mqs/models"
    "sync"
)

type tConcurrentMsgHeap struct {
    items []*models.QueuedMsg
    size int
    mutex *sync.Mutex
}

func newMsgHeap() iMsgHeap {
    it := new(tConcurrentMsgHeap)
    it.init()
    return it
}

func (me *tConcurrentMsgHeap) init() {
    me.items = make([]*models.QueuedMsg, 0)
    me.size = 0
    me.mutex = new(sync.Mutex)
}

// Size is guarded by the mutex because Push/Pop may run on other goroutines.
func (me *tConcurrentMsgHeap) Size() int {
    me.mutex.Lock()
    defer me.mutex.Unlock()
    return me.size
}

func (me *tConcurrentMsgHeap) IsEmpty() bool {
    return me.Size() <= 0
}

func (me *tConcurrentMsgHeap) IsNotEmpty() bool {
    return !me.IsEmpty()
}

// has: linear scan over the live part of the heap; caller must hold the mutex.
// Slots beyond me.size may hold nil after Pop, so they must not be scanned.
func (me *tConcurrentMsgHeap) has(msgID int) bool {
    for _, it := range me.items[:me.size] {
        if it.MsgID == msgID {
            return true
        }
    }
    return false
}

func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {
    me.mutex.Lock()
    defer me.mutex.Unlock()

    // ignore nil msgs and msgs that are already queued
    if msg == nil || me.has(msg.MsgID) {
        return
    }

    me.ensureSize(me.size + 1)
    me.items[me.size] = msg
    me.size++

    me.shiftUp(me.size - 1)
}

func (me *tConcurrentMsgHeap) ensureSize(size int) {
    for len(me.items) < size {
        me.items = append(me.items, nil)
    }
}

func (me *tConcurrentMsgHeap) parentOf(i int) int {
    return (i - 1) / 2
}

func (me *tConcurrentMsgHeap) leftChildOf(i int) int {
    return i*2 + 1
}

func (me *tConcurrentMsgHeap) rightChildOf(i int) int {
    return me.leftChildOf(i) + 1
}

// last: index and value of the last element; caller must hold the mutex
func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {
    if me.size <= 0 {
        return -1, nil
    }

    i = me.size - 1
    v = me.items[i]
    return i, v
}

func (me *tConcurrentMsgHeap) shiftUp(i int) {
    if i <= 0 {
        return
    }
    v := me.items[i]

    pi := me.parentOf(i)
    pv := me.items[pi]

    if me.less(v, pv) {
        me.items[pi], me.items[i] = v, pv
        me.shiftUp(pi)
    }
}

func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {
    return a.CreateTime < b.CreateTime
}

func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {
    me.mutex.Lock()
    defer me.mutex.Unlock()

    // check me.size directly: IsEmpty() would try to re-acquire the non-reentrant mutex
    if me.size <= 0 {
        return nil
    }

    top := me.items[0]
    li, lv := me.last()
    me.items[0] = nil
    me.size--

    if me.size <= 0 {
        return top
    }

    // move the last element to the root, then sift it down
    me.items[0] = lv
    me.items[li] = nil

    me.shiftDown(0)
    return top
}

func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {
    me.mutex.Lock()
    defer me.mutex.Unlock()

    if me.size <= 0 {
        return nil
    }

    return me.items[0]
}

func (me *tConcurrentMsgHeap) shiftDown(i int) {
    pv := me.items[i]
    ok, ci, cv := me.minChildOf(i)
    if ok && me.less(cv, pv) {
        me.items[i], me.items[ci] = cv, pv
        me.shiftDown(ci)
    }
}

func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {
    li := me.leftChildOf(p)
    if li >= me.size {
        return false, 0, nil
    }
    lv := me.items[li]

    ri := me.rightChildOf(p)
    if ri >= me.size {
        return true, li, lv
    }
    rv := me.items[ri]

    if me.less(lv, rv) {
        return true, li, lv
    } else {
        return true, ri, rv
    }
}

tDBMsgSource.go

Pulls pending messages from the database; implements iMsgSource.

package delivery

import (
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/models"
    "time"
)

type tDBMsgSource struct {
    clientID string
    expireFunc tSourceExpireFunc
    msgChan chan *models.QueuedMsg
}

func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
    it := new(tDBMsgSource)
    it.init(clientID, expireFunc)
    return it
}

func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
    me.clientID = clientID
    me.expireFunc = expireFunc
    me.msgChan = make(chan *models.QueuedMsg, 1)
    go me.beginPollDB()
}

func (me *tDBMsgSource) MsgChan() <-chan *models.QueuedMsg {
    return me.msgChan
}

func (me *tDBMsgSource) beginPollDB() {
    interval := time.Duration(1) * time.Second
    for !me.expireFunc() {
        if len(me.msgChan) <= 0 {
            ok, msg := me.poll()
            if ok {
                me.msgChan <- msg
                continue
            }
        }

        // poll failed, or chan full
        time.Sleep(interval)
    }

    close(me.msgChan)
}

func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {
    msg := &models.QueuedMsg{}
    e := database.DB(func(db *sqlx.DB) error {
        rows, err := db.Queryx(
            "select * from delivery_queue where client_id=? order by create_time asc limit 1",
            me.clientID,
        )
        if err != nil {
            return err
        }
        // release the connection back to the pool once scanning is done
        defer rows.Close()

        if rows.Next() {
            err = rows.StructScan(msg)
            if err != nil {
                return err
            }
            return nil

        } else {
            return gEmptyRowsErr
        }
    })

    if e != nil {
        return false, nil
    } else {
        return true, msg
    }
}

tLiveMsgSource.go

Listens for delivery messages pushed in real time via the eventbus; implements iMsgSource.

package delivery

import (
    "fmt"
    "learning/gooop/saga/mqs/eventbus"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "learning/gooop/saga/mqs/models/events"
    "time"
)

type tLiveMsgSource struct {
    clientID string
    expireFunc tSourceExpireFunc
    msgChan chan *models.QueuedMsg
}

func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
    it := new(tLiveMsgSource)
    it.init(clientID, expireFunc)
    return it
}

func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
    me.clientID = clientID
    me.expireFunc = expireFunc
    me.msgChan = make(chan *models.QueuedMsg, 1)

    eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent,
        me.id(),
        me.handleMsgPublished)
    go me.beginWatchExpire()
}


func (me *tLiveMsgSource) id() string {
    return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)
}

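func (me *tLiveMsgSource) beginWatchExpire() {
    // an explicit Ticker can be stopped when watching ends; time.Tick offers no Stop
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        if me.expireFunc() {
            me.afterExpired()
            return
        }
    }
}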

func (me *tLiveMsgSource) afterExpired() {
    eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())
    close(me.msgChan)
}

func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {
    msg, ok := args.(*models.QueuedMsg)
    if !ok {
        return
    }

    if msg.ClientID != me.clientID {
        return
    }

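    // after expiry the channel is closed; this check reduces (but does not fully
    // eliminate) the chance of sending on a closed channel
    if me.expireFunc() {
        return
    }

    logger.Logf(
        "tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",
        me.clientID, msg.GlobalID, msg.SubID, msg.Topic)

    // non-blocking send: if the buffer is full, drop the msg instead of blocking the
    // eventbus; the msg is presumably still in delivery_queue, so tDBMsgSource can pick it up later
    select {
    case me.msgChan <- msg:
    default:
    }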
}

func (me *tLiveMsgSource) MsgChan() <-chan *models.QueuedMsg {
    return me.msgChan
}
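The drop-when-full behaviour of the live source relies on Go's non-blocking channel send, a `select` with a `default` case. A minimal sketch with a hypothetical `trySend` helper on an `int` channel:

```go
package main

import "fmt"

// trySend hands m to ch without blocking. It returns false when the buffer
// is full, in which case the caller simply drops the message.
func trySend(ch chan<- int, m int) bool {
	select {
	case ch <- m:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer (capacity 1) is full
	fmt.Println(<-ch)           // 1
}
```

Dropping is acceptable in this design only because the live source is an optimization: the database poller remains the reliable path.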

tMsgHistoryRing.go

A fixed-size ring queue of message history; implements iMsgHistoryRing and caches recently delivered messages.

package delivery

import "learning/gooop/saga/mqs/models"


type tMsgHistoryRing struct {
    items []*models.QueuedMsg
    capacity int
    index int
}

func newMsgHistoryRing(capacity int) iMsgHistoryRing {
    it := new(tMsgHistoryRing)
    it.init(capacity)
    return it
}

func (me *tMsgHistoryRing) init(capacity int) {
    me.items = make([]*models.QueuedMsg, capacity)
    me.capacity = capacity
    me.index = 0
}


func (me *tMsgHistoryRing) Has(id int) bool {
    for _, it := range me.items {
        if it != nil && it.ID == id {
            return true
        }
    }

    return false
}

func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {
    me.items[me.index] = msg

    me.index++
    if me.index >= me.capacity {
        me.index = 0
    }
}
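Because tMsgHistoryRing overwrites its oldest slot once it is full, its duplicate protection only covers the most recent `capacity` successes (64 in tDeliveryWorker). A simplified sketch that stores int IDs; the `ring` type and its `used` flags, which play the role of the `it != nil` check, are illustrative assumptions:

```go
package main

import "fmt"

// ring is a simplified tMsgHistoryRing that stores message IDs only.
type ring struct {
	items []int
	used  []bool // marks slots that hold a real ID, so ID 0 is not a false hit
	index int
}

func newRing(capacity int) *ring {
	return &ring{items: make([]int, capacity), used: make([]bool, capacity)}
}

// Push writes at the current index and wraps around, overwriting the oldest entry.
func (r *ring) Push(id int) {
	r.items[r.index] = id
	r.used[r.index] = true
	r.index = (r.index + 1) % len(r.items)
}

// Has does a linear scan, like tMsgHistoryRing.Has.
func (r *ring) Has(id int) bool {
	for i, v := range r.items {
		if r.used[i] && v == id {
			return true
		}
	}
	return false
}

func main() {
	r := newRing(3)
	for _, id := range []int{1, 2, 3, 4} {
		r.Push(id)
	}
	fmt.Println(r.Has(1), r.Has(4)) // false true: ID 1 was overwritten by ID 4
}
```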

tDeliveryWorker.go

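  • On initialization, first loads pending messages from the database
  • Uses iMsgHeap to cache pending messages and keep them ordered
  • Uses the iMsgSource interface to receive delivery messages from both the db and the eventbus
  • Uses iMsgHistoryRing to cache successfully delivered messages, preventing duplicate delivery
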
package delivery

import (
    "bytes"
    "encoding/json"
    "errors"
    "github.com/jmoiron/sqlx"
    "io/ioutil"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "time"
)

type tDeliveryWorker struct {
    info        *tWorkerInfo
    successRing iMsgHistoryRing
    dbSource iMsgSource
    liveSource iMsgSource
    msgHeap iMsgHeap
}

func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
    it := new(tDeliveryWorker)
    it.init(info)
    return it
}

// init: do initialization, and start initial load
func (me *tDeliveryWorker) init(info *tWorkerInfo) {
    me.info = info
    me.successRing = newMsgHistoryRing(64)

    me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)
    me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)
    me.msgHeap = newMsgHeap()

    go me.beginInitialLoadFromDB()
}

// beginInitialLoadFromDB: initially, load queued msg from database, retrying until it succeeds
func (me *tDeliveryWorker) beginInitialLoadFromDB() {
    for !me.isExpired() {
        var msgList []*models.QueuedMsg
        err := database.DB(func(db *sqlx.DB) error {
            e, rows := me.loadFromDB(db)
            if e != nil {
                return e
            }

            msgList = rows
            return nil
        })

        if err != nil {
            logger.Logf("tDeliveryWorker.initialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())
            time.Sleep(3 * time.Second)
            continue
        }

        // load succeeded: start the delivery loop exactly once, then stop retrying
        me.afterInitialLoad(msgList)
        return
    }
}

// loadFromDB: load queued msg from database
func (me *tDeliveryWorker) loadFromDB(db *sqlx.DB) (error, []*models.QueuedMsg) {
    rows, err := db.Queryx(
        "select * from delivery_queue where client_id=? order by create_time asc limit ?",
        me.info.ClientID,
        gInitialLoadRows,
    )
    if err != nil {
        return err, nil
    }
    // release the connection back to the pool once scanning is done
    defer rows.Close()

    msgList := []*models.QueuedMsg{}
    for rows.Next() {
        msg := &models.QueuedMsg{}
        err = rows.StructScan(msg)
        if err != nil {
            return err, nil
        }
        msgList = append(msgList, msg)
    }

    return nil, msgList
}

// afterInitialLoad: after initial load done, push msgs into heap, and start delivery loop
func (me *tDeliveryWorker) afterInitialLoad(msgList []*models.QueuedMsg) {
    logger.Logf("tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d", me.info.ClientID, len(msgList))
    for _,it := range msgList {
        me.msgHeap.Push(it)
    }

    go me.beginPollAndDeliver()
}

// beginPollAndDeliver: receive msgs from both sources into the heap, then pop the earliest and deliver it
func (me *tDeliveryWorker) beginPollAndDeliver() {
    for !me.isExpired() {
        select {
        case msg, ok := <-me.dbSource.MsgChan():
            // ok == false means the source closed its channel because the worker expired
            if ok && msg != nil {
                me.msgHeap.Push(msg)
            }

        case msg, ok := <-me.liveSource.MsgChan():
            if ok && msg != nil {
                me.msgHeap.Push(msg)
            }
        }

        // Pop returns nil when the heap is empty
        msg := me.msgHeap.Pop()
        if msg == nil {
            continue
        }

        switch msg.StatusFlag {
        case 0:
            // undelivered message
            me.handleUndeliveredMsg(msg)

        case 1:
            // message currently being delivered
            me.handleDeliveringMsg(msg)
        }
    }
}

// isExpired: is me expired?
func (me *tDeliveryWorker) isExpired() bool {
    return time.Now().UnixNano() >= me.info.ExpireTime
}

// handleUndeliveredMsg: if msg unhandled, then try to deliver it
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
    err := database.DB(func(db *sqlx.DB) error {
        now := time.Now().UnixNano()
        r,e := db.Exec(
            "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
            now,
            msg.ID,
            msg.UpdateTime,
        )
        if e != nil {
            return e
        }

        rows, e := r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        msg.StatusFlag = 1
        msg.UpdateTime = now

        return nil
    })

    if err != nil {
        logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        return
    }

    if me.deliver(msg) {
        me.afterDeliverySuccess(msg)

    } else {
        me.afterDeliveryFailed(msg)
    }
}

// deliver: use http.Post function to delivery msg
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
    if me.successRing.Has(msg.ID) {
        return true
    }

    t := &models.TxMsg{
        GlobalID: msg.GlobalID,
        SubID: msg.SubID,
        Topic: msg.Topic,
        CreateTime: msg.CreateTime,
        Content: msg.Content,
    }
    j,e := json.Marshal(t)
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }

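    // use a client with a timeout so an unresponsive subscriber cannot block this worker forever;
    // 5s is an assumed value, chosen below the 10s gDeliveryTimeoutNanos
    client := &http.Client{Timeout: 5 * time.Second}
    r, e := client.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))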
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }

    defer r.Body.Close()
    rep, e := ioutil.ReadAll(r.Body)
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }

    m := &models.OkMsg{}
    e = json.Unmarshal(rep, m)
    if e != nil {
        logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }

    if m.OK {
        return true
    } else {
        logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        return false
    }
}

// handleDeliveringMsg: if delivery timeout, then retry delivery
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
    now := time.Now().UnixNano()
    if msg.UpdateTime + gDeliveryTimeoutNanos > now {
        return
    }

    // delivery timeout
    me.afterDeliveryTimeout(msg)
}

// afterDeliverySuccess: if done, move msg to success queue
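func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
    // do not return early when the ring already has msg: a msg whose earlier db cleanup
    // failed is re-delivered via the ring short-circuit and must still be removed here
    if !me.successRing.Has(msg.ID) {
        me.successRing.Push(msg)
    }

    err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
        // run both statements on tx, not db, so the delete and insert commit or roll back together
        r, e := tx.Exec(
            "delete from delivery_queue where id=? and update_time=? and status_flag=1",
            msg.ID,
            msg.UpdateTime,
        )
        if e != nil {
            return e
        }

        rows, e := r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        r, e = tx.Exec(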
            "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
            msg.ID,
            msg.ClientID,
            time.Now().UnixNano(),
        )
        if e != nil {
            return e
        }

        rows, e = r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gOneRowsErr
        }

        return nil
    })

    if err != nil {
        logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
    } else {
        logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
    }
}


// afterDeliveryFailed: if failed, do nothing but just log it
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
    logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}

// afterDeliveryTimeout: if timeout, then reset status and retry
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
    err := database.DB(func(db *sqlx.DB) error {
        r,e := db.Exec(
            "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
            msg.ID,
            msg.UpdateTime,
        )
        if e != nil {
            return e
        }

        rows,e := r.RowsAffected()
        if e != nil {
            return e
        }

        if rows != 1 {
            return gOneRowsErr
        }

        return nil
    })

    if err != nil {
        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
    } else {
        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
    }
}

var gEmptyRowsErr = errors.New("empty rows")
var gOneRowsErr = errors.New("expecting one row affected")
var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))
var gInitialLoadRows = 100
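The UPDATE statements in tDeliveryWorker use `status_flag` plus `update_time` as an optimistic lock: a row is only claimed if it still has the status and update time the worker last read, and `RowsAffected() != 1` signals that another worker got there first. A sketch of the same compare-and-set on a hypothetical in-memory `table` (no real database is involved; `row`, `table` and `casStatus` are illustrative names):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// row models the delivery_queue columns that the worker's UPDATE statements touch.
type row struct {
	StatusFlag int   // 0 = undelivered, 1 = delivering
	UpdateTime int64 // acts as an optimistic-lock version
}

var errOneRow = errors.New("expecting one row affected")

// table is a hypothetical in-memory stand-in for delivery_queue.
type table struct {
	mu   sync.Mutex
	rows map[int]*row
}

// casStatus mimics the pattern
//   update delivery_queue set status_flag=?, update_time=? where id=? and status_flag=? and update_time=?
// It succeeds only if the row still has the status and version the caller last saw.
func (t *table) casStatus(id, fromFlag, toFlag int, seenTime, now int64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	r, ok := t.rows[id]
	if !ok || r.StatusFlag != fromFlag || r.UpdateTime != seenTime {
		return errOneRow // the SQL version would report 0 rows affected
	}
	r.StatusFlag, r.UpdateTime = toFlag, now
	return nil
}

func main() {
	t := &table{rows: map[int]*row{1: {StatusFlag: 0, UpdateTime: 100}}}
	// two workers read the same row (update_time=100) and race to claim it
	fmt.Println(t.casStatus(1, 0, 1, 100, 200)) // <nil>: the first claim wins
	fmt.Println(t.casStatus(1, 0, 1, 100, 201)) // expecting one row affected: stale read
}
```

To mirror the timeout reset (`set status_flag=0 ... where status_flag=1 and update_time=?`), which leaves update_time unchanged, call `casStatus(id, 1, 0, seen, seen)`.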

(To be continued)
