Dapr實現分佈式有狀態服務的細節

Dapr是爲雲上環境設計的跨語言, 事件驅動, 能夠便捷的構建微服務的系統. balabala一堆, 有興趣的小夥伴能夠去了解一下.node

Dapr提供有狀態和無狀態的微服務. 大部分人都是作無狀態服務(微服務)的, 只是某些領域無狀態並很差使, 由於開銷實在是太大了; 有狀態服務有固定的場景, 就是要求開銷小, 延遲和吞吐都比較高. 廢話少說, 直接來看Dapr是怎麼實現有狀態服務的.git

 

先來了解一下有狀態服務:github

1. 穩定的路由redis

   發送給A服務器的請求, 不能發給B服務器, 不然就是無狀態的算法

2. 狀態api

   狀態保存在本身服務器內部, 而不是遠程存儲, 這一點和無狀態有很明顯的區別, 因此無狀態服務須要用redis這種東西加速, 有狀態不須要服務器

3. 處理是單線程數據結構

   狀態通常來說比較複雜, 想要對一個比較複雜的東西進行並行的計算是比較困難的; 固然A和B的邏輯之間沒有關係, 實際上是能夠並行的, 可是A本身自己的邏輯執行須要串行執行.app

 

對於一個有狀態服務來說(dapr), 實現23其實是很輕鬆的, 甚至有一些是用戶須要實現的東西, 因此1纔是關鍵, 當前這個消息(請求)須要被髮送到哪一個服務器上面處理纔是最關鍵的, 甚至決定了他是什麼系統.分佈式

決定哪一個請求的目標地址, 這個東西在分佈式系統裏面叫Placement, 有時候也叫Naming. TiDB裏面有一個Server叫PlacementDriver, 簡稱PD, 其實就是在幹一樣的事情.

好了, 開始研究Dapr的Placement是怎麼實現的.

 

有一個Placement的進程, 2333, 目錄cmd/placement, 就看他了

func main() {
	log.Infof("starting Dapr Placement Service -- version %s -- commit %s", version.Version(), version.Commit())

	cfg := newConfig()

	// Apply options to all loggers.
	if err := logger.ApplyOptionsToLoggers(&cfg.loggerOptions); err != nil {
		log.Fatal(err)
	}
	log.Infof("log level set to: %s", cfg.loggerOptions.OutputLevel)

	// Initialize dapr metrics for placement.
	if err := cfg.metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

	// Start Raft cluster.
	raftServer := raft.New(cfg.raftID, cfg.raftInMemEnabled, cfg.raftBootStrap, cfg.raftPeers)
	if raftServer == nil {
		log.Fatal("failed to create raft server.")
	}

	if err := raftServer.StartRaft(nil); err != nil {
		log.Fatalf("failed to start Raft Server: %v", err)
	}

	// Start Placement gRPC server.
	hashing.SetReplicationFactor(cfg.replicationFactor)
	apiServer := placement.NewPlacementService(raftServer)

能夠看到main函數裏面啓動了一個raft server, 通常這樣的話, 就說明在某些能力方面作到了強一致性.

raft庫用的是consul實現的raft, 而不是etcd, 由於etcd的raft不是庫, 只能是一個服務器(包括etcd embed), 你不能定製裏面的協議, 你只能使用etcd提供給你的client來訪問他. 這一點etcd作的很是不友好.

 

若是用raft庫來作placement, 那麼協議能夠定製, 能夠找Apply相關的函數, 由於raft狀態機只是負責log的一致性, log即消息, 消息的處理則表現出來狀態, Apply函數就是須要用戶作消息處理的地方. 幸好以前有作過MIT 6.824的lab, 對這個稍微有一點了解.

// Apply log is invoked once a log entry is committed.
func (c *FSM) Apply(log *raft.Log) interface{} {
	buf := log.Data
	cmdType := CommandType(buf[0])

	if log.Index < c.state.Index {
		logging.Warnf("old: %d, new index: %d. skip apply", c.state.Index, log.Index)
		return nil
	}

	var err error
	var updated bool
	switch cmdType {
	case MemberUpsert:
		updated, err = c.upsertMember(buf[1:])
	case MemberRemove:
		updated, err = c.removeMember(buf[1:])
	default:
		err = errors.New("unimplemented command")
	}

	if err != nil {
		return err
	}

	return updated
}

在pkg/placement/raft文件夾下面找到raft相關的代碼, fsm.go裏面有對消息的處理函數.

能夠看到, 消息的處理很是簡單, 裏面只有MemberUpsert, 和MemberRemove兩個消息.  FSM狀態機內保存的狀態只有:

// DaprHostMemberState is the state to store Dapr runtime host and
// consistent hashing tables.
type DaprHostMemberState struct {
	// Index is the index number of raft log.
	Index uint64
	// Members includes Dapr runtime hosts.
	Members map[string]*DaprHostMember

	// TableGeneration is the generation of hashingTableMap.
	// This is increased whenever hashingTableMap is updated.
	TableGeneration uint64

	// hashingTableMap is the map for storing consistent hashing data
	// per Actor types.
	hashingTableMap map[string]*hashing.Consistent
}

很明顯, 這裏面只有DaprHostMember這個有用的信息, 而DaprHostMember就是集羣內的節點.

 

這裏能夠分析出來, Dapr經過Raft協議來維護了一個強一致性的Membership, 除此以外什麼也沒幹....據個人朋友說, 跟Orleans是有一點相似的, 只是Orleans是AP系統.

 

再經過對一致性Hash的分析, 能夠看到:

func (a *actorsRuntime) lookupActorAddress(actorType, actorID string) (string, string) {
	if a.placementTables == nil {
		return "", ""
	}

	t := a.placementTables.Entries[actorType]
	if t == nil {
		return "", ""
	}
	host, err := t.GetHost(actorID)
	if err != nil || host == nil {
		return "", ""
	}
	return host.Name, host.AppID
}

經過 ActorType和ActorID到一致性的Hash表中去找host, 那個GetHost實現就是一致性Hash表實現的.

Actor RPC Call的實現:

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	if a.placementBlock {
		<-a.placementSignal
	}

	actor := req.Actor()
	targetActorAddress, appID := a.lookupActorAddress(actor.GetActorType(), actor.GetActorId())
	if targetActorAddress == "" {
		return nil, errors.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
	}

	var resp *invokev1.InvokeMethodResponse
	var err error

	if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
		resp, err = a.callLocalActor(ctx, req)
	} else {
		resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, targetActorAddress, appID, req)
	}

	if err != nil {
		return nil, err
	}
	return resp, nil
}

經過剛纔咱們看到loopupActorAddress函數找到的Host, 而後判斷是不是在當前Host宿主內, 不然就發送到遠程, 對當前宿主作了優化, 實際上沒雞兒用, 由於分佈式系統裏面, 通常都會有不少個host, 在當前host內的機率其實是很是低的.

 

從這邊, 咱們大概就能分析到全貌, 即Dapr實現分佈式有狀態服務的細節:

1. 經過Consul Raft庫維護Membership

2. 集羣和Placement組件通信, 獲取到Membership

3. 尋找Actor的算法實如今Host內, 而不是Placement組件. 經過ActorType找到能夠提供某種服務的Host, 而後組成一個一致性Hash表, 到該表內查找Host, 進而轉發請求

 

對Host內一致性Hash表的查找引用, 找到了修改內容的地方:

func (a *actorsRuntime) updatePlacements(in *placementv1pb.PlacementTables) {
	a.placementTableLock.Lock()
	defer a.placementTableLock.Unlock()

	if in.Version != a.placementTables.Version {
		for k, v := range in.Entries {
			loadMap := map[string]*hashing.Host{}
			for lk, lv := range v.LoadMap {
				loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port)
			}
			c := hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap)
			a.placementTables.Entries[k] = c
		}

		a.placementTables.Version = in.Version
		a.drainRebalancedActors()

		log.Infof("placement tables updated, version: %s", in.GetVersion())

		a.evaluateReminders()
	}
}

從這幾行代碼能夠看出, 版本不不同, 就會全更新, 並且還會進行rehash, 就是a.drainRebalanceActors. 

若是學過數據結構, 那麼確定學到過一種東西叫HashTable, HashTable在擴容的時候須要rehash, 須要構建一個更大的table, 而後把全部元素從新放進去, 位置會和原先的大不同. 而一致性Hash能夠解決全rehash的狀況, 只讓部份內容rehash, 失效的內容會比較少.

可是, 凡事都有一個可是, 全部的節點都同時rehash還好, 可一個分佈式系統怎麼作到全部node都同時rehash, 很顯然是作不到的, 因此Dapr維護的Actor Address目錄, 是最終一致的, 也就是系統裏面會存在多個ID相同的Actor(短暫的), 仍是會致使不一致.

 

對dapr/proto/placement/v1/placement.proto查看, 驗證了個人猜測

// Placement service is used to report Dapr runtime host status.
service Placement {
  rpc ReportDaprStatus(stream Host) returns (stream PlacementOrder) {}
}

message PlacementOrder {
  PlacementTables tables = 1;
  string operation = 2;
}

Host啓動, 就去placement那邊經過gRPC Stream訂閱了集羣的變更. 懶到極點了, 竟然是把整個membership發送過來, 而不是發送的diff.

 

總結一下, 從上面的源碼分析咱們能夠知道, Dapr的Membership是CP系統, 可是Actor的Placement不是, 是一個最終一致的AP系統. 而TiDB的PD是一個CP系統, 只不過是經過etcd embed作的. 但願對你們有一點幫助.

對我有幫助的, 可能就是Dapr對於Consul raft的使用.

 

參考:

1. Dapr

2. Etcd Embed

3. Consul Raft

相關文章
相關標籤/搜索