清晰架構(Clean Architecture)的Go微服務: 事物管理

爲了支持業務層中的事務,我試圖在Go中查找相似Spring的聲明式事務管理,可是沒找到,因此我決定本身寫一個。 事務很容易在Go中實現,但很難作到正確地實現。html

需求:

1.將業務邏輯與事務代碼分開。
在編寫業務用例時,開發者應該只需考慮業務邏輯,不須要同時考慮怎樣給業務邏輯加事務管理。若是之後須要添加事務支持,你能夠在現有業務邏輯的基礎上進行簡單封裝,而無需更改任何其餘代碼。事務實現細節應該對業務邏輯透明。git

2.事務邏輯應該做用於用例層(業務邏輯)
不在持久層上。github

3.數據服務(數據持久性)層應對事務邏輯透明。
這意味着持久性代碼應該是相同的,不管它是否支持事務golang

4.你能夠選擇延遲支持事物。
你能夠先編寫沒有事務的用例,稍後能夠在不修改現有代碼的狀況下給該用例加上事務。你只需添加新代碼。sql

我最終的解決方案還不是聲明式事務管理,但它很是接近。建立一個真正的聲明式事務管理須要付出不少努力,所以我構建了一個能夠實現聲明式事務的大多數功能的事務管理,同時又沒花不少精力。數據庫

方案:

最終解決方案涉及本程序的全部層級,我將逐一解釋它們。服務器

數據庫連接封裝架構

在Go的「sql」lib中,有兩個數據庫連接sql.DB和sql.Tx. 不須要事務時,使用sql.DB訪問數據庫; 當須要事務時,你使用sql.Tx. 爲了共享代碼,持久層須要同時支持二者。 所以須要對數據庫連接進行封裝,而後把它做爲數據庫訪問方法的接收器。 我從這裏¹獲得了粗略的想法。app

// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx)
// It should be able to work with all SQL data that follows SQL standard.
type SqlGdbc interface {
    Exec(query string, args ...interface{}) (sql.Result, error)
    Prepare(query string) (*sql.Stmt, error)
    Query(query string, args ...interface{}) (*sql.Rows, error)
    QueryRow(query string, args ...interface{}) *sql.Row
    // If need transaction support, add this interface
    Transactioner
}

// SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB
type SqlDBTx struct {
    DB *sql.DB
}

// SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx
type SqlConnTx struct {
    DB *sql.Tx
}

數據庫實現類型SqlDBTx和sqlConnTx都須要實現SqlGdbc接口(包括「Transactioner」)接口才行。 須要爲每一個數據庫(例如MySQL, CouchDB)實現「Transactioner」接口以支持事務。框架

// Transactioner is the transaction interface for database handler
// It should only be applicable to SQL database
type Transactioner interface {
    // Rollback a transaction
    Rollback() error
    // Commit a transaction
    Commit() error
    // TxEnd commits a transaction if no errors, otherwise rollback
    // txFunc is the operations wrapped in a transaction
    TxEnd(txFunc func() error) error
    // TxBegin gets *sql.DB from receiver and return a SqlGdbc, which has a *sql.Tx
    TxBegin() (SqlGdbc, error)
}

數據庫存儲層(datastore layer)的事物管理代碼

如下是「Transactioner」接口的實現代碼,其中只有TxBegin()是在SqlDBTx(sql.DB)上實現,由於事務從sql.DB開始,而後全部事務的其餘操做都在SqlConnTx(sql.Tx)上。 我從這裏²獲得了這個想法。

// TransactionBegin starts a transaction
func (sdt *SqlDBTx) TxBegin() (gdbc.SqlGdbc, error) {
    tx, err := sdt.DB.Begin()
    sct := SqlConnTx{tx}
    return &sct, err
}

func (sct *SqlConnTx) TxEnd(txFunc func() error) error {
    var err error
    tx := sct.DB

    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p) // re-throw panic after Rollback
        } else if err != nil {
            tx.Rollback() // err is non-nil; don't change it
        } else {
            err = tx.Commit() // if Commit returns error update err with commit err
        }
    }()
    err = txFunc()
    return err
}

func (sct *SqlConnTx) Rollback() error {
    return sct.DB.Rollback()
}

用例層的事物接口

在用例層中,你能夠擁有相同業務功能的一個函數的兩個版本,一個支持事務,一個不支持,而且它們的名稱能夠共享相同的前綴,而事務能夠添加「withTx」做爲後綴。 例如,在如下代碼中,「ModifyAndUnregister」是不支持事務的那個,「ModifyAndUnregisterWithTx」是支持事務的那個。 「EnableTxer」是用例層上惟一的事務支持接口,任何支持事務的「用例」都須要它。 這裏的全部代碼都在是用例層級(包括「EnableTxer」)代碼,不涉及數據庫內容。

type RegistrationUseCaseInterface interface {
...
    // ModifyAndUnregister change user information and then unregister the user based on the User.Id passed in.
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregister(user *model.User) error
    // ModifyAndUnregisterWithTx change user information and then unregister the user based on the User.Id passed in.
    // It supports transaction
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregisterWithTx(user *model.User) error
    // EnableTx enable transaction support on use case. Need to be included for each use case needs transaction
    // It replaces the underline database handler to sql.Tx for each data service that used by this use case
    EnableTxer
}
// EnableTxer is the transaction interface for use case layer
type EnableTxer interface {
    EnableTx()
}

如下是不包含事務的業務邏輯代碼的示例。 「modifyAndUnregister(ruc,user)」是事務和非事務用例函數共享的業務功能。 你須要使用TxBegin()和TxEnd()(在TxDataInterface中)來包裝業務功能以支持事務,這些是數據服務層接口,而且與數據庫訪問層無關。 該用例還實現了「EnableTx()」接口,該接口實際上將底層數據庫連接從sql.DB切換到sql.Tx.

// The use case of ModifyAndUnregister without transaction
func (ruc *RegistrationUseCase) ModifyAndUnregister(user *model.User) error {
    return modifyAndUnregister(ruc, user)
}

// The use case of ModifyAndUnregister with transaction
func (ruc *RegistrationUseCase) ModifyAndUnregisterWithTx(user *model.User) error {
    tdi, err := ruc.TxDataInterface.TxBegin()
    if err != nil {
        return errors.Wrap(err, "")
    }
    ruc.EnableTx()
    return tdi.TxEnd(func() error {
        // wrap the business function inside the TxEnd function
        return modifyAndUnregister(ruc, user)
    })
}

// The business function will be wrapped inside a transaction and inside a non-transaction function
// It needs to be written in a way that every error will be returned so it can be catched by TxEnd() function,
// which will handle commit and rollback
func modifyAndUnregister(ruc *RegistrationUseCase, user *model.User) error {
    udi := ruc.UserDataInterface
    err := modifyUser(udi, user)
    if err != nil {
        return errors.Wrap(err, "")
    }
    err = unregisterUser(udi, user.Name)
    if err != nil {
        return errors.Wrap(err, "")
    }
    return nil
}

func (ruc *RegistrationUseCase) EnableTx() {
    // Only UserDataInterface need transaction support here. If there are other data services need it,
    // then they also need to enable transaction here
    ruc.UserDataInterface.EnableTx(ruc.TxDataInterface)
}

爲何我須要在「TxDataInterface」中調用函數「EnbaleTx」來替換底層數據庫連接而不是直接在用例中執行? 由於sql.DB和sql.Tx層級要比用例層低幾個級別,直接調用會搞砸依賴關係。 保持合理依賴關係的訣竅是在每一層上都有TxBegin()和TxEnd()並逐層調用它們以維持合理的依賴關係。

數據服務層的事物接口

咱們討論了用例層和數據存儲層上的事務功能,咱們還須要數據服務層中的事務功能將這二者鏈接在一塊兒。 如下代碼是數據服務層的事務接口(「TxDataInterface」)。 「TxDataInterface」是僅爲事物管理而建立的數據服務層接口。 每一個數據庫只須要實現一次。 還有一個「EnableTxer」接口(這是一個數據服務層接口,不要與用例層中的「EnableTxer」接口混淆),實現「EnableTxer」接口將開啓數據服務類型對事務的支持,例如, 若是想要「UserDataInterface」支持事物,就須要它實現「EnableTxer」接口。

// TxDataInterface represents operations needed for transaction support.
// It only needs to be implemented once for each database
// For sqlGdbc, it is implemented for SqlDBTx in transaction.go
type TxDataInterface interface {
    // TxBegin starts a transaction. It gets a DB handler from the receiver and return a TxDataInterface, which has a
    // *sql.Tx inside. Any data access wrapped inside a transaction will go through the *sql.Tx
    TxBegin() (TxDataInterface, error)
    // TxEnd is called at the end of a transaction and based on whether there is an error, it commits or rollback the
    // transaction.
    // txFunc is the business function wrapped in a transaction
    TxEnd(txFunc func() error) error
    // Return the underline transaction handler, sql.Tx
    GetTx() gdbc.SqlGdbc
}

// This interface needs to be included in every data service interface that needs transaction support
type EnableTxer interface {
    // EnableTx enables transaction, basically it replaces the underling database handle sql.DB with sql.Tx
    EnableTx(dataInterface TxDataInterface)
}

// UserDataInterface represents interface for user data access through database
type UserDataInterface interface {
...
    Update(user *model.User) (rowsAffected int64, err error)
    // Insert adds a user to a database. The returned resultUser has a Id, which is auto generated by database
    Insert(user *model.User) (resultUser *model.User, err error)
    // Need to add this for transaction support
    EnableTxer
}

如下代碼是「TxDataInterface」的實現。 「TxDataSql」是「TxDataInterface」的具體類型。 它調用底層數據庫連接的開始和結束函數來執行真正的事務操做。

// TxDataSql is the generic implementation for transaction for SQL database
// You only need to do it once for each SQL database
type TxDataSql struct {
    DB gdbc.SqlGdbc
}

func (tds *TxDataSql) TxEnd(txFunc func() error) error {
    return tds.DB.TxEnd(txFunc)
}

func (tds *TxDataSql) TxBegin() (dataservice.TxDataInterface, error) {

    sqlTx, error := tds.DB.TxBegin()
    tdi := TxDataSql{sqlTx}
    tds.DB = tdi.DB
    return &tdi, error
}
func (tds *TxDataSql) GetTx() gdbc.SqlGdbc {
    return tds.DB
}

事物策略:

你可能會問爲何我在上面的代碼中須要「TxDataSql」? 確實能夠在沒有它的狀況下實現事務,實際上最開的程序裏就沒有它。 可是我仍是要在某些數據服務中實現「TxDataInterface」來開始和結束事務。 因爲這是在用例層中完成的,用例層不知道哪一個數據服務類型實現了接口,所以必須在每一個數據服務接口上實現「TxDataInterface」(例如,「UserDataInterface」和「CourseDataInterface」)以保證 「用例層」不會選擇沒有接口的「數據服務(data service)」。 在建立「TxDataSql」以後,我只須要在「TxDataSql」中實現一次「TxDataInterface」,而後每一個數據服務類型只須要實現「EnableTx()」就好了。

// UserDataSql is the SQL implementation of UserDataInterface
type UserDataSql struct {
    DB gdbc.SqlGdbc
}

func (uds *UserDataSql) EnableTx(tx dataservice.TxDataInterface) {
    uds.DB = tx.GetTx()
}

func (uds *UserDataSql) FindByName(name string) (*model.User, error) {
    //logger.Log.Debug("call FindByName() and name is:", name)
    rows, err := uds.DB.Query(QUERY_USER_BY_NAME, name)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

上面的代碼是「UserDataService」接口的實現程序。 「EnableTx()」方法從「TxDataInterface」得到sql.Tx並將「UserDataSql」中的sql.DB替換爲sql.Tx.

數據訪問方法(例如,FindByName())在事務代碼和非事務代碼之間共享,而且不須要知道「UserDataSql.DB」是sql.DB仍是sql.Tx.

依賴關係漏洞:

上面的代碼實現中存在一個缺陷,這會破壞個人設計並使其不完美。它是「TxDataInterface」中的函數「GetTx()」,它是一個數據服務層接口,所以它不該該依賴於gdbc.SqlGdbc(數據庫接口)。你可能認爲數據服務層的實現代碼不管如何都須要訪問數據庫,當前這是正確的。可是,你能夠在未來更改實現去調用gRPC微服務(而不是數據庫)。若是接口不依賴於SQL接口的話,則能夠自由更改實現,但若是不是,則即便你的接口實現已更改,該接口也會永久保留對SQL的依賴。

爲何它是本程序中打破依賴關係的惟一地方?由於對於其餘接口,容器負責建立具體類型,而程序的其他部分僅使用接口。可是對於事務,在建立具體類型以後,須要將底層數據庫處理程序從sql.DB替換爲sql.Tx,這破壞了設計。

它有解決方法嗎?是的,容器能夠爲須要事務的函數建立sql.Tx而不是sql.DB,這樣我就不須要在之後的用例級別中替換它。可是,配置文件中須要一個標誌來指示函數是否須要事務, 並且這個標誌須要配備給用例中的每一個函數。這是一個太大的改動,因此我決定如今先這樣,之後再從新審視它。

好處:

經過這個實現,事務代碼對業務邏輯幾乎是透明的(除了我上面提到的缺陷)。業務邏輯中沒有數據存儲(datastore)級事務代碼,如Tx.Begin,Tx.Commit和Tx.Rollback(但你確實須要業務級別事物函數Tx.Begin和Tx.End),不只如此,你的持久性代碼中也幾乎沒有數據存儲級事務代碼。 如需在用例層上啓用事務,你只須要在用例上實現EnableTx()並將業務函數封裝在「TxBegin()」,EnableTx()和「TxEnd()」中,如上例所示。 在持久層上,大多數事務代碼已經由「txDataService.go」實現,你只須要爲特定的數據服務(例如UserDataService)實現「EnableTx」。 事務支持的真正操做是在「transaction.go」文件中實現的,它實現了「Transactioner」接口,它有四個函數,「Rollback」, 「Commit」, 「TxBegin」 和 「TxEnd」。

對用例增長事物支持的步驟:

假設咱們須要在用例「listCourse」中爲一個函數添加事務支持,如下是步驟

  1. 在列表課程用例(「listCourse.go」)中實現「EnableTxer」界面
  2. 在域模型(「course」)數據服務層(courseDataMysql.go)中實現「EnableTxer」接口
  3. 建立一個新的事務啓用函數並將現有業務函數包裝在「TxBegin()」,EnableTx()和「TxEnd()」中
缺陷:

首先,它仍然不是聲明​​式事物管理;第二,它沒有徹底達到需求中的#4。要將用例函數從非事務更改成事務,你能夠建立一個支持事務的新函數,它須要更改調用函數; 或者你修改現有函數並將其包裝到事務中,這也須要代碼更改。爲了實現#4,須要添加許多代碼,所以我將其推遲到之後。第三,它不支持嵌套事務(Nested Transaction),所以你須要手動確保代碼中沒有發生嵌套事務。若是代碼庫不是太複雜,這很容易作到。若是你有一個很是複雜的代碼庫,有不少事務和非事務函數混在一塊兒,那麼手工作起來會比較困難,這是須要在程序中實現嵌套事務或找到已經支持它的方案。我沒有花時間研究添加嵌套事務所需的工做量,但這可能並不容易。若是你對它感興趣,這裏³是一些討論。到目前爲止,對於大多數狀況而言,當前的解決方案多是在代價不大的狀況下的最佳方案。

應用範圍:

首先,它只支持SQL數據庫的事務。 若是你有NoSql數據庫,它將沒法工做(大多數NoSql數據庫不管如何都不支持事務)。 其次,若是事務跨越了數據庫的邊界(例如在不一樣的微服務器之間),那麼它將沒法工做。 在這種狀況下,你須要使用Saga⁴。它的原理是爲事物中的每一個操做寫一個補償操做,而後在回滾階段挨個執行每個補償操做。 在當前框架中添加Sage解決方案應該不難。

其餘數據庫相關問題:

關閉數據庫連接(Close connection)

我歷來沒有爲數據庫連接調用Close()函數,由於沒有必要這樣作。 你能夠傳入sql.DB或sql.Tx做爲持久性函數的接收器(receiver)。 對於sql.DB,數據庫將自動建立連接池併爲你管理連接。 連接完成後,它將返回到連接池,無需關閉。 對於sql.Tx,在事務結束時,你能夠提交或回滾,以後連接將返回到鏈接池,而無需關閉。 請參閱此處⁵ 和 此處⁶ .

對象關係映射(O/R mapping)

我簡要地查看了幾個「O/R」映射庫,但它們沒有提供我所須要的功能。 我認爲「O/R映射」只適合兩種狀況。 首先,你的應用程序主要是CRUD,沒有太多的查詢或搜索; 第二,開發人員不熟悉SQL。 若是不是這種狀況,則O/R映射不會提供太多幫助。 我想從擴展數據庫模塊中得到兩個功能,一個是將sql.row加載到個人域模型結構(包括處理NULL值)中(例如「User」),另外一個是自動關閉sql類型,如sql.statement或sql.rows。 有一些sql擴展庫彷佛提供了至少部分這樣的功能。 我尚未嘗試,但彷佛值得一試。

延遲(Defer):

在進行數據庫訪問時,你將進行大量重複調用以關閉數據庫類型(例如statements, rows)。例如如下代碼中的「defer row.close()」。 你想要記住這一點,要在錯誤處理函數以後調用「defer row.close()」,由於若是不是這樣,當出現錯誤時,「rows」將爲nil,這將致使恐慌而且不會執行錯誤處理代碼。

func (uds *UserDataSql) Find(id int) (*model.User, error) {
    rows, err := uds.DB.Query(QUERY_USER_BY_ID, id)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

恐慌(panic):

我看到不少Go數據庫代碼在出現數據庫錯誤時拋出了恐慌(panic)而不是錯誤(error),這可能會致使微服務出現問題,由於在微服務環境中你一般但願服務一直運行。 假設當更新語句中出現SQL錯誤時,用戶將沒法訪問該功能,這很糟糕。 但若是由於這個,整個微服務或網站被關閉,那就更糟了。 所以,正確的方法是將錯誤傳播到上一級並讓它決定要作什麼。 所以正確的作法是不在你的程序中拋出panic,但若是第三方庫拋出恐慌呢? 這時你須要捕獲恐慌並從中恢復以保持你的服務正常運行。 我在另外一篇文章「日誌管理」⁸中有具體示例.

源程序:

完整的源程序連接 github: https://github.com/jfeng45/se...

索引:

[1]db transaction in golang

[2]database/sql Tx—detecting Commit or Rollback

[3]database/sql: nested transaction or save point support

[4]GOTO 2015 • Applying the Saga Pattern • Caitie McCaffrey — YouTube

[5]Common Pitfalls When Using database/sql in Go

[6]Go database/sql tutorial

[7]sqlx

[8]Go Microservice with Clean Architecture: Application Logging

不堆砌術語,不羅列架構,不迷信權威,不盲從流行,堅持獨立思考

相關文章
相關標籤/搜索