爲了支持業務層中的事務,我試圖在Go中查找相似Spring的聲明式事務管理,可是沒找到,因此我決定本身寫一個。 事務很容易在Go中實現,但很難作到正確地實現。html
1.將業務邏輯與事務代碼分開。
在編寫業務用例時,開發者應該只需考慮業務邏輯,不須要同時考慮怎樣給業務邏輯加事務管理。若是之後須要添加事務支持,你能夠在現有業務邏輯的基礎上進行簡單封裝,而無需更改任何其餘代碼。事務實現細節應該對業務邏輯透明。git
2.事務邏輯應該做用於用例層(業務邏輯)
不在持久層上。github
3.數據服務(數據持久性)層應對事務邏輯透明。
這意味着持久性代碼應該是相同的,不管它是否支持事務golang
4.你能夠選擇延遲支持事物。
你能夠先編寫沒有事務的用例,稍後能夠在不修改現有代碼的狀況下給該用例加上事務。你只需添加新代碼。sql
我最終的解決方案還不是聲明式事務管理,但它很是接近。建立一個真正的聲明式事務管理須要付出不少努力,所以我構建了一個能夠實現聲明式事務的大多數功能的事務管理,同時又沒花不少精力。數據庫
最終解決方案涉及本程序的全部層級,我將逐一解釋它們。服務器
數據庫連接封裝架構
在Go的「sql」lib中,有兩個數據庫連接sql.DB和sql.Tx. 不須要事務時,使用sql.DB訪問數據庫; 當須要事務時,你使用sql.Tx. 爲了共享代碼,持久層須要同時支持二者。 所以須要對數據庫連接進行封裝,而後把它做爲數據庫訪問方法的接收器。 我從這裏¹獲得了粗略的想法。app
// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx) // It should be able to work with all SQL data that follows SQL standard. type SqlGdbc interface { Exec(query string, args ...interface{}) (sql.Result, error) Prepare(query string) (*sql.Stmt, error) Query(query string, args ...interface{}) (*sql.Rows, error) QueryRow(query string, args ...interface{}) *sql.Row // If need transaction support, add this interface Transactioner } // SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB type SqlDBTx struct { DB *sql.DB } // SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx type SqlConnTx struct { DB *sql.Tx }
數據庫實現類型SqlDBTx和sqlConnTx都須要實現SqlGdbc接口(包括「Transactioner」)接口才行。 須要爲每一個數據庫(例如MySQL, CouchDB)實現「Transactioner」接口以支持事務。框架
// Transactioner is the transaction interface for database handler // It should only be applicable to SQL database type Transactioner interface { // Rollback a transaction Rollback() error // Commit a transaction Commit() error // TxEnd commits a transaction if no errors, otherwise rollback // txFunc is the operations wrapped in a transaction TxEnd(txFunc func() error) error // TxBegin gets *sql.DB from receiver and return a SqlGdbc, which has a *sql.Tx TxBegin() (SqlGdbc, error) }
數據庫存儲層(datastore layer)的事物管理代碼
如下是「Transactioner」接口的實現代碼,其中只有TxBegin()是在SqlDBTx(sql.DB)上實現,由於事務從sql.DB開始,而後全部事務的其餘操做都在SqlConnTx(sql.Tx)上。 我從這裏²獲得了這個想法。
// TransactionBegin starts a transaction func (sdt *SqlDBTx) TxBegin() (gdbc.SqlGdbc, error) { tx, err := sdt.DB.Begin() sct := SqlConnTx{tx} return &sct, err } func (sct *SqlConnTx) TxEnd(txFunc func() error) error { var err error tx := sct.DB defer func() { if p := recover(); p != nil { tx.Rollback() panic(p) // re-throw panic after Rollback } else if err != nil { tx.Rollback() // err is non-nil; don't change it } else { err = tx.Commit() // if Commit returns error update err with commit err } }() err = txFunc() return err } func (sct *SqlConnTx) Rollback() error { return sct.DB.Rollback() }
用例層的事物接口
在用例層中,你能夠擁有相同業務功能的一個函數的兩個版本,一個支持事務,一個不支持,而且它們的名稱能夠共享相同的前綴,而事務能夠添加「withTx」做爲後綴。 例如,在如下代碼中,「ModifyAndUnregister」是不支持事務的那個,「ModifyAndUnregisterWithTx」是支持事務的那個。 「EnableTxer」是用例層上惟一的事務支持接口,任何支持事務的「用例」都須要它。 這裏的全部代碼都在是用例層級(包括「EnableTxer」)代碼,不涉及數據庫內容。
type RegistrationUseCaseInterface interface { ... // ModifyAndUnregister change user information and then unregister the user based on the User.Id passed in. // It is created to illustrate transaction, no real use. ModifyAndUnregister(user *model.User) error // ModifyAndUnregisterWithTx change user information and then unregister the user based on the User.Id passed in. // It supports transaction // It is created to illustrate transaction, no real use. ModifyAndUnregisterWithTx(user *model.User) error // EnableTx enable transaction support on use case. Need to be included for each use case needs transaction // It replaces the underline database handler to sql.Tx for each data service that used by this use case EnableTxer } // EnableTxer is the transaction interface for use case layer type EnableTxer interface { EnableTx() }
如下是不包含事務的業務邏輯代碼的示例。 「modifyAndUnregister(ruc,user)」是事務和非事務用例函數共享的業務功能。 你須要使用TxBegin()和TxEnd()(在TxDataInterface中)來包裝業務功能以支持事務,這些是數據服務層接口,而且與數據庫訪問層無關。 該用例還實現了「EnableTx()」接口,該接口實際上將底層數據庫連接從sql.DB切換到sql.Tx.
// The use case of ModifyAndUnregister without transaction func (ruc *RegistrationUseCase) ModifyAndUnregister(user *model.User) error { return modifyAndUnregister(ruc, user) } // The use case of ModifyAndUnregister with transaction func (ruc *RegistrationUseCase) ModifyAndUnregisterWithTx(user *model.User) error { tdi, err := ruc.TxDataInterface.TxBegin() if err != nil { return errors.Wrap(err, "") } ruc.EnableTx() return tdi.TxEnd(func() error { // wrap the business function inside the TxEnd function return modifyAndUnregister(ruc, user) }) } // The business function will be wrapped inside a transaction and inside a non-transaction function // It needs to be written in a way that every error will be returned so it can be catched by TxEnd() function, // which will handle commit and rollback func modifyAndUnregister(ruc *RegistrationUseCase, user *model.User) error { udi := ruc.UserDataInterface err := modifyUser(udi, user) if err != nil { return errors.Wrap(err, "") } err = unregisterUser(udi, user.Name) if err != nil { return errors.Wrap(err, "") } return nil } func (ruc *RegistrationUseCase) EnableTx() { // Only UserDataInterface need transaction support here. If there are other data services need it, // then they also need to enable transaction here ruc.UserDataInterface.EnableTx(ruc.TxDataInterface) }
爲何我須要在「TxDataInterface」中調用函數「EnbaleTx」來替換底層數據庫連接而不是直接在用例中執行? 由於sql.DB和sql.Tx層級要比用例層低幾個級別,直接調用會搞砸依賴關係。 保持合理依賴關係的訣竅是在每一層上都有TxBegin()和TxEnd()並逐層調用它們以維持合理的依賴關係。
數據服務層的事物接口
咱們討論了用例層和數據存儲層上的事務功能,咱們還須要數據服務層中的事務功能將這二者鏈接在一塊兒。 如下代碼是數據服務層的事務接口(「TxDataInterface」)。 「TxDataInterface」是僅爲事物管理而建立的數據服務層接口。 每一個數據庫只須要實現一次。 還有一個「EnableTxer」接口(這是一個數據服務層接口,不要與用例層中的「EnableTxer」接口混淆),實現「EnableTxer」接口將開啓數據服務類型對事務的支持,例如, 若是想要「UserDataInterface」支持事物,就須要它實現「EnableTxer」接口。
// TxDataInterface represents operations needed for transaction support. // It only needs to be implemented once for each database // For sqlGdbc, it is implemented for SqlDBTx in transaction.go type TxDataInterface interface { // TxBegin starts a transaction. It gets a DB handler from the receiver and return a TxDataInterface, which has a // *sql.Tx inside. Any data access wrapped inside a transaction will go through the *sql.Tx TxBegin() (TxDataInterface, error) // TxEnd is called at the end of a transaction and based on whether there is an error, it commits or rollback the // transaction. // txFunc is the business function wrapped in a transaction TxEnd(txFunc func() error) error // Return the underline transaction handler, sql.Tx GetTx() gdbc.SqlGdbc } // This interface needs to be included in every data service interface that needs transaction support type EnableTxer interface { // EnableTx enables transaction, basically it replaces the underling database handle sql.DB with sql.Tx EnableTx(dataInterface TxDataInterface) } // UserDataInterface represents interface for user data access through database type UserDataInterface interface { ... Update(user *model.User) (rowsAffected int64, err error) // Insert adds a user to a database. The returned resultUser has a Id, which is auto generated by database Insert(user *model.User) (resultUser *model.User, err error) // Need to add this for transaction support EnableTxer }
如下代碼是「TxDataInterface」的實現。 「TxDataSql」是「TxDataInterface」的具體類型。 它調用底層數據庫連接的開始和結束函數來執行真正的事務操做。
// TxDataSql is the generic implementation for transaction for SQL database // You only need to do it once for each SQL database type TxDataSql struct { DB gdbc.SqlGdbc } func (tds *TxDataSql) TxEnd(txFunc func() error) error { return tds.DB.TxEnd(txFunc) } func (tds *TxDataSql) TxBegin() (dataservice.TxDataInterface, error) { sqlTx, error := tds.DB.TxBegin() tdi := TxDataSql{sqlTx} tds.DB = tdi.DB return &tdi, error } func (tds *TxDataSql) GetTx() gdbc.SqlGdbc { return tds.DB }
事物策略:
你可能會問爲何我在上面的代碼中須要「TxDataSql」? 確實能夠在沒有它的狀況下實現事務,實際上最開的程序裏就沒有它。 可是我仍是要在某些數據服務中實現「TxDataInterface」來開始和結束事務。 因爲這是在用例層中完成的,用例層不知道哪一個數據服務類型實現了接口,所以必須在每一個數據服務接口上實現「TxDataInterface」(例如,「UserDataInterface」和「CourseDataInterface」)以保證 「用例層」不會選擇沒有接口的「數據服務(data service)」。 在建立「TxDataSql」以後,我只須要在「TxDataSql」中實現一次「TxDataInterface」,而後每一個數據服務類型只須要實現「EnableTx()」就好了。
// UserDataSql is the SQL implementation of UserDataInterface type UserDataSql struct { DB gdbc.SqlGdbc } func (uds *UserDataSql) EnableTx(tx dataservice.TxDataInterface) { uds.DB = tx.GetTx() } func (uds *UserDataSql) FindByName(name string) (*model.User, error) { //logger.Log.Debug("call FindByName() and name is:", name) rows, err := uds.DB.Query(QUERY_USER_BY_NAME, name) if err != nil { return nil, errors.Wrap(err, "") } defer rows.Close() return retrieveUser(rows) }
上面的代碼是「UserDataService」接口的實現程序。 「EnableTx()」方法從「TxDataInterface」得到sql.Tx並將「UserDataSql」中的sql.DB替換爲sql.Tx.
數據訪問方法(例如,FindByName())在事務代碼和非事務代碼之間共享,而且不須要知道「UserDataSql.DB」是sql.DB仍是sql.Tx.
依賴關係漏洞:
上面的代碼實現中存在一個缺陷,這會破壞個人設計並使其不完美。它是「TxDataInterface」中的函數「GetTx()」,它是一個數據服務層接口,所以它不該該依賴於gdbc.SqlGdbc(數據庫接口)。你可能認爲數據服務層的實現代碼不管如何都須要訪問數據庫,當前這是正確的。可是,你能夠在未來更改實現去調用gRPC微服務(而不是數據庫)。若是接口不依賴於SQL接口的話,則能夠自由更改實現,但若是不是,則即便你的接口實現已更改,該接口也會永久保留對SQL的依賴。
爲何它是本程序中打破依賴關係的惟一地方?由於對於其餘接口,容器負責建立具體類型,而程序的其他部分僅使用接口。可是對於事務,在建立具體類型以後,須要將底層數據庫處理程序從sql.DB替換爲sql.Tx,這破壞了設計。
它有解決方法嗎?是的,容器能夠爲須要事務的函數建立sql.Tx而不是sql.DB,這樣我就不須要在之後的用例級別中替換它。可是,配置文件中須要一個標誌來指示函數是否須要事務, 並且這個標誌須要配備給用例中的每一個函數。這是一個太大的改動,因此我決定如今先這樣,之後再從新審視它。
經過這個實現,事務代碼對業務邏輯幾乎是透明的(除了我上面提到的缺陷)。業務邏輯中沒有數據存儲(datastore)級事務代碼,如Tx.Begin,Tx.Commit和Tx.Rollback(但你確實須要業務級別事物函數Tx.Begin和Tx.End),不只如此,你的持久性代碼中也幾乎沒有數據存儲級事務代碼。 如需在用例層上啓用事務,你只須要在用例上實現EnableTx()並將業務函數封裝在「TxBegin()」,EnableTx()和「TxEnd()」中,如上例所示。 在持久層上,大多數事務代碼已經由「txDataService.go」實現,你只須要爲特定的數據服務(例如UserDataService)實現「EnableTx」。 事務支持的真正操做是在「transaction.go」文件中實現的,它實現了「Transactioner」接口,它有四個函數,「Rollback」, 「Commit」, 「TxBegin」 和 「TxEnd」。
假設咱們須要在用例「listCourse」中爲一個函數添加事務支持,如下是步驟
首先,它仍然不是聲明式事物管理;第二,它沒有徹底達到需求中的#4。要將用例函數從非事務更改成事務,你能夠建立一個支持事務的新函數,它須要更改調用函數; 或者你修改現有函數並將其包裝到事務中,這也須要代碼更改。爲了實現#4,須要添加許多代碼,所以我將其推遲到之後。第三,它不支持嵌套事務(Nested Transaction),所以你須要手動確保代碼中沒有發生嵌套事務。若是代碼庫不是太複雜,這很容易作到。若是你有一個很是複雜的代碼庫,有不少事務和非事務函數混在一塊兒,那麼手工作起來會比較困難,這是須要在程序中實現嵌套事務或找到已經支持它的方案。我沒有花時間研究添加嵌套事務所需的工做量,但這可能並不容易。若是你對它感興趣,這裏³是一些討論。到目前爲止,對於大多數狀況而言,當前的解決方案多是在代價不大的狀況下的最佳方案。
首先,它只支持SQL數據庫的事務。 若是你有NoSql數據庫,它將沒法工做(大多數NoSql數據庫不管如何都不支持事務)。 其次,若是事務跨越了數據庫的邊界(例如在不一樣的微服務器之間),那麼它將沒法工做。 在這種狀況下,你須要使用Saga⁴。它的原理是爲事物中的每一個操做寫一個補償操做,而後在回滾階段挨個執行每個補償操做。 在當前框架中添加Sage解決方案應該不難。
關閉數據庫連接(Close connection)
我歷來沒有爲數據庫連接調用Close()函數,由於沒有必要這樣作。 你能夠傳入sql.DB或sql.Tx做爲持久性函數的接收器(receiver)。 對於sql.DB,數據庫將自動建立連接池併爲你管理連接。 連接完成後,它將返回到連接池,無需關閉。 對於sql.Tx,在事務結束時,你能夠提交或回滾,以後連接將返回到鏈接池,而無需關閉。 請參閱此處⁵ 和 此處⁶ .
對象關係映射(O/R mapping)
我簡要地查看了幾個「O/R」映射庫,但它們沒有提供我所須要的功能。 我認爲「O/R映射」只適合兩種狀況。 首先,你的應用程序主要是CRUD,沒有太多的查詢或搜索; 第二,開發人員不熟悉SQL。 若是不是這種狀況,則O/R映射不會提供太多幫助。 我想從擴展數據庫模塊中得到兩個功能,一個是將sql.row加載到個人域模型結構(包括處理NULL值)中(例如「User」),另外一個是自動關閉sql類型,如sql.statement或sql.rows。 有一些sql擴展庫彷佛提供了至少部分這樣的功能。 我尚未嘗試,但彷佛值得一試。
延遲(Defer):
在進行數據庫訪問時,你將進行大量重複調用以關閉數據庫類型(例如statements, rows)。例如如下代碼中的「defer row.close()」。 你想要記住這一點,要在錯誤處理函數以後調用「defer row.close()」,由於若是不是這樣,當出現錯誤時,「rows」將爲nil,這將致使恐慌而且不會執行錯誤處理代碼。
func (uds *UserDataSql) Find(id int) (*model.User, error) { rows, err := uds.DB.Query(QUERY_USER_BY_ID, id) if err != nil { return nil, errors.Wrap(err, "") } defer rows.Close() return retrieveUser(rows) }
恐慌(panic):
我看到不少Go數據庫代碼在出現數據庫錯誤時拋出了恐慌(panic)而不是錯誤(error),這可能會致使微服務出現問題,由於在微服務環境中你一般但願服務一直運行。 假設當更新語句中出現SQL錯誤時,用戶將沒法訪問該功能,這很糟糕。 但若是由於這個,整個微服務或網站被關閉,那就更糟了。 所以,正確的方法是將錯誤傳播到上一級並讓它決定要作什麼。 所以正確的作法是不在你的程序中拋出panic,但若是第三方庫拋出恐慌呢? 這時你須要捕獲恐慌並從中恢復以保持你的服務正常運行。 我在另外一篇文章「日誌管理」⁸中有具體示例.
完整的源程序連接 github: https://github.com/jfeng45/se...
[2]database/sql Tx—detecting Commit or Rollback
[3]database/sql: nested transaction or save point support
[4]GOTO 2015 • Applying the Saga Pattern • Caitie McCaffrey — YouTube
[5]Common Pitfalls When Using database/sql in Go
[7]sqlx
[8]Go Microservice with Clean Architecture: Application Logging
不堆砌術語,不羅列架構,不迷信權威,不盲從流行,堅持獨立思考