Golang微服務開發實踐

github: github.com/yun-mu/Micr…html

微服務概念學習:可參考 Nginx 的微服務文章前端

微服務最佳實踐:可參考 微服務最佳實踐node

demo 簡介

服務:nginx

  • consignment-service(貨運服務)
  • user-service(用戶服務)
  • log-service (日誌服務)
  • vessel-service(貨船服務)
  • api-service (API 服務)

用到的技術棧以下:git

framework: go-micro, gin
Transport: tcp
Server: rpc
Client: rpc
RegisterTTL: 30s
RegisterInterval: 20s
Registry: consul, 服務發現和註冊
Broker: kafka, 消息隊列
Selector: cache, 負載均衡
Codec: protobuf, 編碼
Tracing: jaeger, 鏈路追蹤
Metrics: jaeger
breaker: hystrix, 熔斷
ratelimit: uber/ratelimit, 限流
複製代碼

服務關係圖

project

實體關係圖

image-20180512010554833

服務流程示例

image-20180522174448548

認證

採用 JWTgithub

image-20180528105549866

發佈訂閱模式

未命名文件

demo 運行

前提工具:go, dep, docker, docker-compose, mongogolang

首先初始化:make initweb

Makefile 部分代碼以下:算法

init:
    cd ..
	mv MicroServicePractice ${GOPATH}/src/Ethan/
	./pull.sh # 安裝 go 依賴
	cd plugins
	docker-compose -f docker-compose.yml up -d # 安裝插件,如:kafka, consul, zookeeper, jaeger
複製代碼

以後就能夠運行代碼了:docker

注:建議本身開多個終端 go run ,這樣能夠看日誌

make run # 容許 服務 server
複製代碼

測試:

注:注意順序,剛開始啥數據都沒有的

go run user-cli/cli.go
export Token=$Token # 注意換成前面生成的Token
go run vessel-cli/cli.go
go run consignment-cli/cli.go
複製代碼

peek

consul

jaeger

mongo2

mongo1

開發詳解

proto 代碼生成

安裝工具:

protoc 安裝:google.github.io/proto-lens/…

protoc-gen-goprotoc-gen-micro

go get -u -v google.golang.org/grpc				
go get -u -v github.com/golang/protobuf/protoc-gen-go
go get -u -v github.com/micro/protoc-gen-micro
複製代碼

生成的腳本我已經寫好 Makefile, 進入 interface-center 目錄,執行make build 便可

內部示例以下:

protoc --proto_path=proto:. --go_out=plugins=micro:out/ proto/vessel/vessel.proto
複製代碼

這裏使用 micro 插件,若想和不使用插件對比,可以使用以下命令:

protoc --proto_path=proto:. --go_out=out/ --micro_out=out/ proto/vessel/vessel.proto
複製代碼

這樣會生成兩個文件,一個爲 .micro.go 一個爲 .pb.go

這裏順便看一下 生成的 pb 文件裏是如何進行 rpc 調用的,咱們隨便看一個 方法,如:vesselFindAvailable

func (c *vesselServiceClient) FindAvailable(ctx context.Context, in *Specification, opts ...client.CallOption) (*Response, error) {
	req := c.c.NewRequest(c.serviceName, "VesselService.FindAvailable", in)
	out := new(Response)
	err := c.c.Call(ctx, req, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
複製代碼
func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
    var opts RequestOptions

    for _, o := range reqOpts {
        o(&opts)
    }

    // set the content-type specified
    if len(opts.ContentType) > 0 {
        contentType = opts.ContentType
    }

    return &rpcRequest{
        service:     service,
        method:      endpoint,
        endpoint:    endpoint,
        body:        request,
        contentType: contentType,
        opts:        opts,
    }
}
複製代碼

微服務開發流程

若是使用 grpc 做爲 serverclient,開發流程以下:

注:serverclient 必須相同,如:個人代碼中 serverclient 使用的都是 rpc, transporttcp

image-20180512044329199

目錄簡介

  • api:對外暴露的HTTP web 接口,能夠理解爲 網關
  • common:全部服務都能調用的東西,如 GetMicroClient, GetMicroServer
  • config:配置中心,其餘服務的啓動都依賴的配置
  • consignment
  • consignment-cli:cli 測試
  • interface-center:proto 文件中心,同時生成的 .go 文件也在這裏
  • shippy-ui:前端測試 ui 代碼,對接API,API還沒寫完
  • user
  • user-cli
  • vessel
  • vessel-cli

初始化

示例代碼:consignment/main.go, common/service.go

// 直接調用本身寫的公有的庫獲取 server,保持配置同步 
// common.AuthWrapper 爲前置認證,採用JWT
srv := common.GetMicroServer(service, micro.WrapHandler(common.AuthWrapper))
複製代碼

common.GetMicroServer

func GetMicroServer(service string, exOpts ...micro.Option) micro.Service {
	opts := getOpts(service)
	if defaultServer != nil {
		opts = append(opts, defaultServer)
	}
	// ...
    
    // 注意順序,一樣的配置後面的會將前面的覆蓋
	opts = append(opts, exOpts...)

	srv := micro.NewService(opts...)
	// 初始化,解析命令行參數
	srv.Init()
	return srv
}
複製代碼

注:調用者的 client, transport 應當和 server 的 client, transport 配置相同,因此開發 micro web的時候要注意!micro web 全是 HTTP 或者 ws,須要本身使用和後面服務相同的 client 來完成轉發。

img

服務註冊

這裏個人demo中採用了 consul,consul 自帶了 UI和健康檢查,consul UI 端口爲:8500

// 註冊延遲,30s 內沒有註冊則失效,consul 會自動刪除服務
        micro.RegisterTTL(time.Second * 30),
// 註冊間隔,每隔 20s 註冊一次
		micro.RegisterInterval(time.Second * 20)
// ...
// opts 中添加以下配置便可
micro.Registry(consul.NewRegistry(func(op *registry.Options) {
			op.Addrs = config.GetRegistryAddrs(service)
		}))
複製代碼

docker-compose.yml 中已經定義,這裏測試用,所以只採用單節點(server)的形式,consul 採用 Raft 算法,爲了保證選主無誤,節點(server)數必須是奇數,bootstrap-expect 表示節點數量

 consul:
 image: consul:1.5
 container_name: consul-node1
 command: agent -server -bootstrap-expect=1 -node=node1 -bind=0.0.0.0 -client=0.0.0.0 -datacenter=dc1
 volumes:
 - ./consul/node1:/consul/data

 consul-client:
 image: consul:1.5
 container_name: consul-client1
 command: agent -retry-join=consul -node=client1 -bind=0.0.0.0 -client=0.0.0.0 -datacenter=dc1 -ui
 ports:
 - "8500:8500"
 - "8600:8600"
 - "8300:8300"
 depends_on:
 - consul
 volumes:
 - ./consul/client1:/consul/data
複製代碼

watch

// 這裏的 handler 應當實現 pb 中定義的調用
    h := handler.GetHandler(session, vClient, uClient, bk)	
    // 將 server 做爲微服務的服務端
	pb.RegisterShippingServiceHandler(srv.Server(), h)

	if err := srv.Run(); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
複製代碼

pb

// 貨輪微服務
service ShippingService {
    // 託運一批貨物
    rpc CreateConsignment (Consignment) returns (Response) { } // 查看託運貨物的信息 rpc GetConsignments (GetRequest) returns (Response) { } } 複製代碼

handler

func (h *Handler) CreateConsignment(ctx context.Context, req *pb.Consignment, resp *pb.Response) error {
}
func (h *Handler) GetConsignments(ctx context.Context, req *pb.GetRequest, resp *pb.Response) error {
}
複製代碼

上下文 context

在咱們的 AuthWrapper 中,ctx 做爲上下文信息傳遞的方式,可在 ctx 中添加信息

//
// AuthWrapper 是一個高階函數,入參是 "下一步" 函數,出參是認證函數
// 在返回的函數內部處理完認證邏輯後,再手動調用 fn() 進行下一步處理
// token 是從 上下文中取出的,再調用 user-service 將其作驗證
// 認證經過則 fn() 繼續執行,不然報錯
//
func AuthWrapper(fn server.HandlerFunc) server.HandlerFunc {
	log.Println("AuthWrapper")
	return func(ctx context.Context, req server.Request, resp interface{}) error {
		// consignment-service 獨立測試時不進行認證
		if os.Getenv("DISABLE_AUTH") == "true" {
			return fn(ctx, req, resp)
		}
		meta, ok := metadata.FromContext(ctx)
		if !ok {
			return errors.New("no auth meta-data found in request")
		}

		token := meta["token"]

		// Auth here
		authResp, err := GetUserClient().ValidateToken(context.Background(), &userPb.Token{
			Token: token,
		})

		log.Println("Auth Resp:", authResp)
		if err != nil {
			return err
		}
        // 這裏將 JWT 解析出來的 user_id 傳遞下去
		ctx = context.WithValue(ctx, "user_id", authResp.UserId)
		err = fn(ctx, req, resp)
		return err
	}
}

複製代碼

handler

在全部前置操做執行完畢以後,開始執行 handler 真正的 Call,handler 的函數定義必須和 pb 中如出一轍。

處理完以後直接編輯 resp 便可,以後返回 nil,resp 是一個指針,直接傳遞了返回信息。

func (h *Handler) CreateConsignment(ctx context.Context, req *pb.Consignment, resp *pb.Response) error {
    // ... 處理
	resp.Created = true
	resp.Consignment = req

    // 後置操做
	go func() {
		// ...
		h.pubLog(userID, "CreateConsignment", msg)
	}()
	return nil
}
複製代碼

過濾器

這裏以版本過濾器爲例:

// Filter will filter the version of the service
func Filter(v string) client.CallOption {
	if v == "" {
		v = "latest"
	}
	filter := func(services []*registry.Service) []*registry.Service {
		var filtered []*registry.Service

		for _, service := range services {
			if service.Version == v {
				filtered = append(filtered, service)
			}
		}

		return filtered
	}

	return client.WithSelectOption(selector.WithFilter(filter))
}

複製代碼

以後在進行 client.Call 的時候可使用

vResp, err := h.vesselClient.FindAvailable(ctx, vReq, common.Filter(version))
複製代碼

db 交互

通常而言不要把 pb 結構體直接插入 數據庫中,最好有一個 中間轉換層。示例以下:

func (repo *ConsignmentRepository) Create(con *pb.Consignment) error {
    // 這裏將PB轉換爲想要的結構體,以後再插入
	data := PBConsignment2Consignment(con)
    // dao 層直接對接DB操做
	return dao.Insert(repo.collection(), &data)
}

// 在外面記得把 Session Close
複製代碼

broker

消息隊列

// common/service.go/GetMicroServer()
// 註冊
    brokerKafka := kafka.NewBroker(func(options *broker.Options) {
        // eg: []{"127.0.0.1:9092"}
		options.Addrs = config.GetBrokerAddrs(service)
	})
	if err := brokerKafka.Connect(); err != nil {
		log.Fatalf("Broker Connect error: %v", err)
	}
// ... micro.Broker(brokerKafka)
複製代碼

註冊完以後就開始定義 接口了

// 這裏我將 kafka broker 傳入 handler 中
    bk := srv.Server().Options().Broker
	h := handler.GetHandler(session, vClient, uClient, bk)
複製代碼

發佈消息:

// 發送log
   // ...
   data := &broker.Message{
      Header: map[string]string{
         "user_id": userID,
      },
      Body: body,
   }

   if err := h.Broker.Publish(topic, data); err != nil {
      log.Printf("[pub] failed: %v\n", err)
   }
複製代碼

訂閱消息:

bk := srv.Server().Options().Broker
    // 這裏訂閱了 一個 topic, 並提供接口處理
	_, err := bk.Subscribe(topic, subLog)
複製代碼
func subLog(pub broker.Publication) error {
	var logPB *pb.Log
    // 自行解析 body 便可
	if err := json.Unmarshal(pub.Message().Body, &logPB); err != nil {
		return err
	}
	log.Printf("[Log]: user_id: %s, Msg: %v\n", pub.Message().Header["user_id"], logPB)
	return nil
}

複製代碼

熔斷

Micro提供了兩種實現,gobreaker和hystrix,熔斷是在客戶端實現。

來看看 hystrix:

var (
    // DefaultTimeout is how long to wait for command to complete, in milliseconds
    DefaultTimeout = 1000
    // DefaultMaxConcurrent is how many commands of the same type can run at the same time
    DefaultMaxConcurrent = 10
    // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
    DefaultVolumeThreshold = 20
    // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
    DefaultSleepWindow = 5000
    // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
    DefaultErrorPercentThreshold = 50
    // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
    DefaultLogger = NoopLogger{}
)

type Settings struct {
    Timeout                time.Duration
    MaxConcurrentRequests  int
    RequestVolumeThreshold uint64
    SleepWindow            time.Duration
    ErrorPercentThreshold  int
}
複製代碼

若想修改參數,hystrix 沒有提供全局的接口修改,這裏我直接修改默認參數

// "github.com/afex/hystrix-go/hystrix" 
    hystrix.DefaultMaxConcurrent = 100
	hystrix.DefaultVolumeThreshold = 50
複製代碼

註冊:

// "github.com/micro/go-plugins/wrapper/breaker/hystrix"
// 添加以下配置便可
micro.WrapClient(hystrix.NewClientWrapper()
複製代碼

限流

ratelimit 能夠在客戶端作,也能夠在服務端作;micro提供了兩種方案:juju/ratelimituber/ratelimit

咱們看看 uber 的:

// "github.com/micro/go-plugins/wrapper/ratelimiter/uber"
// 添加以下配置便可
// ratelimit 的配置可自行查看 API 修改
micro.WrapClient(ratelimit.NewHandlerWrapper(1024))
複製代碼

鏈路追蹤

這裏使用 jaeger , jaeger 提供了UI界面,端口爲16686

docker-compose.yml 中已經定義

 jaeger:
 image: jaegertracing/all-in-one:1.12
 container_name: tracing
 environment:
 COLLECTOR_ZIPKIN_HTTP_PORT: 9411
 ports:
 - "5775:5775/udp"
 - "6831:6831/udp"
 - "6832:6832/udp"
 - "5778:5778"
 - "16686:16686"
 - "14268:14268"
 - "9411:9411"
複製代碼
func NewJaegerTracer(serviceName, addr string) (opentracing.Tracer, io.Closer, error) {
	// Sample configuration for testing. Use constant sampling to sample every trace
	// and enable LogSpan to log every span via configured Logger.
	cfg := jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  jaeger.SamplerTypeConst,
			Param: 1,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans: true,
			BufferFlushInterval: 1 * time.Second,
		},
	}

	cfg.ServiceName = serviceName

	// Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log
	// and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics
	// frameworks.
	jLogger := &jaegerLogger{}
	jMetricsFactory := metrics.NullFactory

	metricsFactory := metrics.NullFactory
	metrics := jaeger.NewMetrics(metricsFactory, nil)

	sender, err := jaeger.NewUDPTransport(addr, 0)
	if err != nil {
		log.Logf("could not initialize jaeger sender: %s", err.Error())
		return nil, nil, err
	}

	repoter := jaeger.NewRemoteReporter(sender, jaeger.ReporterOptions.Metrics(metrics))

	return cfg.NewTracer(
		jaegercfg.Logger(jLogger),
		jaegercfg.Metrics(jMetricsFactory),
		jaegercfg.Reporter(repoter),
	)

}

type jaegerLogger struct{}

func (l *jaegerLogger) Error(msg string) {
	log.Logf("ERROR: %s", msg)
}

// Infof logs a message at info priority
func (l *jaegerLogger) Infof(msg string, args ...interface{}) {
	log.Logf(msg, args...)
}
複製代碼
// "github.com/micro/go-plugins/wrapper/trace/opentracing"
opentracing.NewClientWrapper(t)
複製代碼

API

這裏結合大名鼎鼎的 HTTP Restful 框架 gin 使用

主要代碼以下:

// web 的初始化不太同樣 micro -> web
    srv := common.GetMicroWeb(service)
    // ...
    router := gin.Default()
    // ... 正常 gin router 綁定操做
    // 最後直接將 服務 / 綁定到 gin 的router上,交給 gin 處理
	srv.Handle("/", router)
複製代碼

go-micro 詳解

micro 文檔:micro.mu/docs/index.…

參見另外一篇 go-micro詳解

micro

參見另外一篇 micro 工具箱

相關文章
相關標籤/搜索