gRPC 源碼詳解(一)配置化的結構體

grpc 源碼結構詳解

DialOptions

DialOptions 是最重要的一環,負責配置每一次 rpc 請求的時候的一應選擇。 git

結構

先來看看這個的結構
連接github

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
	unaryInt  UnaryClientInterceptor
	streamInt StreamClientInterceptor

	chainUnaryInts  []UnaryClientInterceptor
	chainStreamInts []StreamClientInterceptor

	cp          Compressor
	dc          Decompressor
	bs          backoff.Strategy
	block       bool
	insecure    bool
	timeout     time.Duration
	scChan      <-chan ServiceConfig
	authority   string
	copts       transport.ConnectOptions
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// This is to support grpclb.
	resolverBuilder             resolver.Builder
	channelzParentID            int64
	disableServiceConfig        bool
	disableRetry                bool
	disableHealthCheck          bool
	healthCheckFunc             internal.HealthChecker
	minConnectTimeout           func() time.Duration defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string } 複製代碼

因爲命名很是規範,加上註釋很容易看懂每個 field 配置的哪一條屬性。若是掠過看的 大概有 壓縮解壓器,超時阻塞設置,認證安全轉發,負載均衡,服務持久化的信息存儲 ,配置,心跳檢測等。安全

其一應函數方法都是設置 其中字段的。 閉包

如何設置

這裏是 grpc 設計較好的地方,經過函數設置,同時設有生成函數的函數。什麼意思呢?首先結合圖來理解,這也是整個 grpc 設置的精華部分app

grpc-setOperation.svg

這裏的意思是 , DialOptions 是一個導出接口,實現函數是 apply 同時接受參數 dialOptions 來修改它。
而實際上,是使用 newFuncDialOption 函數包裝一個 修改 dialOptions 的方法給 funcDialOption 結構體,在實際 Dial 調用的時候 是使用閉包 調用 funcDialOption 結構體的 apply 方法。
能夠在這裏看一下 Dial 方法的源碼(Dial 調用的是 DialContext
起做用的就是 opt.apply()

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	···
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
    ···
}
複製代碼

這裏的 options 能夠說是 client 發起 rpc 請求的核心中轉站。
另外一個重要的接口,同時也集中在 dialOptions 結構體中初始化處理的是 
callOptions []CallOption負載均衡

CallOption

CallOption 是一個接口,定義在 rpc_util 包內 ide

結構

// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
	// before is called before the call is sent to any server. If before
	// returns a non-nil error, the RPC fails with that error.
	before(*callInfo) error

	// after is called after the call has completed. after cannot return an
	// error, so any failures should be reported via output parameters.
	after(*callInfo)
}
複製代碼

操做的是 callInfo 結構裏的數據,其被包含在 dialOptions  結構體中,
即每一次 dial 的時候進行調用。svg

callInfo

同時它自身定義頗有意思,操做的是 callInfo  結構體函數

// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
	compressorType        string
	failFast              bool
	stream                ClientStream
	maxReceiveMessageSize *int
	maxSendMessageSize    *int
	creds                 credentials.PerRPCCredentials
	contentSubtype        string
	codec                 baseCodec
	maxRetryRPCBufferSize int
}
複製代碼

能夠看到 callInfo 中字段用來表示 單次調用中獨有的自定義選項如 壓縮,流控,認證,編解碼器等。ui

一個實現

簡單看一個 CallOption 接口的實現

// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
	return HeaderCallOption{HeaderAddr: md}
}

// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type HeaderCallOption struct {
	HeaderAddr *metadata.MD
}

func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
	if c.stream != nil {
		*o.HeaderAddr, _ = c.stream.Header()
	}
}
複製代碼

重點看到,實際操做是在 before 和 after 方法中執行,它們會在 Client 發起請求的時候自動執行,顧名思義,一個在調用前執行,一個在調用後執行。

實現注意

這裏能夠看出,這裏也是經過函數返回一個擁有這兩個方法的結構體,注意這一個設計,能夠做爲你本身的 Option 設計的時候的參考。

兩種方法

有兩種方法讓 Client 接受你的 CallOption 設置

  1. 在 Client 使用方法的時候直接做爲 參數傳遞,將剛纔所說的函數-返回一個實現了 CallOption 接口的結構體。
  2. 在 生成 Client 的時候就傳遞設置。具體以下
  3. 經過 dialOptions.go 中的 函數 grpc.WithDefaultCallOptions()
  4. 這個函數會將 CallOption 設置到 dialOptions 中的字段 []CallOption 中。
// WithDefaultCallOptions returns a DialOption which sets the default
// CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.callOptions = append(o.callOptions, cos...)
	})
}
複製代碼

有沒有感受有點很差理解?給大家一個實例

  1. response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
  2. myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

這裏假設 咱們設置了一個 mycodec 的譯碼器。立刻下面解釋它的設計。

值得注意的是, 我好像只提到了在 Client 調用時設置,callOption  只在客戶端設置的狀況是否是讓你們感到困惑。
實際上 gRPC server 端會自動檢測 callOption 的設置,並檢測本身是否支持此項選擇,若是不支持則會返回失敗。也就是說,在 Server 端註冊的全部 Codec 譯碼器以後,Client 直接使用相應的設置就行了。

Codec

在 gRPC 中 Codec 有兩個接口定義,一個是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另外一個是擁有名字的 Codec 定義在 encoding 包內,這是因爲在註冊 registry 的時候會使用到這個方法。

接口

type Codec interface {
	// Marshal returns the wire format of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the wire format into v.
	Unmarshal(data []byte, v interface{}) error
	// String returns the name of the Codec implementation. This is unused by
	// gRPC.
	String() string
}
複製代碼

就是這個方法

// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
//
// The Codec will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the Codec. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodec will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
	if codec == nil {
		panic("cannot register a nil Codec")
	}
	if codec.Name() == "" {
		panic("cannot register Codec with empty string result for Name()")
	}
	contentSubtype := strings.ToLower(codec.Name())
	registeredCodecs[contentSubtype] = codec
}
複製代碼

Compressor

同時 encoding 包中還定義了 Compressor 接口,參照 Codec 理解便可。

// Compressor is used for compressing and decompressing when sending or
// receiving messages.
type Compressor interface {
	// Compress writes the data written to wc to w after compressing it. If an
	// error occurs while initializing the compressor, that error is returned
	// instead.
	Compress(w io.Writer) (io.WriteCloser, error)
	// Decompress reads data from r, decompresses it, and provides the
	// uncompressed data via the returned io.Reader. If an error occurs while
	// initializing the decompressor, that error is returned instead.
	Decompress(r io.Reader) (io.Reader, error)
	// Name is the name of the compression codec and is used to set the content
	// coding header. The result must be static; the result cannot change
	// between calls.
	Name() string
}

複製代碼

MetaData

這個包對應 context 中的 Value field 也就是 key-value 形式的存儲

在其餘包中簡寫是 MD

結構

type MD map[string][]string
複製代碼

函數

實現了完善的存儲功能,從單一讀寫到批量(採用 pair 模式,...string 做爲參數,len(string)%2==1 時會報錯,因爲會有孤立的沒有配對的元信息。

另外幾個函數是實現了從 context 中的讀取和寫入(這裏的寫入是 使用 context.WithValue 方法,即生成 parent context 的 copy。

注意⚠️

  • 值得注意的是,在 MetaData 結構體中, value 的結構是 []string 。
  • 同時 key 不能夠以 "grpc-" 開頭,這是由於在 grpc 的 internal 包中已經保留了。
  • 更爲重要的是 在 context 中的讀取方式,實際上是 MetaData 結構對應的是 context Value 中的 value 值,而 key 值設爲 一個空結構體同時區分輸入輸入
    • type mdIncomingKey struct{}<br />
    • type mdOutgoingKey struct{}
相關文章
相關標籤/搜索