Go微服務 - 第九部分 - 使用RabbitMQ和AMQP進行消息傳遞

第九部分: Go語言微服務系列 - 使用RabbitMQ和AMQP進行消息傳遞

本文咱們將經過RabbitMQ和AMQP協議在Go微服務之間進行消息傳遞。html

簡介

微服務是將應用程序的業務領域分離成具備清晰分離域的邊界上下文,運行進程分離,其中任何跨域邊界的持久關係必須依賴最終一致性,而不是相似於ACID事務或外鍵約束。其中不少概念都來自域驅動設計,或受其啓發。領域驅動設計是另一個很大的話題,足以用一個文章系列來介紹。linux

在咱們Go語言微服務博客系列的上下文和微服務大致架構中,實現服務間的鬆耦合的一種模式是使用消息傳遞來進行服務間通訊,不須要嚴格的請求/響應消息交換或相似的消息交換。也就是說,使用消息傳遞只是便於服務間鬆耦合的衆多策略中的一種。git

在Spring Cloud中,RabbitMQ彷佛是選擇的消息中間人(代理), 特別是由於在第八部分中咱們看到的,Spring Cloud Config服務器具備RabbitMQ運行時依賴。github

本文中,將會讓accountservice服務每當讀取特殊帳號對象時,就在RabbitMQ exchange上放一條消息。這個消息會被一個咱們本文所實現的全新微服務消費。咱們也將處理Go代碼在多微服務間的複用問題,將多服務複用代碼放在common類庫中,這樣每一個微服務均可以import它。spring

還記得咱們在第一部分中的系統景觀的圖片嗎? 下面是在本部分完成以後看起來的樣子:docker

clipboard.png

  • 實現集中配置服務
  • 實現服務間通訊的消息傳遞
  • 實現兩個微服務accountservice和vipservice

依然還有不少元素還沒有實現。 不要擔憂,咱們慢慢都會作到的。數據庫

源代碼

這一部分有不少源代碼,本文不會包含全部代碼。 要查看完整代碼,可克隆並切換到P9分支,或者直接查看https://github.com/callistaen...json

發送消息

咱們將實現一個簡單的虛構(make-believe)用例: 當特定VIP帳號在讀取accountservice服務時,咱們但願通知一個vip offer服務,在某些狀況下,它將爲帳戶持有人產生"offer"。在適當設計的領域模型中,帳戶對象和VIP offer對象時兩個獨立領域,它們應該儘量少的互相瞭解。跨域

clipboard.png

換言之,accountservice不能直接訪問VIP服務的存儲。這個例子中,咱們經過RabbitMQ傳遞一個消息給vipservice, 徹底將業務邏輯和持久化都委託給vipservice。數組

咱們將使用AMQP協議作全部通訊,這個協議是面向互操做性消息傳遞的ISO標準應用程序層協議。咱們的選擇使用的Go類庫是streadway/amqp, 相似在第八部分中咱們消費配置更新時候使用的。

讓咱們重複在AMQP中exchange和publisher, consumer和queue之間的關係:
clipboard.png

也就是說消息被髮布到exchange, 而後將消息副本基於路由規則和可能已經註冊消費者的綁定分佈到queue。在quora.com網站上的這個帖子對這個話題進行了很好的解釋。

Thread vs Post: 在論壇中,經常使用Thread和Post代指某些東西。可是這二者有什麼區別呢?
通俗的講Thread就是論壇中最初發起的某個主題的話題, 包含不少Post(A thread is a group of posts on a single topic.)。中文社區一般所謂的樓主發的第一個東西。 而Post則是對樓主最初發的內容作的回覆或跟帖。
參考連接: https://www.drupal.org/projec...

爲何RabbitMQ有Queue,還要有Exchange?

現實中的(Quora中的答案)例子:

假設你在Apple商店裏邊,先要買耳機。 店裏就會有人過來問你:"須要什麼?" 你告訴他你須要買耳機,而後他就把你帶到他的同事的櫃檯前的排隊隊列以後等待。由於不少其餘人也在買東西,銷售員正在處理隊列前面的那個消費者。 若是這個時候,另一我的進店了,剛纔招呼你的人會一樣詢問對方須要什麼幫助。剛進來的人須要修下手機,被找呼的人帶到了另一個修理手機的櫃檯等待了。

這個例子中問你須要什麼的人就是exchange, 他會根據須要把你路由到恰當的隊列中排隊等待。在隊列的後面有不少員工,也就是對應隊列的worker, 或者消費者。一次處理一個請求,基於先進先出的原則。也可能會根據最早到的人作一個簡單輪詢。

若是店裏沒有導流的服務員,那麼你就須要來回在每一個櫃檯前來回問是否能幫到你,直到找到你須要辦理業務的櫃檯後開始排隊。

固然,導航蘋果商店的工做不復雜,但在應用程序中,你可能有不少隊列,服務不一樣類型的請求,基於路由和綁定具備交換路由消息的鍵來講很是有幫助。 發佈者只須要關心添加正確的路由密匙,而消費者只須要關心用正確的綁定密匙建立正確的隊列,就能夠作到"我對這些消息感興趣。"

消息傳遞代碼

既然咱們須要在accountservice和vipservice中使用消息傳遞代碼和從Spring Cloud Config服務器上加載配置的代碼,咱們能夠建立可共享的庫。

咱們在goblog目錄下面建立一個common目錄來保存咱們可複用的東西:

mkdir -p common/messaging
mkdir -p common/config

咱們將全部AMQP相關的代碼放在messaging目錄,配置相關的放在config目錄。這樣你能夠把以前的goblog/accountservice/config中的代碼移到common/config目錄中,並相應的修改import語句中的代碼位置。能夠看看已完成代碼看它是如何支持的。

消息傳遞代碼在單獨文件中封裝起來,裏邊定義了咱們應用將用於鏈接、發佈和訂閱的接口以及具體實現。老實說,對於使用streadway/amqp的AMQP消息傳遞來講有不少樣板代碼,所以無需在乎代碼的實現細節。

在common/messaging/下面建立一個messagingclient.go文件:

package messaging

import (
    "github.com/streadway/amqp"
    "fmt"
    "log"
)

// Defines our interface for connecting and consuming messages.
type IMessagingClient interface {
    ConnectToBroker(connectionString string)
    Publish(msg []byte, exchangeName string, exchangeType string) error
    PublishOnQueue(msg []byte, queueName string) error
    Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
    SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
    Close()
}

// Real implementation, encapsulates a pointer to an amqp.Connection
type MessagingClient struct {
    conn *amqp.Connection
}

上面代碼片斷,定義了messaging的接口。 這就是accountservice和vipservice須要消息傳遞的時候須要使用它們進行處理的,但願能從不少複雜的東西里邊抽象出來。注意我已經選擇兩種變體"Product"和"Consume"來使用topics和direct/queue消息模式。

接下來,咱們定義了一個保存amqp.Connection指針的結構體,咱們會將必要的方法綁定到它上面(隱式的,由於Go語言中都是這樣乾的), 這樣就實現了咱們聲明的接口。

func (m *MessagingClient) ConnectToBroker(connectionString string) {
    if connectionString == "" {
        panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
    }

    var err error
    m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
    if err != nil {
        panic("Failed to connect to AMQP compatible broker at: " + connectionString)
    }
}

func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
    if m.conn == nil {
            panic("Tried to send message before connection was initialized. Don't do that.")
    }
    ch, err := m.conn.Channel()      // Get a channel from the connection
    defer ch.Close()

    queue, err := ch.QueueDeclare(// Declare a queue that will be created if not exists with some args
        queueName, // our queue name
        false, // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil, // arguments
    )

    // Publishes a message onto the queue.
    err = ch.Publish(
        "", // exchange
        queue.Name, // routing key
        false, // mandatory
        false, // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body, // Our JSON body as []byte
        })
    fmt.Printf("A message was sent to queue %v: %v", queueName, body)
    return err
}

ConnectToBroker中展現了咱們如何獲取鏈接指針的,例如amqp.Dial方法。若是咱們沒有配置或者沒法鏈接咱們的broker, 會panic咱們的微服務,容器編排會嘗試使用新實例從新嘗試。 傳入的鏈接字符串就像這樣:

amqp://guest:guest@rabbitmq:5672/

注意咱們如今使用的是Docker Swarm模式下的RabbitMQ broker的服務名。

PublishOnQueue()函數至關長,它或多或少是從官方例子派生過來的,這裏我對其進行了簡化,帶比較少的參數。要發佈消息到命名隊列,咱們須要傳入的參數有:

  • body: 以字節數組形式傳入。 能夠是JSON,XML或一些二進制。
  • queueName: 要發送消息到的目標隊列名字。

要了解更多exchange的詳情,能夠參考RabbitMQ的官方文檔

PublishOnQueue()方法樣本代碼使用的很重,可是很容易理解。聲明隊列(若是不存在就建立它), 而後發佈咱們的[]byte消息到它裏邊。發佈消息到命名exchange更加複雜,它須要樣板代碼首先聲明一個exchange,一個隊列,而後實現將它們綁定一塊兒的代碼。 詳細請查看完整代碼

繼續,實際使用咱們MessagingClient的是在goblog/accountservice/service/handlers.go中,所以咱們添加一個字段,並硬編碼檢查是否爲VIP, 而後若是請求帳號id是10000的話,咱們就發送一個消息傳遞。

var DBClient dbclient.IBoltClient
var MessagingClient messaging.IMessagingClient  // 添加新行
var isHealthy = true

func GetAccount(w http.ResponseWriter, r *http.Request) {
    // Read the 'accountId' path parameter from the mux map
    var accountId = mux.Vars(r)["accountId"]

    // Read the account struct BoltDB
    account, err := DBClient.QueryAccount(accountId)
    account.ServedBy = util.GetIP()

    // If err, return a 404
    if err != nil {
        fmt.Println("Some error occured serving " + accountId + ": " + err.Error())
        w.WriteHeader(http.StatusNotFound)
        return
    }

    notifyVIP(account)   // 添加新行 同時發送VIP通知。

    // NEW call the quotes-service
    quote, err := getQuote()
    if err == nil {
        account.Quote = quote
    }

    // If found, marshal into JSON, write headers and content
    data, _ := json.Marshal(account)
    writeJsonResponse(w, http.StatusOK, data)
}

// If our hard-coded "VIP" account, spawn a goroutine to send a message.
func notifyVIP(account model.Account) {
    if account.Id == "10000" {
        go func(account model.Account) {
            vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()}
            data, _ := json.Marshal(vipNotification)
            fmt.Printf("Notifying VIP account %v\n", account.Id)
            err := MessagingClient.PublishOnQueue(data, "vip_queue")
            if err != nil {
                fmt.Println(err.Error())
            }
        }(account)
    }
}

藉此機會,咱們展現調用新goroutine的內聯匿名函數, 也就是說使用了go關鍵詞的。既然咱們沒有什麼理由在發送消息傳遞的時候須要阻塞執行HTTP處理的主goroutine, 那麼這種狀況就是使用goroutine實現並行的最佳時機。

main.go文件也須要更新一點代碼以即可以在啓動的時候使用加載的並注入到Viper中的配置來初始化AMQ鏈接。

...
func main() {
    fmt.Printf("Starting %v\n", appName)

    config.LoadConfigurationFromBranch(
        viper.GetString("configServerUrl"),
        appName,
        viper.GetString("profile"),
        viper.GetString("configBranch"))
    initializeBoltClient()
    initializeMessaging()     // 新增行,初始化消息傳遞
    handleSigterm(func() {
        service.MessagingClient.Close()
    })
    service.StartWebServer(viper.GetString("server_port"))
}

func initializeMessaging() {
    if !viper.IsSet("amqp_server_url") {
        panic("No 'amqp_server_url' set in configuration, cannot start")
    }

    service.MessagingClient = &messaging.MessagingClient{}
    service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
    service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
}
...

沒有什麼大不了的東西 - 咱們建立一個空的MessagingClient實例並將其地址賦值給service.MessagingClient, 而後使用配置amqp_server_url來調用ConnectToBroker方法。若是配置中沒有broker_url,咱們就panic()退出,由於咱們不但願在甚至都沒有可能鏈接到broker的狀況下運行服務。

若是成功的鏈接到broker, 那麼咱們就調用Subscribe方法來訂閱由配置指定的topic。

更新配置

咱們在咱們的.yml配置文件中添加amqp_broker_url屬性到第八部分中的配置文件中,這些東西已經沒有人管了。

broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_   

broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_

注意test profile, 咱們使用的是Swarm服務名"rabbitmq", 而不是我筆記本上看到的Swarm的網絡IP地址。(你實際的IP地址可能會變化,192.168.99.100彷佛是運行Docker Toolbox的標準IP)。

配置文件中使用明文的用戶名和密碼是不推薦的,在現實生活中,咱們通常會使用第八部分中看到的Spring Cloud Config服務器內置的加密特性。

單元測試

固然,咱們應該至少編寫一個單元測試,確保咱們handlers.go中的GetAccount函數當某人請求神奇的並不是常特殊的帳號標識爲10000的帳號時嘗試發送一個消息。爲此,咱們須要模擬IMessagingClient和handlers_test.go中添加新的測試用例實現。讓咱們開始模擬吧。 此次咱們將使用第三方工具mockery來產生IMessagingClient接口的實現:(記住在命令行運行這些命令的時候使用恰當的GOPATH設置)。

> go get github.com/vektra/mockery/.../
> cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging
> ./$GOPATH/bin/mockery -all -output .
  Generating mock for: IMessagingClient

咱們如今在當前目錄有一個IMessagingClient.go模擬文件。 我不太喜歡這樣的文件名字,不喜歡駝峯,因此我將它重命名爲一個明顯的東西,它模擬並遵循本博客系列中文件名的約定。

mv IMessagingClient.go mockmessagingclient.go

可能須要調整通常文件中的import語句,刪除import別名。 除了那些,咱們使用一個黑盒方式來達到這個特殊模擬 - 僅假設它在咱們開始寫測試的時候會工做。

請隨意檢查生成的模擬實現的源代碼,它很是相似咱們以前第四部分中手工寫的東西。

切到handlers_test.go,咱們添加一個新的測試用例:

// declare mock types to make test code a bit more readable
var anyString = mock.AnythingOfType("string")
var anyByteArray = mock.AnythingOfType("[]uint8")  // == []byte

func TestNotificationIsSentForVIPAccount(t *testing.T) {
    // Set up the DB client mock
    mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil)
    DBClient = mockRepo

    mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil)
    MessagingClient = mockMessagingClient

    Convey("Given a HTTP req for a VIP account", t, func() {
        req := httptest.NewRequest("GET", "/accounts/10000", nil)
        resp := httptest.NewRecorder()
        Convey("When the request is handled by the Router", func() {
            NewRouter().ServeHTTP(resp, req)
            Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() {
                So(resp.Code, ShouldEqual, 200)
                time.Sleep(time.Millisecond * 10)    // Sleep since the Assert below occurs in goroutine
                So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue)
            })
    })})
}

能夠查看註釋瞭解詳情。我不喜歡在斷言調用數以前人爲添加10毫秒睡眠,但因爲模擬是在goroutine中調用,和主線程是獨立的,咱們須要容許它有一些時間來完成。 但願在涉及到有goroutine或者channel的時候,有更好的單元測試方式。

我認可,模擬這種方式比使用相似Mockito的東西更冗餘, 當寫Java應用的單元測試的時候。不過,我認爲可讀性和易讀性仍是不錯的。

確保測試經過:

go test ./...

運行

若是你尚未作的話,先運行springcloud.sh腳本更新配置服務器。 而後,運行copyall.sh並等幾秒鐘更新accountservice。咱們將使用curl來獲取咱們特殊的帳號:

> curl http://$ManagerIP:6767/accounts/10000
{"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}

若是全部進行順利的話,咱們能夠打開RabbitMQ管理控制檯,並看咱們是否在名爲vipQueue的隊列上得到了一個消息。

clipboard.png

在上面截圖最底下,咱們看到vipQueue有一個消息。若是咱們使用RabbitMQ管理控制檯的Get Message功能, 咱們會看到下面的消息:

clipboard.png

在Go語言中實現消費者 - vipservice

最後,是時候從頭開始寫一個全新的微服務了, 咱們須要用它來展現如何從RabbitMQ消費消息。咱們將確保應用在前面內容中學到的模式包括:

  • HTTP服務器
  • 健康檢查
  • 集中化配置管理
  • 消息傳遞碼複用

若是你已經切出P9分支的代碼了,那麼在你goblog目錄下面就已經有了vipservice了。
我不會一行行過每一個代碼文件的內容,由於有些和accountservice裏邊的重複了。相反我將聚焦在剛纔發送消息的消費方面。須要注意一些事情:

  • 在config-repo倉庫添加了兩個新的.yml文件,vipservice-dev.yml和vipservice-test.yml。
  • copyall.sh腳本更新了,讓它同時構建和部署accountservice和vipservice。

消費消息

咱們會使用common/messaging的SubscribeToQueue函數,例如:

SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error

這裏咱們應該提供的最重要的是:

  • 隊列的名字(例如: vip_queue)。
  • 消費者名字(咱們是誰)。
  • 處理器函數,它將使用一個amqp.Delivery參數來調用 - 和第八部分中咱們消費配置更新很是相似。

實際上將咱們的回調函數綁定到隊列的SubscribeToQueue實現的實現並不奇怪,若是咱們須要瞭解細節,能夠查閱源代碼

繼續快速看看vipservice的入口文件main.go, 看看咱們如何設置的:

package main

import (
    "flag"
    "fmt"
    "github.com/callistaenterprise/goblog/common/config"
    "github.com/callistaenterprise/goblog/common/messaging"
    "github.com/callistaenterprise/goblog/vipservice/service"
    "github.com/spf13/viper"
    "github.com/streadway/amqp"
    "os"
    "os/signal"
    "syscall"
)

var appName = "vipservice"

var messagingClient messaging.IMessagingClient

func init() {
    configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server")
    profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles")
    configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from")
    flag.Parse()

    viper.Set("profile", *profile)
    viper.Set("configServerUrl", *configServerUrl)
    viper.Set("configBranch", *configBranch)
}

func main() {
    fmt.Println("Starting " + appName + "...")

    config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch"))
    initializeMessaging()

    // Makes sure connection is closed when service exits.
    handleSigterm(func() {
        if messagingClient != nil {
            messagingClient.Close()
        }
    })
    service.StartWebServer(viper.GetString("server_port"))
}

func onMessage(delivery amqp.Delivery) {
    fmt.Printf("Got a message: %v\n", string(delivery.Body))
}

func initializeMessaging() {
    if !viper.IsSet("amqp_server_url") {
        panic("No 'broker_url' set in configuration, cannot start")
    }
    messagingClient = &messaging.MessagingClient{}
    messagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))

    // Call the subscribe method with queue name and callback function
    err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage)
    failOnError(err, "Could not start subscribe to vip_queue")

    err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
    failOnError(err, "Could not start subscribe to "+viper.GetString("config_event_bus")+" topic")
}

// Handles Ctrl+C or most other means of "controlled" shutdown gracefully. Invokes the supplied func before exiting.
func handleSigterm(handleExit func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    signal.Notify(c, syscall.SIGTERM)
    go func() {
        <-c
        handleExit()
        os.Exit(1)
    }()
}

func failOnError(err error, msg string) {
    if err != nil {
        fmt.Printf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

看起來和accountservice很是類似,對不對? 咱們可能會重複如何安裝和啓動咱們添加的每一個微服務的基本知識。

onMessage函數在這裏僅僅打印咱們接到的vip消息的body。若是咱們須要實現更多虛構的用例,它會調用一些花哨的邏輯來肯定帳號持有人是否有資格得到"超級可怕的購買咱們全部東西(TM)"的offer, 而且可能寫一個offer給"VIP offer數據庫"。你能夠隨意實現並提交一個PR。

沒有什麼可補充的。除了這個片斷,當咱們按下Ctrl + C或者當Swarm認爲是時候殺死服務實例:

func handleSigterm(handleExit func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    signal.Notify(c, syscall.SIGTERM)
    go func() {
        <-c
        handleExit()
        os.Exit(1)
    }()
}

不是最容易讀的代碼片斷,它所作的就是註冊通道c做爲os.Interrupt和syscall的監聽器。SIGTERM和goroutine會阻塞在c上的消息監聽,知道接收到這兩種信號。 這樣就使得咱們很是確定咱們提供的handleExit()函數在微服務被殺掉的時候都會被調用。怎麼肯定? Ctrl + C或docker swarm擴展也工做良好。kill也同樣。 kill -9不會。 所以請求不要使用kill -9中止,除非你必需要這樣作。

它將調用咱們在IMessageConsumer接口中聲明的Close()函數, 它實現的時候確保AMQP鏈接被正確關閉。

部署運行

咱們對copyall.sh內容進行了修改:

#!/bin/bash

export GOOS=linux
export CGO_ENABLED=0

cd accountservice;go get;go build -o accountservice-linux-amd64;echo built `pwd`;cd ..
cd healthchecker;go get;go build -o healthchecker-linux-amd64;echo built `pwd`;cd ..
cd vipservice;go get;go build -o vipservice-linux-amd64;echo built `pwd`;cd ..

export GOOS=darwin

cp healthchecker/healthchecker-linux-amd64 accountservice/
cp healthchecker/healthchecker-linux-amd64 vipservice/

docker build -t someprefix/accountservice accountservice/
docker service rm accountservice
docker service create --name=accountservice --replicas=1 --network=my_network -p=6767:6767 someprefix/accountservice

docker build -t someprefix/vipservice vipservice/
docker service rm vipservice
docker service create --name=vipservice --replicas=1 --network=my_network someprefix/vipservice

運行這個腳本,等待幾秒鐘,讓服務從新構建部署完成。而後查看:

> docker service ls
ID            NAME            REPLICAS  IMAGE
kpb1j3mus3tn  accountservice  1/1       someprefix/accountservice
n9xr7wm86do1  configserver    1/1       someprefix/configserver
r6bhneq2u89c  rabbitmq        1/1       someprefix/rabbitmq
sy4t9cbf4upl  vipservice      1/1       someprefix/vipservice
u1qcvxm2iqlr  viz             1/1       manomarks/visualizer:latest

或者可使用dvizz Docker Swarm服務呈現來查看:

clipboard.png

檢查日誌

既然docker service logs特性已經在1.13.0中被標記爲試驗階段,咱們依然可使用前面的方式來查看vipservice的日誌。首先,運行docker ps找出容器id:

> docker ps
CONTAINER ID        IMAGE
a39e6eca83b3        someprefix/vipservice:latest
b66584ae73ba        someprefix/accountservice:latest
d0074e1553c7        someprefix/configserver:latest

而後使用vipservice的容器id來查看日誌:

> docker logs -f a39e6eca83b3
Starting vipservice...
2017/06/06 19:27:22 Declaring Queue ()
2017/06/06 19:27:22 declared Exchange, declaring Queue ()
2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
Starting HTTP service at 6868

而後另外打開一個窗口,執行下面的請求:

> curl http://$ManagerIP:6767/accounts/10000

而後你就會在剛纔日誌裏邊看到多了下面一行信息:

Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}

也就是說咱們的vipservice成功的消費了從accountservice發佈的消息。

Work隊列

跨越服務的多個實例的分佈式work模式是利用了work隊列的概念。每一個vip消息應該只能被單個vipservice實例處理。

clipboard.png

所以讓咱們看看當咱們將vipservice規模擴大到2個的時候會發生什麼:

> docker service scale vipservice=2

數秒以後新的實例就可使用了。既然咱們使用的是AMQP中的direct/queue方式,咱們但願有輪詢的行爲。使用curl觸發四個VIP帳戶查詢。

> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000

而後在看看日誌:

> docker logs -f a39e6eca83b3
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}

正如咱們預料的,咱們看到第一個實例處理了四條消息中的兩條。若是咱們對其餘的vipservice進行docker logs查詢,咱們會看到其餘的消息在它們裏邊消費了。很是滿意。

佔用空間和性能

此次不會作性能測試,在發送和接受一些消息後,快速查看內存使用就足夠了:

CONTAINER                                    CPU %               MEM USAGE / LIMIT
vipservice.1.tt47bgnmhef82ajyd9s5hvzs1       0.00%               1.859MiB / 1.955GiB
accountservice.1.w3l6okdqbqnqz62tg618szsoj   0.00%               3.434MiB / 1.955GiB
rabbitmq.1.i2ixydimyleow0yivaw39xbom         0.51%               129.9MiB / 1.955GiB

上買呢在服務了一些請求後獲得的信息。新的vipservice和accountservice同樣不是很複雜,所以和預料的同樣啓動的時候佔用的內存很是小。

總結

本文多是這個系列目前最長的一篇文章了!咱們完成了:

  • 更深刻的測試了RabbitMQ和AMQP的機制。
  • 添加了全新的微服務vipservice。
  • 將消息傳遞和配置代碼放到可複用的子項目中。
  • 使用AMQP協議發佈/訂閱消息。
  • 使用mockery產生模擬代碼。

在第十部分,咱們將作一些輕量的但在現實世界很是重要的模型 - 使用Logrus, Docker GELF日誌驅動記錄結構化日誌以及將日誌發不到Laas提供者商。

中英文對照表

  • 領域驅動設計: Domain-driven Design(DDD).

參考連接

相關文章
相關標籤/搜索