在微服務架構下,原單體服務被拆分爲多個微服務獨立部署,客戶端就沒法知曉服務的具體位置;並且服務數量太多,維護如此多的服務地址,運維人員也沒法高效工做。git
所以,在微服務架構中引入了服務註冊中心,用於接受和維護各個服務的地址信息。客戶端或者網關能夠經過註冊中心查詢目標服務地址,動態實現服務訪問,而且在此實現服務負載均衡。github
對於服務註冊與發現,go-kit默認提供了對consul、zookeeper、etcd、eureka經常使用註冊中心的支持。docker
本文將基於consul,使用「客戶端發現模式」進行實戰演練,主要有如下要點:json
本文實例程序採用的思路爲:算術服務註冊至consul,其餘部分保持不變;發現服務對外暴露http接口,接受請求後(接收請求內容存儲在Body中,以json方式傳遞),按照go-kit的機制動態查詢算術服務實例,調用算術服務的接口,而後將響應內容返回。以下圖所示:bootstrap
docker/docker-compose.yml
,以下所示(暫時註釋了Prometheus和Grafana的部分)。version: '2'
services:
consul:
image: progrium/consul:latest
ports:
- 8400:8400
- 8500:8500
- 8600:53/udp
hostname: consulserver
command: -server -bootstrap -ui-dir /ui
複製代碼
sudo docker-compose -f docker/docker-compose.yml up
複製代碼
http://localhost:8500
,出現如下界面即爲啓動成功。本示例基於arithmetic_monitor_demo
代碼進行改寫。首先,複製該目錄並重命名爲arithmetic_consul_demo
;新建兩個目錄,分別命名爲register
、discover
;將原有go
代碼文件移動至register
目錄。結果以下圖所示:api
另外,須要下載所依賴的第三方庫uuid
和hashicorp/consul
瀏覽器
go get github.com/pborman/uuid
go get github.com/hashicorp/consul
複製代碼
新建register/register.go
,添加Register
方法,實現向consul的註冊邏輯。該方法接收5個參數,分別是註冊中心consul的ip、端口,算術服務的本地ip和端口,日誌記錄工具。bash
建立註冊對象須要使用hashicorp/consul
,查看代碼可知其方法定義以下:微信
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar
複製代碼
因此Register
的實現過程主要有三步:建立consul客戶端對象;建立consul對算術服務健康檢查的參數配置信息;建立算術服務向consul註冊的服務配置信息。代碼以下:架構
func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) {
// 建立Consul客戶端鏈接
var client consul.Client
{
consulCfg := api.DefaultConfig()
consulCfg.Address = consulHost + ":" + consulPort
consulClient, err := api.NewClient(consulCfg)
if err != nil {
logger.Log("create consul client error:", err)
os.Exit(1)
}
client = consul.NewClient(consulClient)
}
// 設置Consul對服務健康檢查的參數
check := api.AgentServiceCheck{
HTTP: "http://" + svcHost + ":" + svcPort + "/health",
Interval: "10s",
Timeout: "1s",
Notes: "Consul check service health status.",
}
port, _ := strconv.Atoi(svcPort)
//設置微服務想Consul的註冊信息
reg := api.AgentServiceRegistration{
ID: "arithmetic" + uuid.New(),
Name: "arithmetic",
Address: svcHost,
Port: port,
Tags: []string{"arithmetic", "raysonxin"},
Check: &check,
}
// 執行註冊
registar = consul.NewRegistrar(client, ®, logger)
return
}
複製代碼
由Step-2
可知,consul將定時請求算術服務的/heath
用於檢查服務的健康狀態,因此咱們將從service
、endpoint
、transport
中增長對應的實現。
Service
中新增接口方法HealthCheck
,並依次在ArithmeticService
、loggingMiddleware
、metricMiddleware
中添加實現。// service接口
// Service Define a service interface
type Service interface {
//省略以前的其餘方法
// HealthCheck check service health status
HealthCheck() bool
}
// ArithmeticService實現HealthCheck
// HealthCheck implement Service method
// 用於檢查服務的健康狀態,這裏僅僅返回true。
func (s ArithmeticService) HealthCheck() bool {
return true
}
// loggingMiddleware實現HealthCheck
func (mw loggingMiddleware) HealthCheck() (result bool) {
defer func(begin time.Time) {
mw.logger.Log(
"function", "HealthChcek",
"result", result,
"took", time.Since(begin),
)
}(time.Now())
result = mw.Service.HealthCheck()
return
}
// metricMiddleware實現HealthCheck
func (mw metricMiddleware) HealthCheck() (result bool) {
defer func(begin time.Time) {
lvs := []string{"method", "HealthCheck"}
mw.requestCount.With(lvs...).Add(1)
mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
}(time.Now())
result = mw.Service.HealthCheck()
return
}
複製代碼
endpoints.go
中新增結構:ArithmeticEndpoints
。在以前的示例中,僅使用了一個endpoint,因此我直接使用告終構endpoint.Endpoint
。定義以下:// ArithmeticEndpoint define endpoint
type ArithmeticEndpoints struct {
ArithmeticEndpoint endpoint.Endpoint
HealthCheckEndpoint endpoint.Endpoint
}
複製代碼
endpoint.Endpoint
封裝方法。代碼以下:// HealthRequest 健康檢查請求結構
type HealthRequest struct{}
// HealthResponse 健康檢查響應結構
type HealthResponse struct {
Status bool `json:"status"`
}
// MakeHealthCheckEndpoint 建立健康檢查Endpoint
func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return HealthResponse{status}, nil
}
}
複製代碼
transports.go
中新增健康檢查接口/health
。// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()
//省略原有/calculate/{type}/{a}/{b}代碼
// create health check handler
r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
endpoints.HealthCheckEndpoint,
decodeHealthCheckRequest,
encodeArithmeticResponse,
options...,
))
return r
}
複製代碼
接下來在main.go
中增長健康檢查和服務註冊相關的調用代碼,以便上述修改邏輯生效。
//建立健康檢查的Endpoint,未增長限流
healthEndpoint := MakeHealthCheckEndpoint(svc)
//把算術運算Endpoint和健康檢查Endpoint封裝至ArithmeticEndpoints
endpts := ArithmeticEndpoints{
ArithmeticEndpoint: endpoint,
HealthCheckEndpoint: healthEndpoint,
}
//建立http.Handler
r := MakeHttpHandler(ctx, endpts, logger)
複製代碼
// 定義環境變量
var (
consulHost = flag.String("consul.host", "", "consul ip address")
consulPort = flag.String("consul.port", "", "consul port")
serviceHost = flag.String("service.host", "", "service ip address")
servicePort = flag.String("service.port", "", "service port")
)
// parse
flag.Parse()
// ...
//建立註冊對象
registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)
go func() {
fmt.Println("Http Server start at port:" + *servicePort)
//啓動前執行註冊
registar.Register()
handler := r
errChan <- http.ListenAndServe(":"+*servicePort, handler)
}()
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()
error := <-errChan
//服務退出,取消註冊
registar.Deregister()
fmt.Println(error)
複製代碼
打開終端,切換至項目目錄。執行go build ./register
編譯成功後,輸入如下指令啓動算術服務(註冊服務):
./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000
複製代碼
啓動成功後,再次刷新consul-ui
界面,看到以下界面即說明算術服務成功註冊至consul。
同時也能夠在註冊服務運行的終端看到consul定時調用/health
接口的日誌輸出信息:
discover
服務要完成的工做爲:以REST接口/calculate
對外提供API服務,客戶端使用HTTP POST方法發送json數據執行請求;在endpoint中查詢已經在consul中註冊的服務實例;而後選擇合適的服務實例向其發起請求轉發;完成請求後向原客戶端請求響應。
查閱go-kit源碼可知,kit/sd/Endpointer
提供了一套服務發現機制,其定義和建立接口以下所示:
// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
Endpoints() ([]endpoint.Endpoint, error)
}
// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer
複製代碼
經過代碼註釋咱們能夠知道: Endpointer經過監聽服務發現系統的事件信息,而且經過factory按需建立服務終結點(Endpoint
)。
因此,咱們須要經過Endpointer
來實現服務發現功能。在微服務模式下,同一個服務可能存在多個實例,因此須要經過負載均衡機制完成實例選擇,這裏使用go-kit工具集中的kit/sd/lb
組件(該組件實現RoundRibbon,並具有Retry功能)。
在discover
目錄中建立go文件factory.go
,實現sd.Factory
的邏輯,即把服務實例轉換爲endpoint,在該endpoint中實現對於目標服務的調用過程。這裏直接針對算術運算服務進行封裝,代碼以下所示:
func arithmeticFactory(_ context.Context, method, path string) sd.Factory {
return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
tgt, err := url.Parse(instance)
if err != nil {
return nil, nil, err
}
tgt.Path = path
var (
enc kithttp.EncodeRequestFunc
dec kithttp.DecodeResponseFunc
)
enc, dec = encodeArithmeticRequest, decodeArithmeticReponse
return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
}
}
func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error {
arithReq := request.(ArithmeticRequest)
p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B)
req.URL.Path += p
return nil
}
func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) {
var response ArithmeticResponse
var s map[string]interface{}
if respCode := resp.StatusCode; respCode >= 400 {
if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
return nil, err
}
return nil, errors.New(s["error"].(string) + "\n")
}
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, err
}
return response, nil
}
複製代碼
建立go文件discover/enpoints.go
。根據上述分析,在該endpoint實現對服務發現系統的監聽,實現實例選擇,最終返回可執行的endpoint.Endpoint
。下面根據代碼註釋說明實現過程:
// MakeDiscoverEndpoint 使用consul.Client建立服務發現Endpoint
// 爲了方便這裏默認了一些參數
func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
serviceName := "arithmetic"
tags := []string{"arithmetic", "raysonxin"}
passingOnly := true
duration := 500 * time.Millisecond
//基於consul客戶端、服務名稱、服務標籤等信息,
// 建立consul的鏈接實例,
// 可實時查詢服務實例的狀態信息
instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly)
//針對calculate接口建立sd.Factory
factory := arithmeticFactory(ctx, "POST", "calculate")
//使用consul鏈接實例(發現服務系統)、factory建立sd.Factory
endpointer := sd.NewEndpointer(instancer, factory, logger)
//建立RoundRibbon負載均衡器
balancer := lb.NewRoundRobin(endpointer)
//爲負載均衡器增長重試功能,同時該對象爲endpoint.Endpoint
retry := lb.Retry(1, duration, balancer)
return retry
}
複製代碼
建立go文件discover/transports.go
。經過mux/Router
使用POST方法爲發現服務開放REST接口/calculate
,與算術服務同樣,這裏須要endpoint.Endpoint
、DecodeRequestFunc
、EncodeResponseFunc
。爲了方便,我把算術服務中的請求與響應結構和編解碼方法直接複製過來。代碼以下所示:
func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler {
r := mux.NewRouter()
r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer(
endpoint,
decodeDiscoverRequest,
encodeDiscoverResponse,
))
return r
}
// 省略實體結構和編解碼方法
複製代碼
接下來就是在main方法把以上邏輯串起來,而後啓動發現服務了,這裏監聽端口爲9001。比較簡單,直接貼代碼了:
func main() {
// 建立環境變量
var (
consulHost = flag.String("consul.host", "", "consul server ip address")
consulPort = flag.String("consul.port", "", "consul server port")
)
flag.Parse()
//建立日誌組件
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
//建立consul客戶端對象
var client consul.Client
{
consulConfig := api.DefaultConfig()
consulConfig.Address = "http://" + *consulHost + ":" + *consulPort
consulClient, err := api.NewClient(consulConfig)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
client = consul.NewClient(consulClient)
}
ctx := context.Background()
//建立Endpoint
discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger)
//建立傳輸層
r := MakeHttpHandler(discoverEndpoint)
errc := make(chan error)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()
//開始監聽
go func() {
logger.Log("transport", "HTTP", "addr", "9001")
errc <- http.ListenAndServe(":9001", r)
}()
// 開始運行,等待結束
logger.Log("exit", <-errc)
}
複製代碼
在終端中切換至discover
目錄,執行go build
完成編譯,而後使用如下命令(指定註冊中心服務地址)啓動發現服務:
./discover -consul.host localhost -consul.port 8500
複製代碼
使用postman請求http://localhost:9001/calculate
,在body
中設置請求信息,完成測試。以下圖所示:
本文使用consul做爲註冊中心,經過實例演示了go-kit的服務註冊與發現功能。因爲本人在這個部分了解不夠透徹,在編寫代碼和本文的過程當中,一直在研究go-kit發現組件的設計方式,力求可以經過代碼、文字解釋清楚。本人水平有限,有任何錯誤或不妥之處,請你們批評指正。
本文實例代碼見arithmetic_consul_demo。
本文首發於本人微信公衆號【兮一昂吧】,歡迎掃碼關注!