GRPC源碼分析 1.代碼結構以及通用組件解析

base in https://github.com/grpc/grpc-...

  • 目錄概覽
  • 可選項(Opts
  • 包裝(Wrapper
  • 組件式編程(Builder & Regist
  • 一次性事件(grpcsync.Event
  • 無界channel(UnboundedBuffer

目錄概覽

摘要了一部分文件目錄,用來描述在grpc中不一樣目錄層級的主要做用。
grpc
├── 頂層目錄(package grpc, 主要包含一些grpc提供的接口文件和涉及到具體實現的一些包裝器文件
├── clientconn.go // grpc接口文件,主要提供 Dial 接口。
├── balancer_conn_wrappers.go // 各類包裝器 *_wrappers
├── resolver_conn_wrapper.go
├── balancer
│   ├── balancer.go
├── resolver    // 次級目錄(主要用於描述接口
│   └── resolver.go     //resolver的接口文件
├── internal    // 內部目錄(主要提供各類具體實現
│   ├── backoff
│   │   └── backoff.go //退避策略的具體實現
│   ├── buffer
│   │   ├── unbounded.go    //內部提供的一些組件
│   ├── resolver
│   │   ├── dns
│   │   │   ├── dns_resolver.go //dns_resolver的實現 *_resolver.go
文件層級
頂層目錄 主要提供grpc接口以及各類包裝器文件 grpc.Dial() *_wrapper.go
次級目錄 這裏主要是提供grpc的一些功能組件定義,一般是接口文件 type Resolver interface {}
內部目錄 這裏主要提供功能組件的具體實現 dns_resolver.go

可選項(Opts

在grpc中咱們會看到不少相似以下這種代碼, 通常後面會須要接收參數 opts ...Option, 這種接口方式被稱爲選項模式(options-pattern ,主要是爲了構建接口提供靈活的可選項

下面咱們用本身的僞代碼模擬一次這種邏輯(摘自 https://github.com/pojol/brai...git

// 配置項
type config struct {
    Tracing       bool
}

// 配置Option的包裝函數
type Option func(*Server)

// 添加開啓tracing的可選項
func WithTracing() Option {
    return func(r *Server) {
        r.cfg.Tracing = true
    }
}

// 使用可選項進行構建
func New(name string, opts ...Option) IServer {
    const (
        defaultTracing       = false
    )

    server = &Server{
        cfg: config{
            Tracing:       defaultTracing,  // 進行默認的初始化賦值
        },
    }

    // 查看是否有可選項,若是有則使用可選項將默認值覆蓋。
    for _, opt := range opts {
        opt(server)
    }
}
總結 經過這種options模式,能夠沒必要每次定義全部的選項,只需選擇本身想要的改動便可。

包裝(Wrapper

grpc中使用Wrapper把接口的實現和其依賴的對象聚合到一塊兒,經過水平組合的方式完成一些接口的實現。
type ccResolverWrapper struct {
    cc         *ClientConn      // 包含了 ClientConn
    resolverMu sync.Mutex
    resolver   resolver.Resolver    // 包含了 Resolver interface 
    done       *grpcsync.Event  // 完成事件(這個下面有詳細解釋
    curState   resolver.State   // 狀態

    pollingMu sync.Mutex       // 輪詢鎖
    polling   chan struct{}    // 一個channel主要用於判斷是否處於輪詢中
}
上面是一個Wrapper的結構,它主要包含了ClientConn的指針,以及Resolver接口,另外還包含了一些自身邏輯須要的狀態和鎖

它主要實現了resolver.ClientConn interface, 使用這個包裝器主要是爲了聚合前面的那些組件,完成一些須要相互依賴調度的邏輯。不過這未必是值得借鑑的,這裏先簡單路過一下。github

插件式編程模式

build.png

如上圖所示,咱們使用了Resolver來展現grpc是如何使用插件式編程方式組織代碼的。編程

  • 接口定義文件 resolver.go
// Resolver 構建器的定義
type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string
}

// 名字解析 Resolver 提供的接口定義
type Resolver interface {}

// 註冊不一樣的resolver實現
func Register(b Builder) {}

// 經過scheme獲取相關的resolver實現
func Get(scheme string) Builder {}
  • 實現文件 internal/resolver/dns/dns_resolver.go
// 經過init函數,將實現註冊到resolver
func init() { resolver.Register(NewBuilder()) }

// 實現resolver.Builder接口的 Build 函數(在這裏進行真正的構建操做
func Build() {}
// 返回當前resolver解決的解析樣式
func Scheme() string { return "dns" }
  • 應用 resolver clientconn.go
// 經過解析用戶傳入的target 得到scheme
cc.parsedTarget = grpcutil.ParseTarget(cc.target)

// 經過target的scheme獲取對應的resolver.Builder
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
    for _, rb := range cc.dopts.resolvers {
        if scheme == rb.Scheme() {
            return rb
        }
    }
    return resolver.Get(scheme)
}
總結 經過以上的關鍵代碼,咱們知道了組件是如何完成 接口定義以及 實現使用
在grpc中有很多的代碼是使用這種插件式的方式進行編程,這種編碼方式能夠方便的 隔離實現,使用戶專一在本身的實現上。另外也支持用戶 編寫本身的實現註冊到grpc中。

能夠閱讀 策略模式 & 開閉原則 加深對這種編碼形式的理解。安全

一次性事件(grpcsync.Event

主要用於在異步邏輯中判斷一次性事件(開關)線程安全,在grpc中不少模塊的退出邏輯都依賴於這個Event

實現來自 /internal/grpcsync/event.go架構

type Event struct {
    fired int32         // 用於標記是否被觸發
    c     chan struct{} // 用於發送觸發信號
    o     sync.Once     // 保證只被執行一次
}

func (e *Event) Fire() bool {} // 觸發事件
func (e *Event) Done() <-chan struct{} {}   // 被觸發信號
func (e *Event) HasFired() bool {} // 是否被觸發
// 構建Event
func NewEvent() *Event {
    return &Event{c: make(chan struct{})}
}

// 模擬使用,建立一個服務,而後這個服務會開啓一個goroutine從管道中接收消息來處理業務
// 以下的話能夠是一些新節點信息,而後經過done來處理退出的邏輯,當外部關閉這個balancer,會當即通知到這個goroutine而後退出。
func newBalancer() {
    b := Balancer{
        done : NewEvent(),  // 構建
    }

    // watcher
    go func() {
        for {
            select {
                case <- otherCh:
                    //todo
                case <- b.done.Done(): // 監聽到終止信號,退出goroutine。
                    return
            }
        }
    }()
}

func (b *Balancer)close() {
    b.done.Fire() // 觸發信號
}

無界channel(UnboundedBuffer

前面有說到grpcsync.Event是用來控制退出邏輯,這裏的unbounded則用於多個goroutine之間的消息傳遞。
這是一個很是不錯的channel實踐,它不用考慮channel的各類阻塞狀況(這裏主要是channel溢出的狀況。方便了channel的應用。

實現來自
/internal/buffer/unbounded.go Unbounded
/internal/transport/transport.go recvBuffer
這二者的實現邏輯是同樣的,只是Unbounded包裝的interface{} ,而recvBuffer會被高頻調用因此使用了具體的類型recvMsgapp

type Unbounded struct {
    c       chan interface{}
    backlog []interface{}
    sync.Mutex
}

func NewUnbounded() *Unbounded {
    return &Unbounded{c: make(chan interface{}, 1)}
}

// 往管道中寫入消息(生產端
func (b *Unbounded) Put(t interface{}) {
    b.Lock()
    // 判斷是否有積壓消息,若是沒有則直接寫入管道後退出
    // 若是有,則寫入到積壓隊列中(先進先出隊列
    if len(b.backlog) == 0 {    
        select {
        case b.c <- t:
            b.Unlock()
            return
        default:
        }
    }
    b.backlog = append(b.backlog, t)
    b.Unlock()
}

func (b *Unbounded) Load() {
    b.Lock()
    // 這裏主要是判斷積壓隊列是否有消息,若是有則左移一位
    // 並將移出的消息,寫入channel中。
    if len(b.backlog) > 0 { 
        select {
        case b.c <- b.backlog[0]:
            b.backlog[0] = nil
            b.backlog = b.backlog[1:]
        default:
        }
    }
    b.Unlock()
}

// 管道的讀信號(消費端
func (b *Unbounded) Get() <-chan interface{} {
    return b.c
}
最後宣傳一下個人開源框架 https://github.com/pojol/braid 一個輕量的微服務框架

目標是幫助用戶能夠更容易的使用和理解微服務架構。框架

相關文章
相關標籤/搜索