Storm同步調用之DRPC模型探討

  摘要:Storm的編程模型是一個有向無環圖,決定了storm的spout接收到外部系統的請求後,spout並不能獲得bolt的處理結果並將結果返回給外部請求。因此也就決定了storm沒法提供對外部系統的同步調用功能。html

 

  最近新的黑名單項目須要在storm實時計算平臺上提供對外部系統請求調用的同步響應(也就是讓storm支持同步調用而不是回調),而Storm的編程模型是一個有向無環圖,也就決定了storm的spout接收到外部系統的請求後,將請求數據分發給下游的bolt進行處理後,spout並不能獲得bolt的處理結果並將結果返回給外部請求。redis

  在傳統也就是業界大部分應用場景storm對外部系統的調用都是採用回調的方式。本人以前參與的某4000萬用戶,日均1000萬交易量的信用卡中心也是採用回調的方式。編程

 

原文和做者一塊兒討論:http://www.cnblogs.com/intsmaze/p/7602242.htmltomcat

        

      storm常見回調設計方案

  首先jetty,tomcat等啓動服務,接收外部系統的請求,將請求獲得的數據發往kafka,activeMQ等消息隊列中,就立馬響應給外部系統。服務器

  而後storm實時平臺去消息隊列中拉取數據並進行分佈式並行處理,而後將運算完的結果存入第三方存儲介質(外部系統直接經過讀取該介質獲取結果)或者調用外部系統的接口將處理的結果推送出去(以回調的方式實現僞同步請求)。多線程

  

   目前的需求

  如今的項目是一個產品,要接入各大銀行的系統中,因此經過要求對方提供一個回調接口來實現同步是不可能的。必須依靠本身去實現同步請求響應,外部系統將消息發往storm實時平臺,而後外部系統會阻塞,等待storm實時平臺處理完後將結果返回給外部系統。

   這個時候固然就是去storm的官網去看看有沒有對應的高級接口,果不其然看到了DRPC,熟悉RPC的就知道就是遠程過程調用,就是向遠程系統發送socket請求並獲得遠程系統處理的結果,那麼DPRC也就是分佈式遠程過程調用而已,那麼他就必定提供了同步請求響應的功能。併發

   關於DRPC在文章末尾會簡單演示一下,這裏重點說下我對storm的DRPC的原理理解。上面我也說了storm的編程模型是一個有向無環圖,從模型的角度來講是不可能支持同步請求的功能的。socket

   本身如何基於storm實現同步調用

   我也本身思考下,若是是我本身會如何在現有的storm的編程模型下如何實現同步調用。
    
   方案一:你們最容易想到的方案就是,在storm的拓撲的spout節點中new ServerSocket(8080),來接收外部系統的請求,而後將請求的數據分發給下游的bolt處理,處理完後將結果返回給外部系統。 

  問題一:storm的計算模型的拓撲結構是一個有向無環圖,處理的結果並不會返回給spout節點。
分佈式

  我可讓bolt將處理的結果存入redis,而後spout不斷輪詢去redis讀取對應的結果並返回!函數

  貌似能夠,可是查看spout的調用源代碼會發現,若是這樣會致使spout的吞吐量降低,由於spout只有從redis輪詢到當次請求的處理結果後纔會在循環調用nextTuple()方法,固然在spout實現類中開啓多線程後,貌似能夠解決nextTuple方法阻塞(具體沒有去想,由於自己這個方案不可行了,就沒必須去掉頭髮了)storm的任務中再去開多線程是無效率的,還不如不選擇storm技術。

  問題二:spout節點啓動的機器是不固定的,ip是會變化的,則對外部系統調用時ip的維護帶來了麻煩,因此這種方案不可取。

public void nextTuple() { 獲取請求的數據 collector.emit(); while(true) { 去redis中讀取該次請求的結果,讀到則結束循環 } }

   

  方案二:拋開storm實時平臺,單獨開發一套中轉程序,負責接收外部系統的請求,將外部請求的參數存入一個先進先出的隊列中,阻塞等待storm處理的結果。storm拓撲的spout中建立socket去鏈接中轉程序,中轉程序從隊列中拿出請求參數返回給spout。spout獲取到請求參數後,將參數傳給下游的bolt去計算,下游的最後一層bolt計算完也建立socke去鏈接中轉程序並將結果發送給中轉程序。中轉程序得到bolt返回結果,存入某個地方,而後中轉程序中阻塞的地方輪詢獲得結果後,就結束輪詢響應給外部系統了。

 

  固然這只是一個簡單的方案設計,具體還有不少細節設計以及考慮在咱們的Server端,由於它要同時協調三個不一樣的程序的請求,而且可以根據以每個請求自動聚合外部系統請求,spout請求,bolt請求爲一組。

  Storm的DRPC概述

  storm的DRPC其實就實現外部系統同步調用storm實時平臺的功能組件了。應該不須要我去從零開發了。接下來就看看storm的DPRC功能是否和我當初的想法是否一致!

  官方話語:

  分佈式RPC(DRPC)背後的思想是將真正強大功能的計算與storm的計算並行化。Storm拓撲以一個函數參數的流做爲輸入,它向每一個函數調用發出一個輸出流的結果。

  分佈式RPC(DRPC)的真正目的是使用storm實時並行計算極端功能。Storm拓撲須要一個輸入流做爲函數參數,以一個輸出流的形式發射每一個函數調用的結果。。從一個客戶端的角度來看,一個分佈式RPC調用就像是一個常規的RPC調用。

  分佈式RPC工做流程以下圖所示:

 

  客戶端程序會向啓動的DRPC服務器發送要執行的函數名稱和該函數的參數。具有DRPC功能的拓撲會使用一個DRPCSpout接收來自DRPC服務器傳來的函數調用流。每一個函數調用都用一個唯一的id標記在DRPC服務器上。拓撲計算好結果後會由一個名爲ReturnResults的bolt去鏈接DRPC服務器給出對應函數調用id的結果,而後DRPC服務器根據ID找到等待中的客戶端,爲等待中的客戶端消除阻塞,併發送結果給客戶端。

  從一個客戶端的角度來看,一個分佈式RPC調用就像是一個常規的RPC調用。

public class Client {
    public static void main(String[] args) throws TException,
            DRPCExecutionException {
        DRPCClient client = new DRPCClient("192.168.19.131", 3772);
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
             String result = client.execute("method_name","param is intsmaze--"+i+"---");
            System.out.println(result);
        }
        client.close();
    }
}

   下一篇將會重點講解如何運行storm的drpc示例,並剖析它的內部實現原理來驗證是否和本文的猜測一致。

相關文章
相關標籤/搜索