最近閱讀<<Go微服務實戰>> (劉金亮, 2021.1)
本系列筆記擬採用golang練習之
Saga由一系列的子事務「Ti」組成, 每一個Ti都有對應的補償「Ci」, 當Ti出現問題時Ci用於處理Ti執行帶來的問題。 可以通過下面的兩個公式理解Saga模式。 $T = T_1 T_2 \dots T_n$ (全部子事務成功) $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$ (Tj失敗後按逆序補償) Saga模式的核心理念是避免使用長期持有鎖(如14.2.2節介紹的兩階段提交)的長事務, 而應該將事務切分爲一組按序依次提交的短事務, Saga模式滿足ACD(原子性、一致性、持久性)特徵。 摘自 <<Go微服務實戰>> 劉金亮, 2021.1
事務消息隊列服務的功能性要求
完善投遞worker
數據庫表相應的細節調整
完善投遞worker
package delivery

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

// tDeliveryWorker polls the delivery queue of a single client and pushes
// each queued message to the client's notify URL.
type tDeliveryWorker struct {
	info *tWorkerInfo
}

// newDeliveryWorker creates a worker for info and starts its main loop in a
// new goroutine. The goroutine ends once the worker info has expired.
func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
	it := new(tDeliveryWorker)
	it.info = info
	go it.beginMainLoop()
	return it
}

// beginMainLoop repeatedly peeks the oldest queued message and dispatches it
// by status flag until the worker expires. Whenever an iteration had nothing
// to do (empty queue, message still in flight, unknown flag) it sleeps for
// gIdleInterval so the loop does not hammer the database.
func (me *tDeliveryWorker) beginMainLoop() {
	for !me.isExpired() {
		ok, msg := me.peek()
		if !ok {
			time.Sleep(gIdleInterval)
			continue
		}

		switch msg.StatusFlag {
		case 0:
			// message not yet handled
			me.handleUndeliveredMsg(msg)

		case 1:
			// message currently being delivered
			if !me.handleDeliveringMsg(msg) {
				// still inside the delivery window: wait instead of spinning
				time.Sleep(gIdleInterval)
			}

		default:
			// unknown flag: nothing this worker can do with it right now
			time.Sleep(gIdleInterval)
		}
	}
}

// isExpired reports whether the current time (unix nanoseconds) has reached
// the worker's ExpireTime.
func (me *tDeliveryWorker) isExpired() bool {
	return time.Now().UnixNano() >= me.info.ExpireTime
}

// peek fetches the earliest record (by create_time) of this client from the
// delivery queue. It returns (false, nil) when the queue is empty or the
// query fails; failures other than "empty" are logged.
func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
	msg := &models.QueuedMsg{}
	err := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx(
			"select * from delivery_queue where client_id=? order by create_time asc limit 1",
			me.info.ClientID,
		)
		if err != nil {
			return err
		}
		// always release the cursor, also on the early returns below
		defer func() { _ = rows.Close() }()

		if !rows.Next() {
			// Next returns false both for "no rows" and for an iteration error
			if err := rows.Err(); err != nil {
				return err
			}
			return gEmptyRowsErr
		}
		return rows.StructScan(msg)
	})

	if err != nil {
		if !errors.Is(err, gEmptyRowsErr) {
			logger.Logf("tDeliveryWorker.peek, id=%v, err=%s", me.info.ClientID, err.Error())
		}
		return false, nil
	}
	return true, msg
}

// handleUndeliveredMsg marks an unhandled message as "delivering"
// (status_flag 0 -> 1) and then tries to deliver it. The update is guarded
// by the previous update_time, so it fails if the row changed in between.
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		now := time.Now().UnixNano()
		e := expectOneRowAffected(db.Exec(
			"update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
			now,
			msg.ID,
			msg.UpdateTime,
		))
		if e != nil {
			return e
		}

		// keep the in-memory copy in sync; later statements match on update_time
		msg.UpdateTime = now
		return nil
	})

	if err != nil {
		logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return
	}

	if me.deliver(msg) {
		me.afterDeliverySuccess(msg)
	} else {
		me.afterDeliveryFailed(msg)
	}
}

// deliver posts msg as a JSON-encoded models.TxMsg to the worker's notify
// URL and reports whether the response body decoded to an OkMsg with OK set.
// The request is bounded by gDeliveryHTTPClient's timeout.
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
	t := &models.TxMsg{
		GlobalID:   msg.GlobalID,
		SubID:      msg.SubID,
		Topic:      msg.Topic,
		CreateTime: msg.CreateTime,
		Content:    msg.Content,
	}
	j, e := json.Marshal(t)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}

	r, e := gDeliveryHTTPClient.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}
	defer r.Body.Close()

	rep, e := ioutil.ReadAll(r.Body)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}

	m := &models.OkMsg{}
	e = json.Unmarshal(rep, m)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}

	if !m.OK {
		logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}
	return true
}

// handleDeliveringMsg checks a message that is flagged as "delivering". If
// its update_time is older than gDeliveryTimeoutNanos the message is reset
// for retry and true is returned; otherwise nothing is done and false is
// returned so the caller can back off.
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool {
	now := time.Now().UnixNano()
	if msg.UpdateTime+gDeliveryTimeoutNanos > now {
		return false
	}

	// delivery timeout
	me.afterDeliveryTimeout(msg)
	return true
}

// afterDeliverySuccess moves a delivered message to the success queue: it
// deletes the row from delivery_queue and inserts a row into success_queue.
// Both statements run on the transaction so they commit or roll back
// together.
func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
	err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
		e := expectOneRowAffected(tx.Exec(
			"delete from delivery_queue where id=? and update_time=? and status_flag=1",
			msg.ID,
			msg.UpdateTime,
		))
		if e != nil {
			return e
		}

		return expectOneRowAffected(tx.Exec(
			"insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
			msg.ID,
			msg.ClientID,
			time.Now().UnixNano(),
		))
	})

	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

// afterDeliveryFailed only logs the failure. The row keeps status_flag=1, so
// handleDeliveringMsg will reset it once the delivery timeout has passed.
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
	logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}

// afterDeliveryTimeout resets a timed-out message to "undelivered"
// (status_flag 1 -> 0) so that the main loop picks it up again.
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		return expectOneRowAffected(db.Exec(
			"update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
			msg.ID,
			msg.UpdateTime,
		))
	})

	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

// expectOneRowAffected takes the result pair of an Exec call and returns the
// Exec error, the RowsAffected error, or gOneRowsErr when the statement did
// not touch exactly one row.
func expectOneRowAffected(r sql.Result, err error) error {
	if err != nil {
		return err
	}
	rows, err := r.RowsAffected()
	if err != nil {
		return err
	}
	if rows != 1 {
		return gOneRowsErr
	}
	return nil
}

var (
	// gEmptyRowsErr is returned by peek's query when the queue has no row.
	gEmptyRowsErr = errors.New("empty rows")

	// gOneRowsErr is returned when a statement did not affect exactly one row.
	gOneRowsErr = errors.New("expecting one row affected")

	// gDeliveryTimeoutNanos is how long (in nanoseconds) a message may stay
	// in the "delivering" state before it is reset for retry: 10 seconds.
	gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))

	// gIdleInterval is the pause between polls when there is nothing to do.
	gIdleInterval = time.Second

	// gDeliveryHTTPClient bounds every delivery request by the delivery
	// timeout, so an unresponsive subscriber cannot block the worker forever.
	gDeliveryHTTPClient = &http.Client{Timeout: time.Duration(gDeliveryTimeoutNanos)}
)
數據庫表相應的細節調整
package database import "github.com/jmoiron/sqlx" import _ "github.com/mattn/go-sqlite3" type DBFunc func(db *sqlx.DB) error type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error func init() { // must prepare tables err := DB(initDB) if err != nil { panic(err) } } func initDB(db *sqlx.DB) error { // 訂閱者/消費者: subscriber _, e := db.Exec(`create table if not exists subscriber( id integer primary key autoincrement, client_id varchar(50) unique not null, topic varchar(100) not null, notify_url varchar(500) not null, expire_time integer )`) if e != nil { return e } // 事務消息: tx_msg _, e = db.Exec(`create table if not exists tx_msg ( id integer primary key autoincrement, global_id string varchar(50) not null, sub_id string varchar(50) unique not null, sender_id varchar(50) not null, create_time integer not null, topic varchar(100) not null, content nvarchar(2048) not null )`) if e != nil { return e } // 投遞隊列: delivery_queue _, e = db.Exec(`create table if not exists delivery_queue ( id integer primary key autoincrement, client_id varchar(50) not null, notify_url varchar(500) not null, msg_id integer not null, global_id string varchar(50) not null, sub_id string varchar(50) unique not null, sender_id varchar(50) not null, create_time integer not null, topic varchar(100) not null, content nvarchar(2048) not null, status_flag integer not null, update_time integer not null )`) if e != nil { return e } // 成功投遞隊列: success_queue _, e = db.Exec(`create table if not exists success_queue ( id integer primary key autoincrement, msg_id integer not null, client_id varchar(50) not null, create_time integer not null )`) if e != nil { return e } // // 投遞失敗隊列: failed_queue // _, e = db.Exec(`create table if not exists failed_queue ( // id integer primary key autoincrement, // msg_id integer not null, // client_id varchar(50) not null, // create_time integer not null //)`) // if e != nil { // return e // } return nil } func open() (*sqlx.DB, error) { return sqlx.Open("sqlite3", "./mqs.db") } 
func DB(action DBFunc) error { db,err := open() if err != nil { return err } defer func() { _ = db.Close() }() return action(db) } func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err } err = action(db, tx) if err == nil { return tx.Commit() } else { return tx.Rollback() } }) }
(未完待續)