DialOptions 是最重要的一環,負責配置每一次 rpc 請求的時候的一應選擇。 git
先來看看這個的結構
連接github
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor
cp Compressor
dc Decompressor
bs backoff.Strategy
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
authority string
copts transport.ConnectOptions
callOptions []CallOption
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
minConnectTimeout func() time.Duration defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string } 複製代碼
因爲命名很是規範,加上註釋很容易看懂每個 field 配置的哪一條屬性。若是掠過看的 大概有 壓縮解壓器,超時阻塞設置,認證安全轉發,負載均衡,服務持久化的信息存儲 ,配置,心跳檢測等。安全
這裏是 grpc 設計較好的地方,經過函數設置,同時設有生成函數的函數。什麼意思呢?首先結合圖來理解,這也是整個 grpc 設置的精華部分app
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
···
for _, opt := range opts {
opt.apply(&cc.dopts)
}
···
}
複製代碼
這裏的 options 能夠說是 client 發起 rpc 請求的核心中轉站。
另外一個重要的接口,同時也集中在 dialOptions 結構體中初始化處理的是 callOptions []CallOption
負載均衡
CallOption 是一個接口,定義在 rpc_util 包內 ide
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error
// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}
複製代碼
操做的是 callInfo 結構裏的數據,其被包含在 dialOptions
結構體中,
即每一次 dial 的時候進行調用。svg
同時它自身定義頗有意思,操做的是 callInfo
結構體函數
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
compressorType string
failFast bool
stream ClientStream
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
}
複製代碼
能夠看到 callInfo 中字段用來表示 單次調用中獨有的自定義選項如 壓縮,流控,認證,編解碼器等。ui
簡單看一個 CallOption 接口的實現
// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: md}
}
// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
if c.stream != nil {
*o.HeaderAddr, _ = c.stream.Header()
}
}
複製代碼
重點看到,實際操做是在 before 和 after 方法中執行,它們會在 Client 發起請求的時候自動執行,顧名思義,一個在調用前執行,一個在調用後執行。
這裏能夠看出,這裏也是經過函數返回一個擁有這兩個方法的結構體,注意這一個設計,能夠做爲你本身的 Option 設計的時候的參考。
有兩種方法讓 Client 接受你的 CallOption 設置
// WithDefaultCallOptions returns a DialOption which sets the default
// CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.callOptions = append(o.callOptions, cos...)
})
}
複製代碼
有沒有感受有點很差理解?給大家一個實例
這裏假設 咱們設置了一個 mycodec 的譯碼器。立刻下面解釋它的設計。
值得注意的是, 我好像只提到了在 Client 調用時設置,callOption 只在客戶端設置的狀況是否是讓你們感到困惑。
實際上 gRPC server 端會自動檢測 callOption 的設置,並檢測本身是否支持此項選擇,若是不支持則會返回失敗。也就是說,在 Server 端註冊的全部 Codec 譯碼器以後,Client 直接使用相應的設置就行了。
在 gRPC 中 Codec 有兩個接口定義,一個是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另外一個是擁有名字的 Codec 定義在 encoding 包內,這是因爲在註冊 registry 的時候會使用到這個方法。
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error
// String returns the name of the Codec implementation. This is unused by
// gRPC.
String() string
}
複製代碼
就是這個方法
// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
//
// The Codec will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the Codec. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodec will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
if codec == nil {
panic("cannot register a nil Codec")
}
if codec.Name() == "" {
panic("cannot register Codec with empty string result for Name()")
}
contentSubtype := strings.ToLower(codec.Name())
registeredCodecs[contentSubtype] = codec
}
複製代碼
同時 encoding 包中還定義了 Compressor 接口,參照 Codec 理解便可。
// Compressor is used for compressing and decompressing when sending or
// receiving messages.
type Compressor interface {
// Compress writes the data written to wc to w after compressing it. If an
// error occurs while initializing the compressor, that error is returned
// instead.
Compress(w io.Writer) (io.WriteCloser, error)
// Decompress reads data from r, decompresses it, and provides the
// uncompressed data via the returned io.Reader. If an error occurs while
// initializing the decompressor, that error is returned instead.
Decompress(r io.Reader) (io.Reader, error)
// Name is the name of the compression codec and is used to set the content
// coding header. The result must be static; the result cannot change
// between calls.
Name() string
}
複製代碼
這個包對應 context 中的 Value field 也就是 key-value 形式的存儲
type MD map[string][]string
複製代碼
實現了完善的存儲功能,從單一讀寫到批量(採用 pair 模式,...string 做爲參數,len(string)%2==1 時會報錯,因爲會有孤立的沒有配對的元信息。
另外幾個函數是實現了從 context 中的讀取和寫入(這裏的寫入是 使用 context.WithValue 方法,即生成 parent context 的 copy。
type mdIncomingKey struct{}<br />
type mdOutgoingKey struct{}