手擼golang GO與微服務 Saga模式之3

緣起

最近閱讀<<Go微服務實戰>> (劉金亮, 2021.1)
本系列筆記擬採用golang練習之git

Saga模式

  • saga模式將分佈式長事務切分爲一系列獨立短事務
  • 每一個短事務是可經過補償動做進行撤銷的
  • 事務動做和補動做償都是冪等的, 容許重複執行而不會有反作用
Saga由一系列的子事務「Ti」組成,
每一個Ti都有對應的補償「Ci」,
當Ti出現問題時Ci用於處理Ti執行帶來的問題。

能夠經過下面的兩個公式理解Saga模式。
T = T1 T2 … Tn
T = TCT

Saga模式的核心理念是避免使用長期持有鎖(如14.2.2節介紹的兩階段提交)的長事務,
而應該將事務切分爲一組按序依次提交的短事務,
Saga模式知足ACD(原子性、一致性、持久性)特徵。

摘自 <<Go微服務實戰>> 劉金亮, 2021.1

目標

  • 爲實現saga模式的分佈式事務, 先擼一個pub/sub事務消息隊列服務
  • 事務消息隊列服務的功能性要求github

    • 消息不會丟失: 消息的持久化
    • 消息的惟一性: 要求每一個消息有全局ID和子事務ID
    • 確保投遞成功: 投遞隊列持久化, 投遞狀態持久化, 失敗重試

子目標(Day 3)

  • 有序,可靠地投遞消息golang

    • 有序: 每一個訂閱者綁定獨立的投遞worker, 按時間戳投遞
    • 可靠: 投遞狀態的持久化

設計

  • QueuedMsg: 把投遞隊列的消息設計得胖一點, 以減小join的使用
  • IEventBus: 設計一個內部消息總線, 以減小邏輯耦合
  • tDeliveryService: 投遞服務, 管理若干個投遞worker
  • tWorkerInfo: 投遞worker的初始化參數
  • tDeliveryWorker: 投遞worker(未完成)

QueuedMsg.go

把待投遞消息設計得胖一點, 以減小join的使用sql

package models

type QueuedMsg struct {
    ID int
    ClientID string
    NotifyURL string

    MsgID int
    GlobalID string
    SubID string
    SenderID string
    CreateTime int64
    Topic string
    Content string

    StatusFlag int
    FailedCount int
}

IEventBus.go

設計一個內部消息總線, 以減小邏輯耦合json

package eventbus

import "sync"

type EventHandleFunc func(e string, args interface{})

type IEventBus interface {
    Pub(e string, args interface{})
    Sub(e string, handler EventHandleFunc)
}


type tEventBus struct {
    rwmutex *sync.RWMutex
    items map[string][]EventHandleFunc
}


func newEventBus() IEventBus {
    it := new(tEventBus)
    it.init()
    return it
}

func (me *tEventBus) init() {
    me.rwmutex = new(sync.RWMutex)
    me.items = make(map[string][]EventHandleFunc)
}

func (me *tEventBus) Pub(e string, args interface{}) {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()
    
    handlers,ok := me.items[e]
    if ok {
        for _,it := range handlers {
            go it(e, args)
        }
    }
}

func (me *tEventBus) Sub(e string, handler EventHandleFunc) {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    handlers,ok := me.items[e]
    if ok {
        me.items[e] = append(handlers, handler)
    } else {
        me.items[e] = []EventHandleFunc{handler }
    }
}


var GlobalEventBus = newEventBus()

tDeliveryService.go

投遞服務, 管理若干個投遞workerapp

package delivery

import (
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/eventbus"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "sync"
    "time"
)

type tDeliveryService struct {
    rwmutex *sync.RWMutex
    workers map[string]*tDeliveryWorker
}

func newDeliveryService() *tDeliveryService {
    it := new(tDeliveryService)
    it.init()
    return it
}

func (me *tDeliveryService) init() {
}

func (me *tDeliveryService) handleBootEvent(e string, args interface{}) {
    go me.beginCreatingWorkers()
    go me.beginCleanExpiredWorkers()
}


func (me *tDeliveryService) beginCreatingWorkers() {
    for {
        e := database.DB(func(db *sqlx.DB) error {
            now := time.Now().UnixNano()
            rows, err := db.Queryx("select client_id, notify_url, expire_time from subscriber where expire_time>?", now)
            if err != nil {
                return err
            }

            for rows.Next() {
                it := new(tWorkerInfo)
                err = rows.StructScan(it)
                if err != nil {
                    return err
                }

                me.createWorker(it)
            }

            return nil
        })

        if e != nil {
            logger.Logf("tDeliveryService.beginCreatingWorkers, error = %s", e.Error())
        }
        time.Sleep(time.Duration(5) * time.Second)
    }
}


func (me *tDeliveryService) beginCleanExpiredWorkers() {
    for range time.Tick(time.Duration(30) * time.Second) {
        me.clean()
    }
}

func (me *tDeliveryService) clean() {
    me.rwmutex.RLock()
    var keys []string
    for k,v := range me.workers {
        if v.isExpired() {
            keys = append(keys, k)
        }
    }
    me.rwmutex.RUnlock()

    if len(keys) == 0 {
        return
    }

    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()
    for _,k := range keys {
        delete(me.workers, k)
    }
}

func (me *tDeliveryService) handleNewSubscriber(args interface{}) {
    it, ok := args.(*models.SubMsg)
    if !ok {
        return
    }

    me.createWorker(&tWorkerInfo{
        it.ClientID, it.NotifyUrl, it.ExpireTime,
    })
}

func maxInt64(a, b int64) int64 {
    if a >= b {
        return a
    }
    return b
}

func (me *tDeliveryService) createWorker(info *tWorkerInfo) {
    me.rwmutex.RLock()
    w,ok := me.workers[info.ClientID]
    me.rwmutex.RUnlock()
    if ok {
        w.info.ExpireTime = maxInt64(w.info.ExpireTime, info.ExpireTime)
        return
    }

    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()
    me.workers[info.ClientID] = newDeliveryWorker(info)
}

var gDeliveryService = newDeliveryService()

func init() {
    eventbus.GlobalEventBus.Sub("boot", gDeliveryService.handleBootEvent)
    eventbus.GlobalEventBus.Sub("subscriber.new", gDeliveryService.handleBootEvent)
}

tWorkerInfo.go

投遞worker的初始化參數分佈式

package delivery

type tWorkerInfo struct {
    ClientID string
    NotifyURL string
    ExpireTime int64
}

tDeliveryWorker.go

投遞worker, 未完成微服務

package delivery

import (
    "bytes"
    "encoding/json"
    "errors"
    "github.com/jmoiron/sqlx"
    "io/ioutil"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "time"
)

type tDeliveryWorker struct {
    info *tWorkerInfo
}

func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
    it := new(tDeliveryWorker)
    it.info = info
    go it.beginMainLoop()
    return it
}

func (me *tDeliveryWorker) beginMainLoop() {
    for !me.isExpired() {
        ok, msg := me.peek()
        if ok {
            switch msg.StatusFlag {
            case 0:
                // 未處理的消息
                me.handleNewMsg(msg)
                break

            case 1:
                // 處理中的消息
                me.handleDeliveringMsg(msg)
                break
            }

        } else {
            time.Sleep(time.Duration(1) * time.Second)
        }
    }
}


func (me *tDeliveryWorker) isExpired() bool {
    return time.Now().UnixNano() >= me.info.ExpireTime
}

var gEmptyRowsErr = errors.New("empty rows")

func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
    msg := &models.QueuedMsg{}
    e := database.DB(func(db *sqlx.DB) error {
        rows, err := db.Queryx("select * from delivery_queue where client_id=? order by create_time asc limit 1", me.info.ClientID)
        if err != nil {
            return err
        }

        if rows.Next() {
            err = rows.StructScan(msg)
            if err != nil {
                return err
            }
            return nil

        } else {
            return gEmptyRowsErr
        }
    })

    if e != nil {
        return false, nil
    } else {
        return true, msg
    }
}



func (me *tDeliveryWorker) handleNewMsg(msg *models.QueuedMsg) bool {
    err := database.DB(func(db *sqlx.DB) error {
        r,e := db.Exec("update delivery_queue set status_flag=1 where id=?", msg.ID)
        if e != nil {
            return e
        }

        rows, e := r.RowsAffected()
        if e != nil {
            return e
        }
        if rows != 1 {
            return gEmptyRowsErr
        }

        return nil
    })

    if err != nil {
        logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        return false
    }

    if me.deliver(msg) {
        // todo: move msg to success queue
    } else {
        // todo: msg.failed_count++
    }

    return false
}


func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
    t := &models.TxMsg{
        GlobalID: msg.GlobalID,
        SubID: msg.SubID,
        Topic: msg.Topic,
        CreateTime: msg.CreateTime,
        Content: msg.Content,
    }
    j,e := json.Marshal(t)
    if e != nil {
        return false
    }

    r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
    if e != nil {
        return false
    }

    defer r.Body.Close()
    rep, e := ioutil.ReadAll(r.Body)
    if e != nil {
        return false
    }

    m := &models.OkMsg{}
    e = json.Unmarshal(rep, m)
    if e != nil {
        return false
    }

    return m.OK
}

func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool {
    // todo: check if delivery timeout
    return false
}

(未完待續)oop

相關文章
相關標籤/搜索