StormDRPC流程解讀

Storm 的編程模型是一個有向無環圖,模型角度決定了 Storm 的 Spout 接收到外部系統的請求,將請求數據分發給下游的 bolt 進行處理後,spout 並不能獲得 bolt 的處理結果並將結果返回給外部請求。因此應用場景中 Storm 對外部系統的調用都是採用回調的方式:html

  1. 接收外部系統的請求,將請求獲得的數據發到消息隊列中,就立馬響應給外部系統。
  2. 而後 Storm 實時平臺去消息隊列中拉取數據並進行分佈式並行處理,再將運算完的結果存入第三方存儲介質(外部系統直接經過讀取該介質獲取結果)或者調用外部系統的接口將處理的結果推送出去(以回調的方式實現僞同步請求)。

但假若有一個需求:項目要接入各大銀行的系統中,經過要求對方提供一個回調接口來實現同步是不可能的。必須依靠本身去實現同步請求響應,外部系統將消息發往storm實時平臺,而後外部系統會阻塞,等待storm實時平臺處理完後將結果返回給外部系統。這就要用到 DRPC了。java

Strom DRPC

DRPC設計目的是爲了充分利用Storm的計算能力實現高密度的並行實時計算。經過一個 DRPC Server 負責接收 RPC 請求,並將該請求發送到 Storm 中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。編程

一個 DRPC請求過程:客戶端程序向 DRPC Server 發送要執行的函數名稱和該函數的參數。DRPC Server 將函數調用放到隊列中,並用一個唯一的id標記,具有 DRPC功能的拓撲會使用一個 DRPCSpout 拉取 。Topo 計算好結果後會由一個名爲 ReturnResults 的 bolt 去 鏈接 DRPC Server 給出對應函數調用id的結果,而後 DRPC Serve 根據 ID 找到等待中的客戶端,爲等待中的客戶端消除阻塞,併發送結果給客戶端。具體工做流程以下圖所示:segmentfault

img

從一個客戶端的角度來看,一個分佈式RPC調用就像是一個常規的RPC調用。只須要傳輸服務名和請求參數便可。設計模式

實際就是個同步的、向遠程系統發送socket請求並獲得遠程系統處理的結果的分佈式調用而已。架構

DRPC請求流程

由上面的架構圖能夠發現,DRPC Server 至關於一個第三方服務:併發

  1. 負責接收外部系統的請求,將外部請求的參數存入一個先進先出的隊列中,阻塞等待 Storm Topo 處理的結果。
  2. Storm Topo 的 spout 中建立 socket 去鏈接中轉程序,spout 不斷拿隊列中請求參數來處理。
  3. spout獲取到請求參數後,將參數傳給下游的bolt去計算,下游的最後一層bolt計算完也建立socke去鏈接中轉程序並將結果發送給中轉程序。
  4. 中轉程序中阻塞的地方輪詢獲得結果後,就結束輪詢響應給外部系統了。

 DRPC Server 要同時協調三個不一樣的程序的請求,經過源碼可知其經過定義 Thrift 接口完成了進程間的通訊,下面來詳解每一個過程。框架

0. Thrift接口

因爲 Strom 的 drpc 是經過 thrift 框架 進行 rpc調用的,因此先查看 strom.thrift。有兩個 thrift 接口: DistributedRPC 和 DistributedRPCInvocations 。socket

  • DistributedRPC:定義 DRPC客戶端 和 DRPC Server端 之間的調用方法 execute(),暴露給 業務客戶端使用;
  • DistributedRPCInvocations:定義 DRPC Server端 和 服務端邏輯處理Topo 之間的 拉取請求參數 以及 返回結果的 方法;
service DistributedRPC {
  // 請求 drpc 方法  
  string execute(1: string functionName, 2: string funcArgs) throws (1: DRPCExecutionException e, 2: AuthorizationException aze);
}

service DistributedRPCInvocations {
  // 返回 業務topo 處理結果給 DRPCServer
  void result(1: string id, 2: string result) throws (1: AuthorizationException aze);
  // 業務topo 拉取 DRPCServer從客戶端接收到的請求
  DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze);
  void failRequest(1: string id) throws (1: AuthorizationException aze);  
  void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze);  
}

struct DRPCRequest {
  1: required string func_args;
  2: required string request_id;
}

須要查看對兩個 thrift 接口的具體實現邏輯,只要查看接口的實現類便可,DRPC Server 中的具體實現類是 DRPCThrift,它同時實現了兩個接口中的方法,即處理 DRPC客戶端的請求,又處理 DRPC業務Topo拉請求的請求。分佈式

1. execute( ) 接收客戶端請求

進行 DRPC調用的第一步是 客戶端調用 execute(name, args) ,DRPC Server 的 execute( ) 會對該請求作以下處理:

  1. 將請求封裝成 BlockingOutstandingRequest req,而後 req.getResult() 使用 req 內部的一個Semaphore 實現 acquire() 請求阻塞,直到 DRPC Server 端接到 業務topo 的結果才 release。
  2. 將請求添加到隊列中,等待 業務 topo進行拉取消費。
public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface {
    // 構造方法注入 
    private final DRPC drpc;
    //請求隊列 <requestName, request queue>,將請求排隊給業務topo消費,Waiting to be fetched
    ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues =
        new ConcurrentHashMap<>();
    //結果map <requestId, request>,用來接收結果返回給客戶端,Waiting to be returned
     _requests = new ConcurrentHashMap<>();
    
    @Override
    public String execute(String functionName, String funcArgs)
        throws DRPCExecutionException, AuthorizationException {
        return drpc.executeBlocking(functionName, funcArgs);
    }
}

public class DRPC implements AutoCloseable {
    public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException {
        String id = nextId();
        T req = BlockingOutstandingRequest
            .FACTORY
            .mkRequest(functionName, new DRPCRequest(funcArgs, id));
        _requests.put(id, req);
        ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName);
        q.add(req);
        try {
            return req.getResult();
        } finally {
            cleanup(req.getRequest().get_request_id());
        }
    }
}

2. DRPCSpout 拉取請求

DRPCSpout 做爲 thrift客戶端 經過調用 fetchRequest() 拉取請求,這裏須要轉換一下思惟,DRPCThrift 依然做爲 thrift 服務端,因此 DRPCThrift 要實現兩個接口。

所以 DRPCSpout 使用的是 DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface,在 nextTuple() 中不斷調用 client.fetchRequest(function); 獲得 DRPC客戶端 的請求來處理。

3. 業務Topo鏈路

後面的流程就進入咱們編寫的業務 Topo 中了,經過 LinearDRPCTopologyBuilder 的builder.createRemoteTopology() 來構建線性的drpc topo,該topo的鏈路爲:spout -> PrepareRequest Bolt-> 用戶bolt1 -> 用戶bolt2 -> JoinResult Bolt -> ReturnResults Bolt

其中 JoinResult Bolt ,用兩個 Map 分別記錄 PrepareRequest 收到的請求 Id,最後一個業務Topo處理後的請求 Id,這兩個Id是同樣的,當兩個Id都在 Map中時就認爲該 DRPC請求完成,則繼續發送給 ReturnResults Bolt 。

最後 ReturnResults Bolt 經過調用 client.result(id, result); 用於返回 Topo 處理結果,在 DRPC 類中 returnResult() 的具體實現:

  1. 從 _requests Map 拿出對應的請求,而後將 result 注入進去,同時 _sem.release(); 將信號量釋放,去掉對請求的阻塞;
  2. 在 DRPC 中的 req.getResult(); 將再也不被阻塞,馬上將 DRPC Server 請求返回給客戶端。
# DRPC類
public void returnResult(String id, String result) {
        OutstandingRequest req = _requests.get(id);
        if (req != null) {
            req.returnResult(result);
        }
    }
}

# BlockingOutstandingRequest 類
public void returnResult(String result) {
        _result = result;
        _sem.release();
}

須要注意的問題

相關類職責

  • DRPCServer:DRPC Server上帝類,啓動用於執行 DRPC 請求的 ThriftServer 端實例,會啓動兩個 ThriftServer
    • handlerServer 用於接收 DRPC客戶端的請求;
    • invokeServer 用於接收 求通過Topo進行業務處理後的 result,而後返回給 handlerServer;
  • ThriftServer:對 Thrift 框架 服務端 啓動、中止 操做的封裝;
  • DRPCThrift:DRPC Server 的業務類,包裝 DRPC類的調用;
  • DRPC:DRPC Server 的業務類

thrift客戶端到服務端調用鏈路問題

閱讀源碼的過程當中對 DRPCSput 的 client.fetchRequest(function);鏈路不清楚,想看它的服務端業務是怎麼實現的,點進去看到的是 DRPCInvocationsClient 的 fetchRequest() ,這裏c.fetchRequest(func);竟然是直接又用 thrift客戶端調 fetchRequest()?看:

public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
    // 構造方法
    client.set(new DistributedRPCInvocations.Client(protocol));
    
    // # DRPCInvocationsClient 
    @Override
    public DRPCRequest fetchRequest(String func) {
        DistributedRPCInvocations.Client c = client.get();
        if (c == null) {
            throw new TException("Client is not connected...");
        }
        // 這裏是真正的客戶端請求,DistributedRPCInvocations.Client 是 thrift 抽象的客戶端
        return c.fetchRequest(func);
    }
}

小夥子,你會覺得fetchRequest(func)是重複的 thrift客戶端調用,說明你對 DRPCInvocationsClient 類不熟,對設計模式也不熟啊!!首先,DRPCInvocationsClientDistributedRPCInvocations.Client同樣,都實現DistributedRPCInvocations.Iface,你就根據僅有的thrift知識,覺得實現了DistributedRPCInvocations.Iface接口的都是要寫服務端業務邏輯的;其實這裏DRPCInvocationsClient只是用了靜態代理模式,對 DistributedRPCInvocations.Client的代理而已,對各方法多了異常處理啊,真正的客戶端請求確實是c.fetchRequest(func);

既然這樣,那就看還有什麼類是實現了DistributedRPCInvocations.Iface接口的,就是c.fetchRequest(func);對應的服務端相應邏輯,就在DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface,同時實現了兩個 drpc 的接口進行所有方法實現,具體邏輯在DRPCThrift的成員變量DRPC類中!

class DRPC {
    //  DRPCSpout 中調用的 fetchRequest,實際調用的是這裏。
    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
        meterFetchRequestCalls.mark();
        checkAuthorizationNoLog("fetchRequest", functionName);
        ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName);
        OutstandingRequest req = q.poll();
        if (req != null) {
            //Only log accesses that fetched something
            logAccess("fetchRequest", functionName);
            req.fetched();
            DRPCRequest ret = req.getRequest();
            return ret;
        }
        return NOTHING_REQUEST;
    }
}

參考

相關文章
相關標籤/搜索