用 Golang 實現基於 Redis 的安全高效 RPC 通訊

前言

RPC(Remote Procedure Call),翻譯過來爲「遠程過程調用」,是一種分佈式系統中服務或節點之間的有效通訊機制。經過 RPC,某個節點(或客戶端)能夠很輕鬆的調用遠端(或服務端)的方法或服務,就像在本地調用同樣簡單。現有的不少 RPC 框架都要求暴露服務端地址,也就是須要知道服務器的 IP 和 RPC 端口。而本篇文章將介紹一種不須要暴露 IP 地址和端口的 RPC 通訊方式。這種方式是基於 Redis BRPOP/BLPOP 操做實現的延遲隊列,以及 Golang 中的 goroutine 協程異步機制,整個框架很是簡單和易於理解,同時也很高效、穩定和安全。這種方式已經應用到了 Crawlab 中的節點通訊當中,成爲了各節點即時傳輸信息的主要方式。下面咱們將從 Crawlab 早期節點通訊方案 PubSub 開始,介紹當時遇到的問題和解決方案,而後如何過渡到如今的 RPC 解決方案,以及它是如何在 Crawlab 中發揮做用的。前端

基於 PubSub 的方案

PubSub

早期的 Crawlab 是基於 Redis 的 PubSub,也就是發佈訂閱模式。這是 Redis 中主要用於一對多的單向通訊的方案。其用法很是簡單:node

  1. 訂閱者利用SUBSCRIBE channel1 channel2 ...來訂閱一個或多個頻道;
  2. 發佈者利用PUBLISH channelx message來發布消息給該頻道的訂閱者。

Redis的PubSub能夠用做廣播模式,即一個發佈者對應多個訂閱者。而在Crawlab中,咱們只有一個訂閱者對應一個發佈者的狀況(主節點->工做節點:nodes:<node_id>)或一個訂閱者對應多個發佈者的狀況(工做節點->主節點:nodes:master>)。這是爲了方便雙向通訊。git

如下爲節點通訊原理示意圖。github

各個節點會經過Redis的PubSub功能來作相互通訊。redis

所謂PubSub,簡單來講是一個發佈訂閱模式。訂閱者(Subscriber)會在Redis上訂閱(Subscribe)一個通道,其餘任何一個節點均可以做爲發佈者(Publisher)在該通道上發佈(Publish)消息。json

通訊架構

在 Crawlab 中,主節點會訂閱 nodes:master 通道,其餘節點若是須要向主節點發送消息,只須要向 nodes:master 發佈消息就能夠了。同理,各工做節點會各自訂閱一個屬於本身的通道 nodes:<node_id>node_id 是MongoDB裏的節點ID,是MongoDB ObjectId),若是須要給工做節點發送消息,只須要發佈消息到該通道就能夠了。後端

一個網絡請求的簡單過程以下:設計模式

  1. 客戶端(前端應用)發送請求給主節點(API);
  2. 主節點經過Redis PubSub<nodes:<node_id> 通道發佈消息給相應的工做節點;
  3. 工做節點收到消息以後,執行一些操做,並將相應的消息經過 <nodes:master> 通道發佈給主節點;
  4. 主節點收到消息以後,將消息返回給客戶端。

不是全部節點通訊都是雙向的,也就是說,主節點只會單方面對工做節點通訊,工做節點並不會返回響應給主節點,所謂的單向通訊。如下是Crawlab的通訊類型。安全

changoroutine

若是您在閱讀 Crawlab 源碼,會發現節點通訊中有大量的 chan 語法,這是 Golang 的一個併發特性。服務器

chan 表示爲一個通道,在 Golang 中分爲無緩衝和有緩衝的通道,咱們用了無緩衝通道來阻塞協程,只有當 chan 接收到信號(chan <- "some signal"),該阻塞纔會釋放,協程進行下一步操做)。在請求響應模式中,若是爲雙向通訊,主節點收到請求後會起生成一個無緩衝通道來阻塞該請求,當收到來自工做節點的消息後,向該無緩衝通道賦值,阻塞釋放,返回響應給客戶端。

go 命令會起一個 goroutine(協程)來完成併發,配合 chan,該協程能夠利用無緩衝通道掛起,等待信號執行接下來的操做。

基於延遲隊列的方案

PubSub 方案的問題

PubSub 這種消息訂閱-發佈設計模式是一種有效的實現節點通訊的方式,可是它有兩個問題:

  1. PubSub 的數據是即時的,會隨着 Redis 宕機而丟失;
  2. 寫基於 PubSub 的通訊服務會要求用到 goroutinechannel,這加大了開發難度,下降了可維護性。

其中,第二個問題是比較棘手的。若是咱們但願加入更多的功能,須要寫大量的異步代碼,這會加大系統模塊間的耦合度,形成擴展性不好,並且代碼閱讀起來很痛苦。

所以,爲了解決這個問題,咱們採用了基於 Redis 延遲消息隊列的 RPC 服務。

延遲隊列架構

下圖是基於延遲隊列架構的 RPC 實現示意圖。

每個節點都有一個客戶端(Client)和服務端(Server)。客戶端用於發送消息到目標節點(Target Node)並接收其返回的消息,服務端用於接收、處理源節點(Source Node)的消息並返回消息給源節點的客戶端。

整個 RPC 通訊的流程以下:

  1. 源節點的客戶端經過 LPUSH 將消息推送到 Redis 的 nodes:<node_id> 中,並執行 BRPOP nodes:<node_id>:<msg_id> 阻塞並監聽這個消息隊列;
  2. 目標節點的服務端經過 BRPOP 一直在監聽 nodes:<node_id>,收到消息後,經過消息中的 Method 字段執行對應的程序;
  3. 目標節點執行完畢後,服務端經過 LPUSH 將消息推送到 Redis 的 nodes:<node_id>:<msg_id> 中;
  4. 因爲源節點客戶端一直在監聽 nodes:<node_id>:<msg_id> 這個消息隊列,當目標節點服務端推送消息到這個隊列後,源節點客戶端將當即收到返回的消息,再作後續處理。

這樣,整個節點的通訊流程就經過 Redis 完成了。這樣作的好處在於不用暴露 HTTP 的 IP 地址和端口,只須要知道節點 ID 便可完成 RPC 通訊。

這樣設計後的 RPC 代碼比較容易理解和維護。每次須要擴展新的通訊類別時,只須要繼承 rpc.Service 類,實現 ClientHandle(客戶端處理方法)和 ServerHandle(服務端處理方法)方法就能夠了。

這裏多說一下 BRPOP。它將移出並獲取消息隊列的最後一個元素, 若是消息隊列沒有元素會阻塞隊列直到等待超時或發現可彈出元素爲止。所以,使用 BRPOP 命令相對於輪訓或其餘方式,能夠避免不間斷的請求 Redis,避免浪費網絡和計算資源。

若是對 Redis 的操做命令不熟悉的,能夠參考一下掘金小冊《Redis 深度歷險:核心原理與應用實踐》,這本小冊深刻介紹了 Redis 的原理以及工程實踐,對於應用 Redis 到實際開發中很是實用。

代碼實踐

講了這麼多理論知識,咱們仍是須要看看代碼的。老師常教育咱們:「Talk is cheap. Show me the code.」

因爲 Crawlab 後端是 Golang 開發的,要理解如下代碼須要一些 Golang 的基礎知識。

消息數據結構

首先咱們須要定一個傳輸消息的數據結構。代碼以下。

package entity

type RpcMessage struct {
	Id      string            `json:"id"`      // 消息ID
	Method  string            `json:"method"`  // 消息方法
	NodeId  string            `json:"node_id"` // 節點ID
	Params  map[string]string `json:"params"`  // 參數
	Timeout int               `json:"timeout"` // 超時
	Result  string            `json:"result"`  // 結果
	Error   string            `json:"error"`   // 錯誤
}
複製代碼

這裏,咱們定義了消息 ID、方法、節點 ID、參數等字段。消息 ID 是 UUID,保證了消息 ID 的惟一性。

基礎接口

首先,咱們定義一個抽象基礎接口,方便讓實際業務邏輯模塊繼承。服務端的處理邏輯在 ServerHandle 中,返回 entity 裏的 RpcMessage,而客戶端的邏輯在 ClientHandle 中。

// RPC服務基礎類
type Service interface {
	ServerHandle() (entity.RpcMessage, error)
	ClientHandle() (interface{}, error)
}
複製代碼

客戶端通用方法

當咱們調用客戶端的通用方法的時候,須要實現兩個邏輯:

  1. 發送消息:生成消息 ID,將消息序列化爲 JSON,LPUSH 推入 Redis 消息隊列;
  2. 經過 BRPOP 延遲獲取返回的消息,返回給調用方。

如下是實現的代碼。

// 客戶端處理消息函數
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
	return func() (replyMsg entity.RpcMessage, err error) {
		// 請求ID
		msg.Id = uuid.NewV4().String()

		// 發送RPC消息
		msgStr := utils.ObjectToString(msg)
		if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 獲取RPC回覆消息
		dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
		if err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 反序列化消息
		if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 若是返回消息有錯誤,返回錯誤
		if replyMsg.Error != "" {
			return replyMsg, errors.New(replyMsg.Error)
		}

		return
	}
}
複製代碼

服務端處理

服務端處理的邏輯以下,大體的邏輯是:

  1. 在一個循環中,經過 BRPOP 獲取該節點對應的消息;
  2. 當獲取到消息時,生成一個 goroutine 異步處理該消息;
  3. 繼續等待。

您能夠在 InitRpcService 這個方法中看到上述邏輯。私有方法 handleMsg 實現了序列化、調用服務端 RPC 服務方法、發送返回消息的邏輯。若是須要拓展 RPC 方法類型,在工廠類方法 GetService 裏添加就能夠了。

// 獲取RPC服務
func GetService(msg entity.RpcMessage) Service {
	switch msg.Method {
	case constants.RpcInstallLang:
		return &InstallLangService{msg: msg}
	case constants.RpcInstallDep:
		return &InstallDepService{msg: msg}
	case constants.RpcUninstallDep:
		return &UninstallDepService{msg: msg}
	case constants.RpcGetLang:
		return &GetLangService{msg: msg}
	case constants.RpcGetInstalledDepList:
		return &GetInstalledDepsService{msg: msg}
	}
	return nil
}

// 處理RPC消息
func handleMsg(msgStr string, node model.Node) {
	// 反序列化消息
	var msg entity.RpcMessage
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 獲取service
	service := GetService(msg)

	// 根據Method調用本地方法
	replyMsg, err := service.ServerHandle()
	if err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 發送返回消息
	if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}
}

// 初始化服務端RPC服務
func InitRpcService() error {
	go func() {
		for {
			// 獲取當前節點
			node, err := model.GetCurrentNode()
			if err != nil {
				log.Errorf(err.Error())
				debug.PrintStack()
				continue
			}

			// 獲取獲取消息隊列信息
			msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
			if err != nil {
				if err != redis.ErrNil {
					log.Errorf(err.Error())
					debug.PrintStack()
				}
				continue
			}

			// 處理消息
			go handleMsg(msgStr, node)
		}
	}()
	return nil
}
複製代碼

調用方法例子

Crawlab 的節點上常常須要爲爬蟲安裝一些第三方依賴,例如 pymongo、requests 等。而其中,咱們也須要直到某個節點上是否已經安裝了某個依賴,這須要跨服務器通訊,也就是須要在分佈式網絡中進行雙向通訊。而這個邏輯是經過 RPC 實現的。主節點向目標節點發起 RPC 調用,目標節點運行被調用方法,將運行結果也就是安裝的依賴列表返回給客戶端,客戶端再返回給調用者。

下面的代碼實現了獲取目標節點上已安裝的依賴的 RPC 方法。

// 獲取已安裝依賴服務
// 繼承Service基礎類
type GetInstalledDepsService struct {
	msg entity.RpcMessage
}

// 服務端處理方法
// 重載ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
	lang := utils.GetRpcParam("lang", s.msg.Params)
	deps, err := GetInstalledDepsLocal(lang)
	if err != nil {
		s.msg.Error = err.Error()
		return s.msg, err
	}
	resultStr, _ := json.Marshal(deps)
	s.msg.Result = string(resultStr)
	return s.msg, nil
}

// 客戶端處理方法
// 重載ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
	// 發起 RPC 請求,獲取服務端數據
	s.msg, err = ClientFunc(s.msg)()
	if err != nil {
		return o, err
	}

	// 反序列化
	var output []entity.Dependency
	if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
		return o, err
	}
	o = output

	return
}
複製代碼

發起調用

寫好了 RPC 服務端和客戶端處理方法,就能夠輕鬆編寫調用邏輯了。如下是調用獲取遠端已安裝依賴列表的方法。首先由 GetService 工廠類獲取以前定義好的 GetInstalledDepsService,再調用其客戶端處理方法 ClientHandle,而後返回結果。這就像在本地調用方法同樣。是否是很簡單?

// 獲取遠端已安裝依賴
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
	params := make(map[string]string)
	params["lang"] = lang
	s := GetService(entity.RpcMessage{
		NodeId:  nodeId,
		Method:  constants.RpcGetInstalledDepList,
		Params:  params,
		Timeout: 60,
	})
	o, err := s.ClientHandle()
	if err != nil {
		return
	}
	deps = o.([]entity.Dependency)
	return
}
複製代碼

結語

本篇文章主要介紹了一種基於 Redis 延遲隊列的 RPC 通訊方式,這種方式不用暴露各個節點或服務的 IP 地址或端口,是一種很是安全的方式。並且,這種方式已經用 Golang 在 Crawlab 中實現了雙向通訊,特別是 Golang 中的天生支持異步的 goroutine,讓這種方式的實現變得簡單。實際上,這種方式理論上是很是高效的,可以支持高併發數據傳輸。

可是,在 Crawlab 的實現中還存在一些隱患,也就是它並無限制服務端的處理併發數量。所以若是傳輸消息過多時,服務端資源會被佔滿,致使處理速度變慢甚至宕機的風險。修復方式是在服務端限制併發數量。另外,限於時間的緣由,做者尚未來得及測試這種 RPC 通訊方式的實際傳輸效率,容錯機制也沒有加入。所以總的來講還有很大的提高和優化空間。

雖然如此,這種方式對於 Crawlab 的低併發遠程通訊來講是足夠的了,在實際使用中也沒有出現問題,很是穩定。對於隱祕性有要求、但願不暴露地址信息的開發者,咱們也推薦將該種方式在實際應用中嘗試。

參考

相關文章
相關標籤/搜索