Go微服務 - 第八部分 - 使用Viper和Spring Cloud Config進行集中配置

第八部分: Go微服務 - 使用Viper和Spring Cloud Config進行集中配置

在第八部分,咱們探索Go微服務中使用Spring Cloud Config進行集中配置。java

簡介

考慮到微服務畢竟是用來分解應用爲獨立軟件片斷的,在微服務中集中處理一些東西感受有些不太搭配。然而咱們一般在後面的是進程之間的獨立。微服務的其餘操做應該集中處理。例如,日誌應該在你的日誌解決, 好比elk stack中終結, 監控應該歸入專用的監控中。 在這部分,咱們將使用Spring Cloud Config和git處理外部化和集中配置。linux

集中處理組成咱們應用程序的各類微服務的配置其實是很天然的事情。 特別是在未知數量底層硬件節點上的容器環境運行的時候,管理配置文件構建到每一個微服務的映像中或放到不一樣的安裝卷中,很快就會變成真正的難題。有不少行之有效的項目能夠幫咱們處理這些問題,例如etcd, consul和ZooKeeper。然而,應該注意的是,這些項目提供的不只僅是配置服務。既然本文聚焦的是集成Go微服務和Spring Cloud/Netflix OSS生態的支持服務, 咱們將基於Spring Cloud配置進行集中配置, Spring Cloud Config是一個提供精確配置的專用軟件。git

Spring Cloud Config

Spring Cloud生態提供了集中配置的解決方案,也沒有什麼創意,就叫Spring Cloud Config。Spring Cloud Config服務器能夠被視爲服務和真正配置之間的代理, 提供了一些很是整潔的特性:github

  • 支持多種不一樣的後端,例如git(默認), 用於etcd、consul和ZooKeeper的文件系統和插件。
  • 加密屬性的透明解密。
  • 可插拔安全性。
  • 使用git鉤子/REST API以及Spring Cloud Bus(例如RabbitMQ)的推送機制來將配置文件中的改變傳播到服務,使得配置的實時更新成爲可能。

個人同事Magnus最近的一篇文章對Spring Cloud Config進行特別深刻的探討, 見參考鏈接。在本文中,咱們將集成咱們的accountservice服務和Spring Cloud Config服務,配置後端使用公開的位於github上的git倉庫, 從倉庫中咱們能夠獲取配置,解密/加密屬性以及實現實時重載配置。golang

下面是咱們整個解決方案目標的簡單概述:web

clipboard.png

概述

既然咱們以Swarm模式運行Docker, 咱們將繼續以各類方式使用Docker的機制。在Swarm內部,咱們應該運行至少一個(能夠更多)Spring Cloud Config服務器。當咱們的微服務中的一個啓動的時候,它們要知道:算法

  • 配置服務器的邏輯服務名和端口號。也就是說,咱們把咱們的配置服務器也部署到Docker Swarm上做爲服務,這裏咱們稱之爲configserver。意味着這是微服務要請求配置的時候惟一須要知道的東西。
  • 它們的名字是什麼, 例如"accountservice"。
  • 它運行在什麼樣的執行配置文件上,例如"dev", "test", "prod"。 若是你對spring.profiles.active概念比較熟悉的話,這用於Go語言同樣很天然。
  • 若是咱們使用git做爲後端,並想從特定的分支獲取配置信息,咱們就須要提早知道(可選的)。

鑑於上面四個標準, 請求配置的簡單GET可能看起來像下面的樣子:spring

resp, err := http.Get("http://configserver:8888/accountservice/dev/P8")

也就是下面的協議:docker

protocol://url:port/applicationName/profile/branch

在Swarm中搭建一個Spring Cloud配置服務器

本文代碼能夠從github直接克隆下來: https://github.com/callistaen...shell

你也能夠用其餘方式來設置和部署配置服務器。而我在goblog目錄下面準備了一個support目錄,用於存放https://github.com/callistaen...,裏邊包含了咱們後面須要的第三方服務。

通常來講,每一個必要的支持組件要麼是簡單的便於構建和部署組件的現成的Dockerfile, 要麼是(java)源代碼和配置(Spring Cloud應用一般是基於Spring Boot的), 這樣咱們須要本身使用Gradle構建。(不須要擔憂,咱們只須要安裝JDK就能夠了)。

(這些Spring Cloud應用程序大部分個人同事都已經提早準備好了。具體能夠參考Java微服務)

RabbitMQ

什麼狀況? 咱們不是要安裝Spring Cloud Config服務器嗎? 好吧,這個依賴的軟件具備一個消息中間人,可使用支持RabbitMQ的Spring Cloud Bus來傳播配置改變。

有RabbitMQ是一個很好的事情,無論怎麼說,咱們文章後面還會用到它。因此將從RabbitMQ開始,並在咱們的Swarm中做爲服務來運行。

我已經在/goblog/support/rabbitmq目錄下面準備了一個Dockerfile,可使用我在Docker Swarm服務中提早準備好的映像。

# use rabbitmq official
FROM rabbitmq

# enable management plugin
RUN rabbitmq-plugins enable --offline rabbitmq_management

# enable mqtt plugin
RUN rabbitmq-plugins enable --offline rabbitmq_mqtt

# expose management port
EXPOSE 15672
EXPOSE 5672

而後咱們能夠建立一個腳本文件, 在須要更新的時候幫咱們自動作這些事情。

#!/bin/bash

# RabbitMQ
docker service rm rabbitmq
docker build -t someprefix/rabbitmq support/rabbitmq/
docker service create --name=rabbitmq --replicas=1 --network=my_network -p 1883:1883 -p 5672:5672 -p 15672:15672 someprefix/rabbitmq

(注意,你可能須要給這個腳本語言添加可執行權限。)

運行它,等待Docker下載必要的映像,並將它部署到Swarm中。 當它完成的時候,你就能夠打開RabbitMQ管理UI,而且能使用guest/guest來登陸進去。

Spring Cloud Config服務器

在/support/config-server中你會發現一個提早配置好的Spring Boot應用程序,它用於運行配置服務器。咱們會使用一個git倉庫來保存和訪問咱們的yaml文件存儲的配置。

---
# For deployment in Docker containers
spring:
  profiles: docker
  cloud:
    config:
      server:
        git:
          uri: https://github.com/eriklupander/go-microservice-config.git

# Home-baked keystore for encryption. Of course, a real environment wouldn't expose passwords in a blog...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme

# Since we're running in Docker Swarm mode, disable Eureka Service Discovery
eureka:
  client:
    enabled: false

# Spring Cloud Config requires rabbitmq, use the service name.
spring.rabbitmq.host: rabbitmq
spring.rabbitmq.port: 5672

上面是配置服務器的配置文件。咱們能夠看到一些東西:

  • 咱們告訴config-server到咱們指定的URL來獲取配置。
  • 一個密鑰庫,用於加密(自簽名)和解密的密鑰存儲庫。
  • 既然咱們是運行在Docker Swarm模式下的,所以eureka的服務發現功能是禁用的。
  • 配置服務器指望找到一個RabbitMQ, 它的host名爲rabbitmq, 端口爲5672, host恰好是剛纔咱們給咱們的RabbitMQ服務起的Docker Swarm服務名。

下面是配置服務器的Dockerfile內容, 至關簡單:

FROM davidcaste/alpine-java-unlimited-jce

EXPOSE 8888

ADD ./build/libs/*.jar app.jar
ADD ./server.jks /

ENTRYPOINT ["java","-Dspring.profiles.active=docker","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

不要介意java.security.egd的東西,這是這個文章系列中咱們不須要關心的問題的解決辦法。

這裏有幾點須要注意:

  • 咱們使用的鏡像是基於Alpine Linux的,沒有限制Java的加密擴展安裝的。 這是一個必要要求,若是咱們想要Spring Cloud Config的加密/解密功能。
  • 容器鏡像的根目錄中咱們加入了在提早準備好的keystore。

編譯keystore

後面咱們要使用加密屬性,咱們須要爲配置服務器帶一個自簽名證書。(這裏咱們須要使用keytool工具。)

在/goblog/support/config-server目錄下面執行下面的命令:

keytool -genkeypair -alias goblogkey -keyalg RSA -dname "CN=Go Blog,OU=Unit,O=Organization,L=City,S=State,C=SE" -keypass changeme -keystore server.jks -storepass letmein -validity 730

keytool是一個密鑰和證書管理工具。它具備不少選項:

  • -certreq: 生成證書請求。
  • -changealias: 更改條目的別名。
  • -delete: 刪除條目。
  • -exportcert: 導出證書。
  • -genkeypair: 生成密鑰對。
  • -genseckey: 生成密鑰。
  • -gencert: 根據證書請求生成證書。
  • -importcert: 導入證書或證書鏈。
  • -importpass: 導入口令。
  • -importkeystore: 從其餘密鑰庫導入一個或全部條目。
  • -keypasswd: 更改條目的密鑰口令。
  • -list: 列出密鑰庫中的條目。
  • -printcert: 打印證書內容。
  • -printcertreq: 打印證書請求的內容。
  • -printcrl: 打印 CRL 文件的內容。
  • -storepasswd: 更改密鑰庫的存儲口令。

執行完上面命令後在當前目錄下面生成一個server.jks keystore簽名證書。你能夠隨意修改任何屬性/密碼, 主要記住相應的更改application.yml就能夠了。

...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme
...

構建部署

是時候構建部署服務器了。 咱們先建立一個shell腳原本節約咱們時間,由於咱們可能會須要重複作不少次。 記住 - 你須要Java運行時環境來構建它。 在/goblog目錄,咱們建立一個springcloud.sh的腳本文件。 咱們把全部真正須要構建的東西都放這裏(構建可能須要很長時間):

#!/bin/bash

cd support/config-server
./gradlew build
cd ../..
docker build -t someprefix/configserver support/config-server/
docker service rm configserver
docker service create --replicas 1 --name configserver -p 8888:8888 --network my_network --update-delay 10s --with-registry-auth  --update-parallelism 1 someprefix/configserver

而後運行腳本,須要修改腳本的可執行權限。
等待幾分鐘時間,而後檢查它是否在docker服務中啓動運行了:

> docker service ls

ID                  NAME                MODE                REPLICAS            IMAGE
39d26cc3zeor        rabbitmq            replicated          1/1                 someprefix/rabbitmq
eu00ii1zoe76        viz                 replicated          1/1                 manomarks/visualizer:latest
q36gw6ee6wry        accountservice      replicated          1/1                 someprefix/accountservice
t105u5bw2cld        quotes-service      replicated          1/1                 eriklupander/quotes-service:latest
urrfsu262e9i        dvizz               replicated          1/1                 eriklupander/dvizz:latest
w0jo03yx79mu        configserver        replicated          1/1                 someprefix/configserver

而後能夠經過curl來加載accountservice的JSON配置。

> curl http://$ManagerIP:8888/accountservice/dev/master
{"name":"accountservice","profiles":["dev"],"label":"master","version":"b8cfe2779e9604804e625135b96b4724ea378736",
    "propertySources":[
    {"name":"https://github.com/eriklupander/go-microservice-config.git/accountservice-dev.yml",
    "source":
        {"server_port":6767,"server_name":"Accountservice DEV"}
    }]
}

(這裏輸出爲了簡潔,咱們格式化了的)。實際配置保存在source屬性中,在那裏包含有全部.yml文件的屬性值,它們以key-value對的形式出現。加載並解析source屬性到Go語言可用的配置中, 是本文的中間件來完成的。

yaml配置文件

在咱們深刻到Go代碼以前,咱們先看看https://github.com/eriklupand...:

accountservice-dev.yml
accountservice-test.yml

這兩個文件目前裏邊的內容都很是少。

server_port: 6767
server_name: Accountservice TEST
the_password: (we'll get back to this one)

這裏咱們只配置了咱們但願綁定服務的HTTP端口號。真實的服務可能在裏邊設置不少東西。

使用解密/加密

Spring Cloud Config其中一個靈活的地方就是在配置文件中支持內置支持透明的解密被加密值。例如,能夠看看accountservice-test.yml文件,那裏咱們有the_password屬性:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFCu5UsCcTWUwEQt293nPq0ElEFHHp5B2SZY8m4kUzzqxOFsMXHaH7SThNNjOUDGxRVkpPZEkdgo6aJFSPRzVF04SXOVZ6Rjg6hml1SAkLy/k1R/E0wp0RrgySbgh9nNEbhzqJz8OgaDvRdHO5VxzZGx8uj5KN+x6nrQobbIv6xTyVj9CSqJ/Btf/u1T8/OJ54vHwi5h1gSvdox67teta0vdpin2aSKKZ6w5LyQocRJbONUuHyP5roCONw0pklP+2zhrMCy0mXhCJSnjoHvqazmPRUkyGcjcY3LHjd39S2eoyDmyz944TKheI6rWtCfozLcIr/wAZwOTD5sIuA9q8a9nG2GppclGK7X649aYQynL+RUy1q7T7FbW/TzSBg='

使用字符串{cipher}做爲解密前綴,咱們的Spring Cloud配置服務器將在傳遞結果給服務器以前,知道如何自動爲咱們解密值。在全部配置都正確的運行實例中,curl請求REST API來獲取這個配置將返回:

...
      "source": {
        "server_port": 6767,
        "server_name": "Accountservice TEST",
        "the_password": "password"
....

至關靈活吧, 對吧?the_password屬性能夠在公網服務器和Spring Cloud服務器(它可能在不安全環境或內部服務器外部可見的任何環境都不可用。)中用保存明文加密的字符串(若是你相信加密算法和簽名密鑰的完整性)透明解密這個屬性爲真正的password。

固然,你須要使用相同的key做爲Spring Cloud Config的解密key來解密,有些事情能夠經過配置服務器的HTTP API來完成。

curl http://$ManagerIP:8888/encrypt -d 'password'
AQClKEMzqsGiVpKx+Vx6vz+7ww00n... (rest omitted for brevity)

Viper

咱們的基於Go的配置框架選擇的是Viper。 Viper具備很好的API能夠用, 而且很方便擴展, 而且不會妨礙咱們正常的應用代碼。雖然Viper不肯生的支持從Spring Cloud配置服務器加載配置, 可是咱們能夠寫一小片代碼能夠幫咱們作到這點。 Viper也能夠處理不少種文件類型做爲配置源 - 例如json, yaml, 普通屬性文件。 Viper能夠爲咱們從OS讀取環境變量, 至關整潔。 一旦初始化併產生後,咱們的配置老是可使用各類的viper.Get函數獲取來使用,確實很方便。

還記得在本文開頭的圖片嗎? 好吧,若是不記得了, 咱們再重複一遍:

clipboard.png

咱們將讓微服務啓動的時候發起一個HTTP請求, 獲取JSON響應的source部分,並將它們放到Viper中,這樣咱們就能夠在那裏獲取咱們的web服務器的端口號了。 讓咱們開始吧。

加載配置

正如使用curl的已展現示例,咱們能夠對配置服務器進行簡單HTTP請求,那裏咱們只須要知道名字和咱們的profile便可。 咱們將添加一些解析flag的功能到咱們的accountservice main.go, 所以在啓動的時候,咱們能夠指定一個環境profile,也能夠指定到配置服務器的可選的URI。

var appName = "accountservice"

// Init function, runs before main()
func init() {
    // Read command line flags
    profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles")
    configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server")
    configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from")
    flag.Parse()
    
    // Pass the flag values into viper.
    viper.Set("profile", *profile)
    viper.Set("configServerUrl", *configServerUrl)
    viper.Set("configBranch", *configBranch)
}

func main() {
    fmt.Printf("Starting %v\n", appName)

    // NEW - load the config
    config.LoadConfigurationFromBranch(
        viper.GetString("configServerUrl"),
        appName,
        viper.GetString("profile"),
        viper.GetString("configBranch"))
    initializeBoltClient()
    service.StartWebServer(viper.GetString("server_port"))    // NEW, use port from loaded config 
}

init函數比較簡單,就是從命令行參數解析flag參數值,而後設置到viper中。 在main函數中,調用config.LoadConfigurationFromBranch, 從遠程git倉庫加載配置。這裏config.LoadConfigurationFromBranch是在goblog/common/config/loader.go中定義的:

package config

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"

    "github.com/Sirupsen/logrus"
    "github.com/spf13/viper"
)

// LoadConfigurationFromBranch loads config from for example http://configserver:8888/accountservice/test/P8
func LoadConfigurationFromBranch(configServerURL string, appName string, profile string, branch string) {
    url := fmt.Sprintf("%s/%s/%s/%s", configServerURL, appName, profile, branch)
    logrus.Printf("Loading config from %s\n", url)
    body, err := fetchConfiguration(url)
    if err != nil {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    parseConfiguration(body)
}

func fetchConfiguration(url string) ([]byte, error) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered in f", r)
        }
    }()
    logrus.Printf("Getting config from %v\n", url)
    resp, err := http.Get(url)
    if err != nil || resp.StatusCode != 200 {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        panic("Error reading configuration: " + err.Error())
    }
    return body, err
}

func parseConfiguration(body []byte) {
    var cloudConfig springCloudConfig
    err := json.Unmarshal(body, &cloudConfig)
    if err != nil {
        panic("Cannot parse configuration, message: " + err.Error())
    }

    for key, value := range cloudConfig.PropertySources[0].Source {
        viper.Set(key, value)
        logrus.Printf("Loading config property %v => %v\n", key, value)
    }
    if viper.IsSet("server_name") {
        logrus.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
    }
}

type springCloudConfig struct {
    Name            string           `json:"name"`
    Profiles        []string         `json:"profiles"`
    Label           string           `json:"label"`
    Version         string           `json:"version"`
    PropertySources []propertySource `json:"propertySources"`
}

type propertySource struct {
    Name   string                 `json:"name"`
    Source map[string]interface{} `json:"source"`
}

本代碼引入了三個包logrus, viper和amqp。由於我沒有使用deps之類的包管理工具,所以咱們在安裝logrus和viper包的時候,這兩個包也有依賴的第三方包,咱們手工進行一些go get:

mkdir -p $GOPATH/src/golang.org/x
cd !$

git clone https://github.com/golang/text.git
git clone https://github.com/golang/sys.git


logrus go get問題
git clone https://github.com/golang/crypto.git

loadConfigurationFromBranch函數根據提供的參數獲取配置並解析配置到viper中。

基本上來講就是咱們發起一個帶有appName, profile, git branch參數的HTTP GET請求到配置服務器, 而後解碼響應JSON到在同一文件中聲明的springCloudConfig結構體中。最後咱們簡單迭代cloudConfig.PropertySources[0]的全部key-value對, 並將它們分別放入viper, 這樣咱們能夠隨處均可以使用viper.GetString(key)或其餘的Viper提供的其餘Get方法來獲取它們。

注意,若是咱們鏈接配置服務器或解析響應發生錯誤的話,就會panic()整個微服務, 這樣就會kill掉它。Docker Swarm將檢測這個並嘗試在數秒以內部署一個新的實例。 擁有這樣行爲的典型緣由在於集羣冷啓動的時候,基於Go的微服務要比基於Sping Boot的配置服務器啓動要快得多。讓Swarm嘗試幾回,事情會本身解決掉的。

咱們吧實際工做分割到一個公共函數和一些包級別的函數單元,主要是便於單元測試。 單元測試檢查,以便咱們能將JSON轉換爲實際的viper屬性,看起來想GoConvey樣式的測試:

func TestParseConfiguration(t *testing.T) {
    Convey("Given a JSON configuration response body", t, func() {
        var body = `{"name":"accountservice-dev","profiles":["dev"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-dev.yml","source":{"server_port":6767"}}]}`

        Convey("When parsed", func() {
            parseConfiguration([]byte(body))

            Convey("Then Viper should have been populated with values from Source", func() {
                So(viper.GetString("server_port"), ShouldEqual, "6767")
            })
        })
    })
}

而後在goblog/accountservice目錄運行測試: go test ./...

更新Dockerfile

鑑於咱們是從外部源加載配置,咱們的服務須要一個查找的線索。 這能夠在容器和服務啓動的時候,經過使用flag做爲命令行參數來執行。

FROM iron/base
EXPOSE 6767

ADD accountservice-linux-amd64 /
ADD healthchecker-linux-amd64 /

HEALTHCHECK --interval=3s --timeout=3s CMD ["./healthchecker-linux-amd64", "-port=6767"] || exit 1
ENTRYPOINT ["./accountservice-linux-amd64", "-configServerUrl=http://configserver:8888", "-profile=test", "-configBranch=P8"]

ENTRYPOINT如今提供了一些值,使得它能夠到達配置,這樣能夠加載配置。

放入Swarm

你可能已經注意到咱們再也不使用6767端口號做爲端口號的硬編碼了, 也就是:

service.StartWebServer(viper.GetString("server_port"))

使用copyall.sh腳本從新構建並部署更新後的accountservice到Docker Swarm中。

全部事情都完成的時候,服務依然如本博客系列那樣運行,例外的是它其實是從外部和集中化配置服務器拿的端口號,而不是硬編碼到編譯二進制文件的端口號。

咱們能夠看看咱們的accountservice的日誌:

docker logs -f [containerid]
Starting accountservice
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property the_password => password
Loading config property server_port => 6767
Loading config property server_name => Accountservice TEST
Successfully loaded configuration for service Accountservice TEST

這裏咱們又get新技能了,使用docker logs能夠查看具體容器的日誌:

Usage:    docker logs [OPTIONS] CONTAINER

Fetch the logs of a container

Options:
      --details        Show extra details provided to logs
  -f, --follow         Follow log output
      --since string   Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)
      --tail string    Number of lines to show from the end of the logs (default "all")
  -t, --timestamps     Show timestamps
      --until string   Show logs before a timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)

docker logs支持查詢某個時間點先後的日誌。

實際上打印配置值是錯誤的作法,這裏咱們只是出於學習目的做出的輸出。這裏咱們使用logrus來打印日誌。

實時配置更新

1. 哦,咱們用於某種目的的外部服務器的URL是否改變了呢?
2. 該死,怎麼沒有人告訴我!

假設咱們不少人都遇到下面狀況, 咱們須要重建整個應用或至少重啓來更新一些無效或改變的配置值。Spring Cloud具備刷新域的概念,其中bean能夠實時更新,使用配置修改經過git commit hook傳播。

下圖提供了一個如何推送到git倉庫,傳播到咱們Go微服務的概覽:

clipboard.png

在本文中,咱們使用的是github倉庫,它徹底不知道如何執行post-commit hook操做到個人筆記本的Spring Cloud Server, 所以咱們將模擬一個提交掛鉤使用Spring Cloud服務器的內置/監控端點來推送。

curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"some name..."}' -ki http://$ManagerIP:8888/monitor

Spring Cloud服務器將知道使用這個POST作什麼,並在RabbitMQ(由Spring Cloud Bus抽象出來的)的交換上發送一個RefreshRemoteApplicationEvent。若是在成功引導了Spring Cloud Config以後,看看RabbitMQ的管理界面,應該建立了exchange。

clipboard.png

exchange和傳統的消息控制例如publisher, consumer, queue的區別是什麼?

Publisher -> Exchange -> (Routing) -> Queue -> Consumer

也就是消息被髮布到exchange, 而後基於路由規則和可能註冊了消費者的捆綁將消息副本分佈到queue。

所以爲了消費RefreshRemoteApplicationEvent消息(我更喜歡調用它們的refresh tokens), 全部咱們須要作的是確保咱們的Go服務在springCloudBus exchange上監聽這樣的消息, 若是咱們的目標應用執行了配置重載。 下面咱們來實現它。

Go語言中使用AMQP協議來消費消息

RabbitMQ中間人能夠經過使用AMQP協議來訪問。咱們將使用一個叫作streadway/amqp的Go版本的AMQP客戶端。 大部分AMQP/RabbitMQ管道代碼都應該使用一些可複用工具,可能咱們稍後會重構它。 基於這個例子的管道代碼是來自streadway/amqp倉庫的:

// This example declares a durable Exchange, an ephemeral (auto-delete) Queue,
// binds the Queue to the Exchange with a binding key, and consumes every
// message published to that Exchange with that routing key.
//
package main

import (
    "flag"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

var (
    uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
    exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
    exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
    queue        = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
    bindingKey   = flag.String("key", "test-key", "AMQP binding key")
    consumerTag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
    lifetime     = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
)

func init() {
    flag.Parse()
}

func main() {
    c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag)
    if err != nil {
        log.Fatalf("%s", err)
    }

    if *lifetime > 0 {
        log.Printf("running for %s", *lifetime)
        time.Sleep(*lifetime)
    } else {
        log.Printf("running forever")
        select {}
    }

    log.Printf("shutting down")

    if err := c.Shutdown(); err != nil {
        log.Fatalf("error during shutdown: %s", err)
    }
}

type Consumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    tag     string
    done    chan error
}

func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) {
    c := &Consumer{
        conn:    nil,
        channel: nil,
        tag:     ctag,
        done:    make(chan error),
    }

    var err error

    log.Printf("dialing %q", amqpURI)
    c.conn, err = amqp.Dial(amqpURI)
    if err != nil {
        return nil, fmt.Errorf("Dial: %s", err)
    }

    go func() {
        fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
    }()

    log.Printf("got Connection, getting Channel")
    c.channel, err = c.conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("Channel: %s", err)
    }

    log.Printf("got Channel, declaring Exchange (%q)", exchange)
    if err = c.channel.ExchangeDeclare(
        exchange,     // name of the exchange
        exchangeType, // type
        true,         // durable
        false,        // delete when complete
        false,        // internal
        false,        // noWait
        nil,          // arguments
    ); err != nil {
        return nil, fmt.Errorf("Exchange Declare: %s", err)
    }

    log.Printf("declared Exchange, declaring Queue %q", queueName)
    queue, err := c.channel.QueueDeclare(
        queueName, // name of the queue
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // noWait
        nil,       // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Declare: %s", err)
    }

    log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
        queue.Name, queue.Messages, queue.Consumers, key)

    if err = c.channel.QueueBind(
        queue.Name, // name of the queue
        key,        // bindingKey
        exchange,   // sourceExchange
        false,      // noWait
        nil,        // arguments
    ); err != nil {
        return nil, fmt.Errorf("Queue Bind: %s", err)
    }

    log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
    deliveries, err := c.channel.Consume(
        queue.Name, // name
        c.tag,      // consumerTag,
        false,      // noAck
        false,      // exclusive
        false,      // noLocal
        false,      // noWait
        nil,        // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Consume: %s", err)
    }

    go handle(deliveries, c.done)

    return c, nil
}

func (c *Consumer) Shutdown() error {
    // will close() the deliveries channel
    if err := c.channel.Cancel(c.tag, true); err != nil {
        return fmt.Errorf("Consumer cancel failed: %s", err)
    }

    if err := c.conn.Close(); err != nil {
        return fmt.Errorf("AMQP connection close error: %s", err)
    }

    defer log.Printf("AMQP shutdown OK")

    // wait for handle() to exit
    return <-c.done
}

func handle(deliveries <-chan amqp.Delivery, done chan error) {
    for d := range deliveries {
        log.Printf(
            "got %dB delivery: [%v] %q",
            len(d.Body),
            d.DeliveryTag,
            d.Body,
        )
        d.Ack(false)
    }
    log.Printf("handle: deliveries channel closed")
    done <- nil
}

載goblog/accountservice/main.go main函數中添加新行, 爲咱們啓動一個AMQP消費者:

func main() {
    fmt.Printf("Starting %v\n", appName)

    config.LoadConfigurationFromBranch(
            viper.GetString("configServerUrl"),
            appName,
            viper.GetString("profile"),
            viper.GetString("configBranch"))
    initializeBoltClient()
    
    // NEW
    go config.StartListener(appName, viper.GetString("amqp_server_url"), viper.GetString("config_event_bus"))   
    service.StartWebServer(viper.GetString("server_port"))
}

注意上面的StartListener的兩個參數服務器url和事件bus兩個屬性,它們是在下面的文件中定義的:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFC....'
amqp_server_url: amqp://guest:guest@rabbitmq:5672/
config_event_bus: springCloudBus
func StartListener(appName string, amqpServer string, exchangeName string) {
    err := NewConsumer(amqpServer, exchangeName, "topic", "config-event-queue", exchangeName, appName)
    if err != nil {
        log.Fatalf("%s", err)
    }

    log.Printf("running forever")
    select {}   // Yet another way to stop a Goroutine from finishing...
}

NewConsumer是樣板代碼的實際位置,這裏先忽略過它,直接看看實際處理進來請求的代碼:

func handleRefreshEvent(body []byte, consumerTag string) {
    updateToken := &UpdateToken{}
    err := json.Unmarshal(body, updateToken)
    if err != nil {
        log.Printf("Problem parsing UpdateToken: %v", err.Error())
    } else {
        if strings.Contains(updateToken.DestinationService, consumerTag) {
            log.Println("Reloading Viper config from Spring Cloud Config server")

            // Consumertag is same as application name.
            LoadConfigurationFromBranch(
                viper.GetString("configServerUrl"),
                consumerTag,
                viper.GetString("profile"),
                viper.GetString("configBranch"))
        }
    }
}

// {"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"xxxaccoun:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
type UpdateToken struct {
    Type string `json:"type"`
    Timestamp int `json:"timestamp"`
    OriginService string `json:"originService"`
    DestinationService string `json:"destinationService"`
    Id string `json:"id"`
}

這個代碼嘗試解析到達的消息爲UpdateToken結構體,而且若是destinationService匹配咱們的consumerTag(也就是 appName accountservice), 咱們就調用一樣的最初服務啓動時調用的LoadConfigurationFromBranch函數。

請注意在實際場景中,NewConsumer函數和通常的消息處理代碼將須要更多的錯誤處理、確保只處理恰當的消息等等工做。

單元測試

讓咱們爲handleRefreshEvent()函數寫一個單元測試。 建立一個新的測試文件:

var SERVICE_NAME = "accountservice"

func TestHandleRefreshEvent(t *testing.T) {
    // Configure initial viper values
    viper.Set("configServerUrl", "http://configserver:8888")
    viper.Set("profile", "test")
    viper.Set("configBranch", "master")

    // Mock the expected outgoing request for new config
    defer gock.Off()
    gock.New("http://configserver:8888").
        Get("/accountservice/test/master").
        Reply(200).
        BodyString(`{"name":"accountservice-test","profiles":["test"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-test.yml","source":{"server_port":6767,"server_name":"Accountservice RELOADED"}}]}`)

Convey("Given a refresh event received, targeting our application", t, func() {
        var body = `{"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
`
        Convey("When handled", func() {
            handleRefreshEvent([]byte(body), SERVICE_NAME)

            Convey("Then Viper should have been re-populated with values from Source", func() {
                So(viper.GetString("server_name"), ShouldEqual, "Accountservice RELOADED")
            })
        })
    })
}

我但願BDD樣式的GoConvey傳達(雙關語!)測試如何工做。 注意咱們如何使用gock來攔截對外的請求新配置的HTTP請求,以及咱們預先產生的帶有一些初始值的viper。

運行它

是時候測試了。 從新使用copyall.sh腳本部署服務。

檢查accountservice的日誌:

> docker logs -f [containerid]
Starting accountservice
... [truncated for brevity] ...
Successfully loaded configuration for service Accountservice TEST    <-- LOOK HERE!!!!
... [truncated for brevity] ...
2017/05/12 12:06:36 dialing amqp://guest:guest@rabbitmq:5672/
2017/05/12 12:06:36 got Connection, getting Channel
2017/05/12 12:06:36 got Channel, declaring Exchange (springCloudBus)
2017/05/12 12:06:36 declared Exchange, declaring Queue (config-event-queue)
2017/05/12 12:06:36 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
2017/05/12 12:06:36 Queue bound to Exchange, starting Consume (consumer tag 'accountservice')
2017/05/12 12:06:36 running forever

如今對accountservice-test.yml和service name進行修改,並使用前面展現的使用monitor API POST來僞造一個提交hook:

我修改了accountservice-test.yml文件和它的service name屬性,從accountservice TEST到Temporary test string, 而後推送改變。

接着,咱們使用curl來讓咱們的Spring Cloud Config服務器知道這些更新:

> curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"what is this?"}' -ki http://192.168.99.100:8888/monitor

若是全部都正常工做,就會觸發一個refresh token從Config服務器,咱們的accountservice就會撿起它. 再次檢查下log:

> docker logs -f [containerid]
2017/05/12 12:13:22 got 195B consumer: [accountservice] delivery: [1] routingkey: [springCloudBus] {"type":"RefreshRemoteApplicationEvent","timestamp":1494591202057,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"1f421f58-cdd6-44c8-b5c4-fbf1e2839baa"}
2017/05/12 12:13:22 Reloading Viper config from Spring Cloud Config server
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property server_port => 6767
Loading config property server_name => Temporary test string!
Loading config property amqp_server_url => amqp://guest:guest@rabbitmq:5672/
Loading config property config_event_bus => springCloudBus
Loading config property the_password => password
Successfully loaded configuration for service Temporary test string!      <-- LOOK HERE!!!!

正如你所見的,最後一行打印了"Successfully loaded configuration for service Temporary test string!", 源代碼以下:

if viper.IsSet("server_name") {
    fmt.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
}

也就是說,咱們已經動態修改了的以前存儲在Viper中的屬性值, 而沒告訴咱們的服務!這是真正的酷!

重要提示: 雖然動態更新屬性是很是酷的,可是它自己不會更新這些東西,好比咱們運行服務器的端口,池中已存在鏈接對象, 或RabbitMQ中間人的活動鏈接。 這些類型的已運行東西須要花費一些時間來使用新的配置來重啓, 這些內容超出了本文的範圍。

Footprint及性能

在啓動時添加配置加載不該該影響運行時性能, 事實上它確實不影響。每秒1千個請求和以前具備一樣的吞吐,CPU和內存使用。相信個人話或者你本身試試。咱們將在第一次啓動後快速查看內存使用狀況:

CONTAINER                                    CPU %               MEM USAGE / LIMIT     MEM %               NET I/O             BLOCK I/O           PIDS
accountservice.1.pi7wt0wmh2quwm8kcw4e82ay4   0.02%               4.102MiB / 1.955GiB   0.20%               18.8kB / 16.5kB     0B / 1.92MB         6
configserver.1.3joav3m6we6oimg28879gii79     0.13%               568.7MiB / 1.955GiB   28.41%              171kB / 130kB       72.9MB / 225kB      50
rabbitmq.1.kfmtsqp5fnw576btraq19qel9         0.19%               125.5MiB / 1.955GiB   6.27%               6.2MB / 5.18MB      31MB / 414kB        75
quotes-service.1.q81deqxl50n3xmj0gw29mp7jy   0.05%               340.1MiB / 1.955GiB   16.99%              2.97kB / 0B         48.1MB / 0B         30

甚至和AMQP、Viper做爲配置框架的集成,咱們最初運行信息大概4MB左右。咱們的Spring Boot實現的配置服務器使用了超過500MB的內存,而RabbitMQ(我認爲是用Erlang寫的)使用125MB。

我能夠確定的是,咱們可使用一些標準的JVM -xmx參數可讓配置服務器的尺寸降低到256MB初始化堆尺寸,可是它絕對是須要大量RAM的。然而,在生產環境中我但願運行2個配置服務器,而非幾十個或幾百個。 當談及支持服務,從Spring Cloud生態,內存使用並非什麼大事,由於咱們一般不會有這種服務的多餘一個或幾個實例。

備忘

// 查找須要加入的swarm的token, 須要在Leader中查詢。
docker swarm join-token -q worker

// 以worker節點的形式加入Swarm
docker swarm join --token tokenstring worker ip:port
docker swarm join --token tokenstring manager ip:port

// ssh到具體的機器
docker-machine ssh docker-name # swarm-manager-1

總結

在本文中咱們部署了一個Spring Cloud配置服務器,和它的RabbitMQ依賴到咱們的Swarm中。而後咱們寫了一些Go代碼,使用一些簡單的HTTP, JSON和Viper框架從配置服務器啓動時加載配置並填充到Viper中,方便咱們整個微服務代碼方便使用。

在下一節中,咱們會繼續探索AMQP和RabbitMQ, 深刻更多細節,看看咱們本身如何發送一些消息。

參考鏈接

相關文章
相關標籤/搜索