Dapr是爲雲上環境設計的跨語言, 事件驅動, 能夠便捷的構建微服務的系統. balabala一堆, 有興趣的小夥伴能夠去了解一下.node
Dapr提供有狀態和無狀態的微服務. 大部分人都是作無狀態服務(微服務)的, 只是某些領域無狀態並很差使, 由於開銷實在是太大了; 有狀態服務有固定的場景, 就是要求開銷小, 延遲和吞吐都比較高. 廢話少說, 直接來看Dapr是怎麼實現有狀態服務的.git
先來了解一下有狀態服務:github
1. 穩定的路由redis
發送給A服務器的請求, 不能發給B服務器, 不然就是無狀態的算法
2. 狀態api
狀態保存在本身服務器內部, 而不是遠程存儲, 這一點和無狀態有很明顯的區別, 因此無狀態服務須要用redis這種東西加速, 有狀態不須要服務器
3. 處理是單線程數據結構
狀態通常來說比較複雜, 想要對一個比較複雜的東西進行並行的計算是比較困難的; 固然A和B的邏輯之間沒有關係, 實際上是能夠並行的, 可是A本身自己的邏輯執行須要串行執行.app
對於一個有狀態服務來說(dapr), 實現23其實是很輕鬆的, 甚至有一些是用戶須要實現的東西, 因此1纔是關鍵, 當前這個消息(請求)須要被髮送到哪一個服務器上面處理纔是最關鍵的, 甚至決定了他是什麼系統.分佈式
決定哪一個請求的目標地址, 這個東西在分佈式系統裏面叫Placement, 有時候也叫Naming. TiDB裏面有一個Server叫PlacementDriver, 簡稱PD, 其實就是在幹一樣的事情.
好了, 開始研究Dapr的Placement是怎麼實現的.
有一個Placement的進程, 2333, 目錄cmd/placement, 就看他了
func main() { log.Infof("starting Dapr Placement Service -- version %s -- commit %s", version.Version(), version.Commit()) cfg := newConfig() // Apply options to all loggers. if err := logger.ApplyOptionsToLoggers(&cfg.loggerOptions); err != nil { log.Fatal(err) } log.Infof("log level set to: %s", cfg.loggerOptions.OutputLevel) // Initialize dapr metrics for placement. if err := cfg.metricsExporter.Init(); err != nil { log.Fatal(err) } if err := monitoring.InitMetrics(); err != nil { log.Fatal(err) } // Start Raft cluster. raftServer := raft.New(cfg.raftID, cfg.raftInMemEnabled, cfg.raftBootStrap, cfg.raftPeers) if raftServer == nil { log.Fatal("failed to create raft server.") } if err := raftServer.StartRaft(nil); err != nil { log.Fatalf("failed to start Raft Server: %v", err) } // Start Placement gRPC server. hashing.SetReplicationFactor(cfg.replicationFactor) apiServer := placement.NewPlacementService(raftServer)
能夠看到main函數裏面啓動了一個raft server, 通常這樣的話, 就說明在某些能力方面作到了強一致性.
raft庫用的是consul實現的raft, 而不是etcd, 由於etcd的raft不是庫, 只能是一個服務器(包括etcd embed), 你不能定製裏面的協議, 你只能使用etcd提供給你的client來訪問他. 這一點etcd作的很是不友好.
若是用raft庫來作placement, 那麼協議能夠定製, 能夠找Apply相關的函數, 由於raft狀態機只是負責log的一致性, log即消息, 消息的處理則表現出來狀態, Apply函數就是須要用戶作消息處理的地方. 幸好以前有作過MIT 6.824的lab, 對這個稍微有一點了解.
// Apply log is invoked once a log entry is committed. func (c *FSM) Apply(log *raft.Log) interface{} { buf := log.Data cmdType := CommandType(buf[0]) if log.Index < c.state.Index { logging.Warnf("old: %d, new index: %d. skip apply", c.state.Index, log.Index) return nil } var err error var updated bool switch cmdType { case MemberUpsert: updated, err = c.upsertMember(buf[1:]) case MemberRemove: updated, err = c.removeMember(buf[1:]) default: err = errors.New("unimplemented command") } if err != nil { return err } return updated }
在pkg/placement/raft文件夾下面找到raft相關的代碼, fsm.go裏面有對消息的處理函數.
能夠看到, 消息的處理很是簡單, 裏面只有MemberUpsert, 和MemberRemove兩個消息. FSM狀態機內保存的狀態只有:
// DaprHostMemberState is the state to store Dapr runtime host and // consistent hashing tables. type DaprHostMemberState struct { // Index is the index number of raft log. Index uint64 // Members includes Dapr runtime hosts. Members map[string]*DaprHostMember // TableGeneration is the generation of hashingTableMap. // This is increased whenever hashingTableMap is updated. TableGeneration uint64 // hashingTableMap is the map for storing consistent hashing data // per Actor types. hashingTableMap map[string]*hashing.Consistent }
很明顯, 這裏面只有DaprHostMember這個有用的信息, 而DaprHostMember就是集羣內的節點.
這裏能夠分析出來, Dapr經過Raft協議來維護了一個強一致性的Membership, 除此以外什麼也沒幹....據個人朋友說, 跟Orleans是有一點相似的, 只是Orleans是AP系統.
再經過對一致性Hash的分析, 能夠看到:
func (a *actorsRuntime) lookupActorAddress(actorType, actorID string) (string, string) { if a.placementTables == nil { return "", "" } t := a.placementTables.Entries[actorType] if t == nil { return "", "" } host, err := t.GetHost(actorID) if err != nil || host == nil { return "", "" } return host.Name, host.AppID }
經過 ActorType和ActorID到一致性的Hash表中去找host, 那個GetHost實現就是一致性Hash表實現的.
Actor RPC Call的實現:
func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) { if a.placementBlock { <-a.placementSignal } actor := req.Actor() targetActorAddress, appID := a.lookupActorAddress(actor.GetActorType(), actor.GetActorId()) if targetActorAddress == "" { return nil, errors.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId()) } var resp *invokev1.InvokeMethodResponse var err error if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) { resp, err = a.callLocalActor(ctx, req) } else { resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, targetActorAddress, appID, req) } if err != nil { return nil, err } return resp, nil }
經過剛纔咱們看到loopupActorAddress函數找到的Host, 而後判斷是不是在當前Host宿主內, 不然就發送到遠程, 對當前宿主作了優化, 實際上沒雞兒用, 由於分佈式系統裏面, 通常都會有不少個host, 在當前host內的機率其實是很是低的.
從這邊, 咱們大概就能分析到全貌, 即Dapr實現分佈式有狀態服務的細節:
1. 經過Consul Raft庫維護Membership
2. 集羣和Placement組件通信, 獲取到Membership
3. 尋找Actor的算法實如今Host內, 而不是Placement組件. 經過ActorType找到能夠提供某種服務的Host, 而後組成一個一致性Hash表, 到該表內查找Host, 進而轉發請求
對Host內一致性Hash表的查找引用, 找到了修改內容的地方:
func (a *actorsRuntime) updatePlacements(in *placementv1pb.PlacementTables) { a.placementTableLock.Lock() defer a.placementTableLock.Unlock() if in.Version != a.placementTables.Version { for k, v := range in.Entries { loadMap := map[string]*hashing.Host{} for lk, lv := range v.LoadMap { loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port) } c := hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap) a.placementTables.Entries[k] = c } a.placementTables.Version = in.Version a.drainRebalancedActors() log.Infof("placement tables updated, version: %s", in.GetVersion()) a.evaluateReminders() } }
從這幾行代碼能夠看出, 版本不不同, 就會全更新, 並且還會進行rehash, 就是a.drainRebalanceActors.
若是學過數據結構, 那麼確定學到過一種東西叫HashTable, HashTable在擴容的時候須要rehash, 須要構建一個更大的table, 而後把全部元素從新放進去, 位置會和原先的大不同. 而一致性Hash能夠解決全rehash的狀況, 只讓部份內容rehash, 失效的內容會比較少.
可是, 凡事都有一個可是, 全部的節點都同時rehash還好, 可一個分佈式系統怎麼作到全部node都同時rehash, 很顯然是作不到的, 因此Dapr維護的Actor Address目錄, 是最終一致的, 也就是系統裏面會存在多個ID相同的Actor(短暫的), 仍是會致使不一致.
對dapr/proto/placement/v1/placement.proto查看, 驗證了個人猜測
// Placement service is used to report Dapr runtime host status. service Placement { rpc ReportDaprStatus(stream Host) returns (stream PlacementOrder) {} } message PlacementOrder { PlacementTables tables = 1; string operation = 2; }
Host啓動, 就去placement那邊經過gRPC Stream訂閱了集羣的變更. 懶到極點了, 竟然是把整個membership發送過來, 而不是發送的diff.
總結一下, 從上面的源碼分析咱們能夠知道, Dapr的Membership是CP系統, 可是Actor的Placement不是, 是一個最終一致的AP系統. 而TiDB的PD是一個CP系統, 只不過是經過etcd embed作的. 但願對你們有一點幫助.
對我有幫助的, 可能就是Dapr對於Consul raft的使用.
參考:
1. Dapr
2. Etcd Embed
3. Consul Raft