Storm 的編程模型是一個有向無環圖,模型角度決定了 Storm 的 Spout 接收到外部系統的請求,將請求數據分發給下游的 bolt 進行處理後,spout 並不能獲得 bolt 的處理結果並將結果返回給外部請求。因此應用場景中 Storm 對外部系統的調用都是採用回調的方式:html
但假若有一個需求:項目要接入各大銀行的系統中,經過要求對方提供一個回調接口來實現同步是不可能的。必須依靠本身去實現同步請求響應,外部系統將消息發往storm實時平臺,而後外部系統會阻塞,等待storm實時平臺處理完後將結果返回給外部系統。這就要用到 DRPC了。java
DRPC設計目的是爲了充分利用Storm的計算能力實現高密度的並行實時計算。經過一個 DRPC Server 負責接收 RPC 請求,並將該請求發送到 Storm 中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。編程
一個 DRPC請求過程:客戶端程序向 DRPC Server 發送要執行的函數名稱和該函數的參數。DRPC Server 將函數調用放到隊列中,並用一個唯一的id標記,具有 DRPC功能的拓撲會使用一個 DRPCSpout 拉取 。Topo 計算好結果後會由一個名爲 ReturnResults 的 bolt 去 鏈接 DRPC Server 給出對應函數調用id的結果,而後 DRPC Serve 根據 ID 找到等待中的客戶端,爲等待中的客戶端消除阻塞,併發送結果給客戶端。具體工做流程以下圖所示:segmentfault
從一個客戶端的角度來看,一個分佈式RPC調用就像是一個常規的RPC調用。只須要傳輸服務名和請求參數便可。設計模式
實際就是個同步的、向遠程系統發送socket請求並獲得遠程系統處理的結果的分佈式調用而已。架構
由上面的架構圖能夠發現,DRPC Server 至關於一個第三方服務:併發
DRPC Server 要同時協調三個不一樣的程序的請求,經過源碼可知其經過定義 Thrift 接口完成了進程間的通訊,下面來詳解每一個過程。框架
因爲 Strom 的 drpc 是經過 thrift 框架 進行 rpc調用的,因此先查看 strom.thrift。有兩個 thrift 接口: DistributedRPC 和 DistributedRPCInvocations 。socket
service DistributedRPC { // 請求 drpc 方法 string execute(1: string functionName, 2: string funcArgs) throws (1: DRPCExecutionException e, 2: AuthorizationException aze); } service DistributedRPCInvocations { // 返回 業務topo 處理結果給 DRPCServer void result(1: string id, 2: string result) throws (1: AuthorizationException aze); // 業務topo 拉取 DRPCServer從客戶端接收到的請求 DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze); void failRequest(1: string id) throws (1: AuthorizationException aze); void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze); } struct DRPCRequest { 1: required string func_args; 2: required string request_id; }
須要查看對兩個 thrift 接口的具體實現邏輯,只要查看接口的實現類便可,DRPC Server 中的具體實現類是 DRPCThrift,它同時實現了兩個接口中的方法,即處理 DRPC客戶端的請求,又處理 DRPC業務Topo拉請求的請求。分佈式
進行 DRPC調用的第一步是 客戶端調用 execute(name, args) ,DRPC Server 的 execute( ) 會對該請求作以下處理:
public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface { // 構造方法注入 private final DRPC drpc; //請求隊列 <requestName, request queue>,將請求排隊給業務topo消費,Waiting to be fetched ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues = new ConcurrentHashMap<>(); //結果map <requestId, request>,用來接收結果返回給客戶端,Waiting to be returned _requests = new ConcurrentHashMap<>(); @Override public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { return drpc.executeBlocking(functionName, funcArgs); } } public class DRPC implements AutoCloseable { public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { String id = nextId(); T req = BlockingOutstandingRequest .FACTORY .mkRequest(functionName, new DRPCRequest(funcArgs, id)); _requests.put(id, req); ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName); q.add(req); try { return req.getResult(); } finally { cleanup(req.getRequest().get_request_id()); } } }
DRPCSpout 做爲 thrift客戶端 經過調用 fetchRequest() 拉取請求,這裏須要轉換一下思惟,DRPCThrift 依然做爲 thrift 服務端,因此 DRPCThrift 要實現兩個接口。
所以 DRPCSpout 使用的是 DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface,在 nextTuple() 中不斷調用 client.fetchRequest(function); 獲得 DRPC客戶端 的請求來處理。
後面的流程就進入咱們編寫的業務 Topo 中了,經過 LinearDRPCTopologyBuilder 的builder.createRemoteTopology()
來構建線性的drpc topo
,該topo
的鏈路爲:spout -> PrepareRequest Bolt-> 用戶bolt1 -> 用戶bolt2 -> JoinResult Bolt -> ReturnResults Bolt
其中 JoinResult Bolt ,用兩個 Map 分別記錄 PrepareRequest 收到的請求 Id,最後一個業務Topo處理後的請求 Id,這兩個Id是同樣的,當兩個Id都在 Map中時就認爲該 DRPC請求完成,則繼續發送給 ReturnResults Bolt 。
最後 ReturnResults Bolt 經過調用 client.result(id, result); 用於返回 Topo 處理結果,在 DRPC 類中 returnResult() 的具體實現:
# DRPC類 public void returnResult(String id, String result) { OutstandingRequest req = _requests.get(id); if (req != null) { req.returnResult(result); } } } # BlockingOutstandingRequest 類 public void returnResult(String result) { _result = result; _sem.release(); }
閱讀源碼的過程當中對 DRPCSput 的 client.fetchRequest(function);
鏈路不清楚,想看它的服務端業務是怎麼實現的,點進去看到的是 DRPCInvocationsClient 的 fetchRequest() ,這裏c.fetchRequest(func);
竟然是直接又用 thrift客戶端調 fetchRequest()?看:
public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { // 構造方法 client.set(new DistributedRPCInvocations.Client(protocol)); // # DRPCInvocationsClient @Override public DRPCRequest fetchRequest(String func) { DistributedRPCInvocations.Client c = client.get(); if (c == null) { throw new TException("Client is not connected..."); } // 這裏是真正的客戶端請求,DistributedRPCInvocations.Client 是 thrift 抽象的客戶端 return c.fetchRequest(func); } }
小夥子,你會覺得fetchRequest(func)
是重複的 thrift客戶端調用,說明你對 DRPCInvocationsClient
類不熟,對設計模式也不熟啊!!首先,DRPCInvocationsClient
和DistributedRPCInvocations.Client
同樣,都實現DistributedRPCInvocations.Iface
,你就根據僅有的thrift
知識,覺得實現了DistributedRPCInvocations.Iface
接口的都是要寫服務端業務邏輯的;其實這裏DRPCInvocationsClient
只是用了靜態代理模式,對 DistributedRPCInvocations.Client
的代理而已,對各方法多了異常處理啊,真正的客戶端請求確實是c.fetchRequest(func);
既然這樣,那就看還有什麼類是實現了DistributedRPCInvocations.Iface
接口的,就是c.fetchRequest(func);
對應的服務端相應邏輯,就在DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface
,同時實現了兩個 drpc 的接口進行所有方法實現,具體邏輯在DRPCThrift
的成員變量DRPC
類中!
class DRPC { // DRPCSpout 中調用的 fetchRequest,實際調用的是這裏。 public DRPCRequest fetchRequest(String functionName) throws AuthorizationException { meterFetchRequestCalls.mark(); checkAuthorizationNoLog("fetchRequest", functionName); ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName); OutstandingRequest req = q.poll(); if (req != null) { //Only log accesses that fetched something logAccess("fetchRequest", functionName); req.fetched(); DRPCRequest ret = req.getRequest(); return ret; } return NOTHING_REQUEST; } }