歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~html
本文來自 雲+社區翻譯社,由 Tnecesoc編譯。
微服務就是將應用程序的業務領域劃分爲具備明確範圍的不一樣場景,並以分離的流程來運行這些場景,使得其中跨邊界的任何持久化的關係必須依賴最終的一致性,而不是 ACID 類事務或外鍵約束。這些概念不少都來源於領域驅動設計(DDD),或受到了它的啓發。不過 DDD 是個要花一整個博客系列來說的話題,這裏就先不提了。git
在咱們的 Go 微服務系列博客還有微服務架構的背景下,實現服務間鬆耦合的一種方式是引入消息傳遞機制來進行不須要遵循嚴格的請求 / 響應式的消息交換或相似機制的服務的通訊。這裏要強調一點,引入消息傳遞機制只是衆多能夠用來實現服務間鬆耦合的策略之一。github
正如咱們在博客系列的第 8 章看到的那樣,在 Spring Cloud 裏,Spring Cloud Config 服務器將 RabbitMQ 做爲了運行時的一個依賴項目,所以 RabbitMQ 應該是一個不錯的消息中繼器(message broker)。spring
至於本系列博客的這一章的內容,咱們會在讀取特定的賬號對象的時候讓咱們的 "account service" 往 RabbitMQ 的交換器裏面放一條消息。這一消息會被一個咱們會在這篇博客裏編寫的,全新的微服務所處理。咱們同時也會將一些 Go 語言代碼放在一個 「common」 庫裏面來實如今跨微服務情景下的代碼複用。docker
記住第 1 部分的系統圖景嗎?下面是完成這一章的部分以後的樣子:shell
在完成這一部分以前咱們要作不少工做,不過咱們可以作到。數據庫
這部分會有不少新代碼,咱們不可能把它全放在博客文章裏。若要取得完整的源代碼,不妨用 git clone 命令下載下來,並切換到第 9 章的分支:json
git checkout P9
咱們將實施一個簡單的仿真用例:當在 「account service」 中讀取某些 「VIP」 帳戶時,咱們但願觸發 「VIP offer」 服務。這一服務在特定狀況下會爲帳戶持有者生成一個 「offer」 。在一個設計正確的領域模型裏面,accounts 對象還有 VIP offer 對象都是兩個獨立的領域,而它們應儘量少地瞭解彼此。數組
這個 account service 應當從不直接訪問 VIP service 的存儲空間(即 offers)。在這種狀況下,咱們應該把一條消息傳遞給 RabbitMQ 上的 「vip service」,並將業務邏輯和持久化存儲徹底委任給 「vip service」。服務器
咱們將使用 AMQP 協議來進行全部通訊,AMQP 協議是一個做爲 ISO 標準的應用層協議,其所實現的消息傳遞能爲系統帶來可互操做性。這裏咱們就延用在第 8 章咱們處理配置更新(configuration update)時候的設置,選用 streadway / amqp 這一 AMQP 庫。
讓咱們重述一下 AMQP 中的交換器(exchange)與發佈者(publisher)、消費者(consumer)和隊列(queue)的關係:
發佈者會將一條消息發佈到交換點,後者會根據必定的路由規則或登記了相應消費者的綁定信息來說消息的副本發到隊列(或分發到多個隊列)裏去。對此,Quora 上的這個回答有個很好的解釋。
因爲咱們想要使用新的以及現有的代碼來從咱們現有的 account service 和新的 vip service 裏面的 Spring Cloud 配置文件裏面加載咱們所需的配置,因此咱們會在這裏建立咱們的第一個共享庫。
首先在 /goblog 下建立新文件夾 common 來存放可重用的內容:
mkdir -p common/messaging mkdir -p common/config
咱們將全部與 AMQP 相關的代碼放在 messaging 文件夾裏面,並將配置文件放在 config 文件夾裏。你也能夠將 /goblog/accountservice/config 的內容複製到 /goblog/common/config 中 - 請記住,這要求咱們更新先前從 account service 中導入配置代碼的 import 語句。不妨看看完整源代碼來查閱這部分的寫法。
跟消息傳遞有關的代碼會被封裝在一個文件中,該文件將定義應用程序用來鏈接,發佈和訂閱消息的接口還有實際實現。實際上,咱們用的 streadway / amqp 已經提供了不少實現 AMQP 消息傳遞所需的模板代碼,因此這部分的具體細節也便不深究了。
在 /goblog/common/messaging 中建立一個新的 .go 文件:messagingclient.go。
讓咱們看看裏面主要應有什麼:
// 定義用來鏈接、發佈消息、消費消息的接口 type IMessagingClient interface { ConnectToBroker(connectionString string) Publish(msg []byte, exchangeName string, exchangeType string) error PublishOnQueue(msg []byte, queueName string) error Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error Close() }
上面這段代碼定義了咱們所用的消息接口。這就是咱們的 「account service」 和 「vip service」 在消息傳遞時將要處理的問題,能經過抽象手段來消除系統的大部分複雜度。請注意,我選擇了兩個 「Produce」 和 「Consume」 的變體,以便與訂閱/發佈主題還有 direct / queue 消息傳遞模式合在一塊兒使用。
接下來,咱們將定義一個結構體,該結構體將持有指向 amqp.Connection
的指針,而且咱們將再加上一些必要的方法,以便它能(隱式地,Go 一直以來都是這樣)實現咱們剛纔聲明的接口。
// 接口實現,封裝了一個指向 amqp.Connection 的指針 type MessagingClient struct { conn *amqp.Connection }
接口的實現很是冗長,在此只給出其中兩個 - ConnectToBroker()
和 PublishToQueue()
:
func (m *MessagingClient) ConnectToBroker(connectionString string) { if connectionString == "" { panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?") } var err error m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString)) if err != nil { panic("Failed to connect to AMQP compatible broker at: " + connectionString) } }
這就是咱們得到 connection 指針 (如 amqp.Dial
) 的方法。若是咱們丟掉了配置文件,或者鏈接不上中繼器,那麼微服務就會拋出一個 panic 異常,並會讓容器協調器從新建立一個新的實例。在這裏傳入的 connectionString 參數就以下例所示:
amqp://guest:guest@rabbitmq:5672/
注意,這裏的 rabbitmq broker 是以 service 這一 Docker Swarm 的模式下運行的。
PublishOnQueue()
函數很長 - 它跟官方提供的 streadway 樣例或多或少地有些不一樣,畢竟這裏簡化了它的一些參數。爲了將消息發佈到一個有名字的隊列,咱們須要傳遞這些參數:
若要了解交換器的更多詳情,那請參閱 RabbitMQ 文檔。
func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error { if m.conn == nil { panic("Tried to send message before connection was initialized. Don't do that.") } ch, err := m.conn.Channel() // 從 connection 裏得到一個 channel 對象 defer ch.Close() // 提供一些參數聲明一個隊列,若相應的隊列不存在,那就建立一個 queue, err := ch.QueueDeclare( queueName, // 隊列名 false, // 是否持久存在 false, // 是否在不用時就刪掉 false, // 是否排外 false, // 是否無等待 nil, // 其餘參數 ) // 往隊列發佈消息 err = ch.Publish( "", // 目標爲默認的交換器 queue.Name, // 路由關鍵字,例如隊列名 false, // 必須發佈 false, // 當即發佈 amqp.Publishing{ ContentType: "application/json", Body: body, // JSON 正文, 以 byte[] 形式給出 }) fmt.Printf("A message was sent to queue %v: %v", queueName, body) return err }
這裏的模板代碼略多,但應該不難理解。這段代碼會聲明一個(若是不存在那就建立一個)隊列,而後把咱們的消息以字節數組的形式發佈給它。
將消息發佈到一個有名字的交換器的代碼會更復雜,由於它須要一段模板代碼來聲明交換器,以及隊列,並把它們綁定在一塊兒。這裏有一份完整的源代碼示例。
接下來,因爲咱們的 「MessageClient」 的實際使用者會是 /goblog/accountservice/service/handlers.go ,咱們會往裏面再添加一個字段,並在請求的賬戶有 ID 「10000」 的時候往硬編碼進程序中的 「is VIP」 檢查方法中發送一條消息:
var DBClient dbclient.IBoltClient var MessagingClient messaging.IMessagingClient // NEW func GetAccount(w http.ResponseWriter, r *http.Request) { ...
而後:
... notifyVIP(account) // 並行地發送 VIP 消息 // 如有這樣的 account, 那就把它弄成一個 JSON, 而後附上首部和其餘內容來打包 data, _ := json.Marshal(account) writeJsonResponse(w, http.StatusOK, data) } // 若是這個 account 是咱們硬編碼進來的 account, 那就開個協程來發送消息 func notifyVIP(account model.Account) { if account.Id == "10000" { go func(account model.Account) { vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()} data, _ := json.Marshal(vipNotification) err := MessagingClient.PublishOnQueue(data, "vipQueue") if err != nil { fmt.Println(err.Error()) } }(account) } }
正好藉此機會展現一下調用一個新的協程(goroutine)時所使用的內聯匿名函數,即便用 go
關鍵字。咱們不能由於要執行 HTTP 處理程序發送消息就把 「主」 協程阻塞起來,所以這也是增長一點並行性的好時機。
main.go 也須要有所更新,以便在啓動的時候能使用加載並注入到 Viper 裏面的配置信息來初始化 AMQ 鏈接。
// 在 main 方法裏面調用這個函數 func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'amqp_server_url' set in configuration, cannot start") } service.MessagingClient = &messaging.MessagingClient{} service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) }
這段沒什麼意思 - 咱們經過建立一個空的消息傳遞結構,並使用從 Viper 獲取的屬性值來調用 ConnectToBroker
來獲得 service.MessagingClient
實例。若是咱們的配置沒有 broker_url
,那就拋一個 panic 異常,畢竟在不可能鏈接到中繼器的時候程序也沒辦法運行。
咱們在第 8 部分中已將 amqp_broker_url
屬性添加到了咱們的 .yml 配置文件裏面,因此這個步驟實際上已經作過了。
broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_ broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_
注意,咱們在 「test」 配置文件裏面填入的是 Swarm 服務名 「rabbitmq」,而不是從個人電腦上看到的 Swarm 的 LAN IP 地址。(你們的實際 IP 地址應該會有所不一樣,不過運行 Docker Toolbox 時 192.168.99.100 彷佛是標準配置)。
咱們並不推薦在配置文件中填入用戶名和密碼的明文。在真實的使用環境中,咱們一般可使用在第 8 部分中看到的 Spring Cloud Config 服務器裏面的內置加密功能。
固然,咱們至少應該編寫一個單元測試,以確保 handlers.go 中的 GetAccount
函數在有人請求由 「10000」 標識的很是特殊的賬戶時會嘗試去發送一條消息。
爲此,咱們須要在 handlers_test.go 中實現一個模擬的 IMessagingClient
還有一個新的測試用例。咱們先從模擬開始。這裏咱們將使用第三方工具 mockery 生成一個 IMessagingClient
接口的模擬實現(在 shell 運行下面的命令的時候必定要先把 GOPATH 設置好):
> go get github.com/vektra/mockery/.../ > cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging > ./$GOPATH/bin/mockery -all -output . Generating mock for: IMessagingClient
如今,在當前文件夾中就有了一個模擬實現文件 IMessagingClient.go。我看這個文件的名字不爽,也看不慣它的駝峯命名法,所以咱們將它重命名一下,讓它的名字能更明顯的表示它是一個模擬的實現,而且遵循本系列博客的文件名的一向風格:
mv IMessagingClient.go mockmessagingclient.go
咱們可能須要在生成的文件中稍微調整一下 import 語句,並刪除一些別名。除此以外,咱們會對這個模擬實現採用一種黑盒方法 - 只假設它會在咱們開始測試的時候起做用。
不妨也看一看這裏生成的模擬實現的源碼,這跟咱們在第 4 章中手動編寫的內容很是類似。
在 handlers_test.go 裏添加一個新的測試用例:
// 聲明一個模仿類來讓測試更有可讀性 var anyString = mock.AnythingOfType("string") var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte func TestNotificationIsSentForVIPAccount(t *testing.T) { // 配置 DBClient 的模擬實現 mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil) DBClient = mockRepo mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil) MessagingClient = mockMessagingClient Convey("Given a HTTP req for a VIP account", t, func() { req := httptest.NewRequest("GET", "/accounts/10000", nil) resp := httptest.NewRecorder() Convey("When the request is handled by the Router", func() { NewRouter().ServeHTTP(resp, req) Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() { So(resp.Code, ShouldEqual, 200) time.Sleep(time.Millisecond * 10) // Sleep since the Assert below occurs in goroutine So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue) }) })}) }
有關的詳情都寫在了註釋裏。在此,我也看不慣在斷言 numberOfCalls 的後置狀態以前人爲地搞個 10 ms 的睡眠,但因爲模擬是在與 「主線程」 分離的協程中調用的,咱們須要讓它稍微掛起一段時間等待主線程完成一些工做。在此也但願能對協程和管道(channel)有一個更好的慣用的單元測試方式。
我認可 - 使用這種測試方式的過程比在爲 Java 應用程序編寫單元測試用例時使用 Mockito 更加冗長。不過,我仍是認爲它的可讀性不錯,寫起來也很簡單。
接着運行測試,並確保測試經過:
go test ./...
首先要運行 springcloud.sh 腳原本更新配置服務器。而後運行 copyall.sh 並等待幾秒鐘,來讓它完成對咱們的 「account service」 的更新。而後咱們再使用 curl 來獲取咱們的 「特殊」 賬戶。
> curl http://$ManagerIP:6767/accounts/10000 {"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}
若順利的話,咱們應該可以打開 RabbitMQ 的管理控制檯。而後再看看咱們是否在名爲 vipQueue 的隊列上收到了一條消息:
open http://192.168.99.100:15672/#/queues
在上圖的最底部,咱們看到 「vipQueue」 有 1 條消息。咱們再調用一下 RabbitMQ 管理控制檯中的 「Get Message」 函數,而後咱們應該能夠看到這條消息:
最後該從頭開始寫一個全新的微服務了,咱們將用它來展現如何使用 RabbitMQ 的消息。咱們會將迄今爲止在本系列中學到的東西用到裏面,其中包括:
若是你執行過了 git checkout P9
,那就應該能夠在 root/goblog 文件夾中看到 「vipservice」 。
我不會在這裏介紹每一行代碼,畢竟它有些部分跟 「accountservice」 有所重複。咱們會將重點放在 咱們剛剛所發送的消息的 「消費方式」 上。有幾點要注意:
咱們將使用 /goblog/common/messaging 和 SubscribeToQueue
函數中的代碼,例如:
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
對此咱們要提供這些參數:
將咱們的回調函數綁定到隊列的 SubscribeToQueue
函數的實現也沒什麼好說的。這是其源代碼,若是須要也能夠看一看。
接下來咱們快速瀏覽一下 vip service 的 main.go 來看看咱們設置這些東西的過程:
var messagingClient messaging.IMessagingConsumer func main() { fmt.Println("Starting " + appName + "...") config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch")) initializeMessaging() // 確保在服務存在的時候關掉鏈接 handleSigterm(func() { if messagingClient != nil { messagingClient.Close() } }) service.StartWebServer(viper.GetString("server_port")) } // 在收到 "vipQueue" 發來的消息時會調用的回調函數 func onMessage(delivery amqp.Delivery) { fmt.Printf("Got a message: %v\n", string(delivery.Body)) } func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'broker_url' set in configuration, cannot start") } messagingClient = &messaging.MessagingClient{} messagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) // Call the subscribe method with queue name and callback function err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage) failOnError(err, "Could not start subscribe to vip_queue") err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) failOnError(err, "Could not start subscribe to " + viper.GetString("config_event_bus") + " topic") }
很熟悉對吧?咱們在後續的章節也極可能會反覆提到設置並啓動咱們加進去的微服務的方法。這也是基礎知識的一部分。
這個 onMessage
函數只記錄了咱們收到的任何 「VIP」 消息的正文。若是咱們要實現更多的仿真用例,就得引入一些花哨的邏輯來肯定帳戶持有者是否有資格得到 「super-awesome buy all our stuff (tm)」 的待遇,而且也可能要往 「VIP offer 數據庫「 裏寫入記錄。有興趣的話不妨也實施一下這一邏輯,而後交個 pull request。
最後再提一下這段代碼。在這段代碼的幫助下,咱們能夠按下 Ctrl + C 來殺掉一個服務的實例,或者咱們也能夠等待 Docker Swarm 來殺死一個服務實例。
func handleSigterm(handleExit func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c handleExit() os.Exit(1) }() }
這段代碼在可讀性上並不比別的代碼好,它所作的只是將管道 「c」 註冊爲 os.Interrupt
和 syscall.SIGTERM
的監聽器,而且會阻塞性地監聽 「c」 上的消息,直到接到任意信號爲止。這使咱們可以確信,只要微服務的實例被殺了,這裏的 handleExit()
函數就會被調用。若仍是不能確信的話,能夠用 Ctrl + C 或者 Docker Swarm scaling 來測試一下。kill
指令也能夠,不過 kill -9
就不行。所以,除非必須,最好不要用 kill -9
來結束任何東西的運行。
handleExit()
函數將調用咱們在 IMessageConsumer
接口上聲明的 Close()
函數,該函數會確保 AMQP 鏈接的正常關閉。
這裏的 copyall.sh 腳本已經更新過了。如有跟從上面的步驟,而且確保了合 Github 的 P9 分支的一致性,那就能夠運行了。在完成部署以後,執行 docker service ls 就應該會打印這樣的內容:
> docker service ls ID NAME REPLICAS IMAGE kpb1j3mus3tn accountservice 1/1 someprefix/accountservice n9xr7wm86do1 configserver 1/1 someprefix/configserver r6bhneq2u89c rabbitmq 1/1 someprefix/rabbitmq sy4t9cbf4upl vipservice 1/1 someprefix/vipservice u1qcvxm2iqlr viz 1/1 manomarks/visualizer:latest
或者也可使用 dvizz Docker Swarm 服務渲染器查看狀態:
因爲 docker service logs 功能在 1.13.0 版本里面被標記成了實驗性功能,所以咱們必須用老一套的方式來查看 「vipservice」 的日誌。
首先,執行 docker ps 找出 CONTAINER ID:
> docker ps CONTAINER ID IMAGE a39e6eca83b3 someprefix/vipservice:latest b66584ae73ba someprefix/accountservice:latest d0074e1553c7 someprefix/configserver:latest
記下 vipservice 的 CONTAINER ID,並執行 docker logs -f 檢查其日誌:
> docker logs -f a39e6eca83b3 Starting vipservice... 2017/06/06 19:27:22 Declaring Queue () 2017/06/06 19:27:22 declared Exchange, declaring Queue () 2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus') Starting HTTP service at 6868
打開另外一個命令行窗口,並 curl 一下咱們的特殊帳戶對象。
> curl http://$ManagerIP:6767/accounts/10000
若是一切正常,咱們應該在原始窗口的日誌中看到響應隊列的消息。
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
用於跨服務實例分發工做的模式利用了工做隊列的概念。每一個 「vip 消息」 應該由一個「vipservice」實例處理。
因此讓咱們看看使用 docker service scale 命令將 「vipservice」 擴展成兩個實例時的情形:
> docker service scale vipservice=2
新的 「vipservice」 實例應該能在幾秒鐘內完成部署。
因爲咱們在 AMQP 中使用了 direct / queue 的發送方法,咱們應該會見到一種輪轉調度式(round-robin)的分發情形。再用 curl 觸發四個 VIP 賬戶的查找:
> curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000
再次檢查咱們原始的 「vipservice」 的日誌:
> docker logs -f a39e6eca83b3 Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"} Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}
正如咱們所預期的那樣,咱們看到第一個實例處理了四個消息中的兩個。若是咱們對另外一個 「vipservice」 實例也執行一次 docker logs,那麼咱們也應該會在那裏看到兩條消息。
實際上,我並無真正想出一個好的方式來在避免花費大量的時間模擬一個 AMQP 庫的前提下,對 AMQP 消費者進行單元測試。在 messagingclient_test.go 裏面準備了一個測試,用於測試訂閱者對傳入消息的等待以及處理的一個循環。不過並無值得一提的地方。
爲了更全面地測試消息傳遞機制,我可能會在後續的博客文章中回顧關於集成測試的話題。使用 Docker Remote API 或 Docker Compose 進行 go 測試。測試將啓動一些服務,好比能讓咱們在測試代碼裏面發送還有接收消息的 RabbitMQ。
咱們此次不弄性能測試。在發送和接收一些消息以後快速瀏覽一下內存使用狀況就足夠了:
CONTAINER CPU % MEM USAGE / LIMIT vipservice.1.tt47bgnmhef82ajyd9s5hvzs1 0.00% 1.859MiB / 1.955GiB accountservice.1.w3l6okdqbqnqz62tg618szsoj 0.00% 3.434MiB / 1.955GiB rabbitmq.1.i2ixydimyleow0yivaw39xbom 0.51% 129.9MiB / 1.955GiB
上面即是在處理了幾個請求以後的內存使用狀況。新的 「vipservice」 並不像 「accountservice」 那麼複雜,所以在啓動後它應該會使用更少的內存。
這大概是這個系列中最長的一部分了!咱們在這章完成了這些內容:
問答
微服務架構:跨服務數據共享如何實現?
相關閱讀
在微服務之間進行通訊
RabbitMQ與AMQP協議
使用Akka HTTP構建微服務:CDC方法
此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/dev...
歡迎你們前往騰訊雲+社區或關注雲加社區微信公衆號(QcloudCommunity),第一時間獲取更多海量技術實踐乾貨哦~