4、Storm入門之Spout

可靠的消息 VS 不可靠的消息html

在設計拓撲結構時,始終在頭腦中記着的一件重要事情就是消息的可靠性。當有沒法處理的消息時,你就要決定該怎麼辦,以及做爲一個總體的拓撲結構該作些什麼。舉個例子,在處理銀行存款時,不要丟失任何事務報文就是很重要的事情。可是若是你要統計分析數以百萬的tweeter消息,即便有一條丟失了,仍然能夠認爲你的結果是準確的。java

對於Storm來講,根據每一個拓撲的須要擔保消息的可靠性是開發者的責任。這就涉及到消息可靠性和資源消耗之間的權衡。高可靠性的拓撲必須管理丟失的消息,必然消耗更多資源;可靠性較低的拓撲可能會丟失一些消息,佔用的資源也相應更少。不論選擇什麼樣的可靠性策略,Storm都提供了不一樣的工具來實現它。git

要在spout中管理可靠性,你能夠在分發時包含一個元組的消息ID(collector.emit(new Values(…),tupleId))。在一個元組被正確的處理時調用ack方法,而在失敗時調用fail方法。當一個元組被全部的靶bolt和錨bolt處理過,便可斷定元組處理成功(你將在第5章學到更多錨bolt知識)。

發生下列狀況之一時爲元組處理失敗:github

  • 提供數據的spout調用collector.fail(tuple)web

  • 處理時間超過配置的超時時間redis

讓咱們來看一個例子。想象你正在處理銀行事務,需求以下:apache

  • 若是事務失敗了,從新發送消息json

  • 若是失敗了太屢次,終結拓撲運行api

建立一個spout和一個boltspout隨機發送100個事務ID,有80%的元組不會被bolt收到(你能夠在例子ch04-spout查看完整代碼)。實現spout時利用Map分發事務消息元組,這樣就比較容易實現重發消息。緩存

public void nextTuple() {

    if(!toSend.isEmpty()){
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){

            Integer transactionId = transactionEntry.getKey();

            String transactionMessage = transactionEntry.getValue();

            collector.emit(new Values(transactionMessage),transactionId);

        }

        toSend.clear();

    }}

若是有未發送的消息,獲得每條事務消息和它的關聯ID,把它們做爲一個元組發送出去,最後清空消息隊列。值得一提的是,調用map的clear是安全的,由於nextTuple失敗時,只有ack方法會修改map,而它們都運行在一個線程內。

維護兩個map用來跟蹤待發送的事務消息和每一個事務的失敗次數。ack方法只是簡單的把事務從每一個列表中刪除。

public void ack(Object msgId) {

    messages.remove(msgId);

    failCounterMessages.remove(msgId);
}

fail方法決定應該從新發送一條消息,仍是已經失敗太屢次而放棄它。

NOTE:若是你使用所有數據流組,而拓撲裏的全部bolt都失敗了,spoutfail方法纔會被調用。

public void fail(Object msgId) {

    Integer transactionId = (Integer) msgId;

    //檢查事務失敗次數

    Integer failures = transactionFailureCount.get(transactionId) + 1;

    if(failes >= MAX_FAILS){

        //失敗數過高了,終止拓撲

        throw new RuntimeException("錯誤, transaction id 【"+

         transactionId+"】 已失敗太屢次了 【"+failures+"】");

    }

    //失敗次數沒有達到最大數,保存這個數字並重發此消息

    transactionFailureCount.put(transactionId, failures);

    toSend.put(transactionId, messages.get(transactionId));

    LOG.info("重發消息【"+msgId+"】");
 }

首先,檢查事務失敗次數。若是一個事務失敗次數太多,經過拋出RuntimeException終止發送此條消息的工人。不然,保存失敗次數,並把消息放入待發送隊列(toSend),它就會再次調用nextTuple時得以從新發送。
NOTE:Storm節點不維護狀態,所以若是你在內存保存信息(就像本例作的那樣),而節點又不幸掛了,你就會丟失全部緩存的消息。
Storm是一個快速失敗的系統。拓撲會在拋出異常時掛掉,而後再由Storm重啓,恢復到拋出異常前的狀態。

獲取數據

接下來你會了解到一些設計spout的技巧,幫助你從多數據源獲取數據。

直接鏈接

在一個直接鏈接的架構中,spout直接與一個消息分發器鏈接(見圖4-1)。

圖4-1直接鏈接的spout

圖4-1 直接鏈接的spout

這個架構很容易實現,尤爲是在消息分發器是已知設備或已知設備組時。已知設備知足:拓撲從啓動時就已知道該設備,並貫穿拓撲的整個生命週期保持不變。未知設備就是在拓撲運行期添加進來的。已知設備組就是從拓撲啓動時組內全部設備都是已知的。

下面舉個例子說明這一點。建立一個spout使用Twitter流API讀取twitter數據流。spout把API看成消息分發器直接鏈接。從數據流中獲得符合track參數的公共tweets(參考twitter開發頁面)。完整的例子能夠在連接https://github.com/storm-book/examples-ch04-spouts/找到。

spout從配置對象獲得鏈接參數(track,user,password),並鏈接到API(在這個例子中使用ApacheDefaultHttpClient)。它一次讀一行數據,並把數據從JSON轉化成Java對象,而後發佈它。

public void nextTuple() {

    //建立http客戶端

    client = new DefaultHttpClient();

    client.setCredentialsProvider(credentialProvider);

    HttpGet get = new HttpGet(STREAMING_API_URL+track);
    HttpResponse response;

    try {

        //執行http訪問

        response = client.execute(get);

        StatusLine status = response.getStatusLine();

        if(status.getStatusCode() == 200){

            InputStream inputStream = response.getEntity().getContent();

            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

            String in;

            //逐行讀取數據

            while((in = reader.readLine())!=null){

                try{

                    //轉化併發布消息

                    Object json = jsonParser.parse(in);

                    collector.emit(new Values(track,json));

                }catch (ParseException e) {

                    LOG.error("Error parsing message from twitter",e);

                }

            }

        }

    } catch (IOException e) {

        LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], 

           sleeping 10s");

        try {

            Thread.sleep(10000);

        } catch (InterruptedException e1) {}

    }
  }

NOTE:在這裏你鎖定了nextTuple方法,因此你永遠也不會執行ackfail方法。在真實的應用中,咱們推薦你在一個單獨的線程中執行鎖定,並維持一個內部隊列用來交換數據(你會在下一個例子中學到如何實現這一點:消息隊列)。

棒極了!
如今你用一個spout讀取Twitter數據。一個明智的作法是,採用拓撲並行化,多個spout從同一個流讀取數據的不一樣部分。那麼若是你有多個流要讀取,你該怎麼作呢?Storm的第二個有趣的特性(譯者注:第一個有趣的特性已經出現過,這句話原文都是同樣的,不過按照中文的行文習慣仍是不重複使用措詞了)是,你能夠在任意組件內(spouts/bolts)訪問TopologyContext。利用這一特性,你可以把流劃分到多個spouts讀取。

public void open(Map conf, TopologyContext context,

          SpoutOutputCollector collector) {

    //從context對象獲取spout大小

    int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();

    //從這個spout獲得任務id

    int myIdx = context.getThisTaskIndex();

    String[] tracks = ((String) conf.get("track")).split(",");

    StringBuffer tracksBuffer = new StringBuffer();

    for(int i=0; i< tracks.length;i++){

        //Check if this spout must read the track word

        if( i % spoutsSize == myIdx){

            tracksBuffer.append(",");

            tracksBuffer.append(tracks[i]);

        }

    }

    if(tracksBuffer.length() == 0) {

        throw new RuntimeException("沒有爲spout獲得track配置" +

 " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的數量必須高於spout的數量");

 this.track =tracksBuffer.substring(1).toString();

    }

 ...

 }

利用這一技巧,你能夠把collector對象均勻的分配給多個數據源,固然也能夠應用到其它的情形。好比說,從web服務器收集日誌文件見圖4-2

圖4-2直連hash

圖4-2 直連hash

經過上一個例子,你學會了從一個spout鏈接到已知設備。你也可使用相同的方法鏈接未知設備,不過這時你須要藉助於一個協同系統維護的設備列表。協同系統負責探察列表的變化,並根據變化建立或銷燬鏈接。好比,從web服務器收集日誌文件時,web服務器列表可能隨着時間變化。當添加一臺web服務器時,協同系統探查到變化併爲它建立一個新的spout。見圖4-3

圖4-3直連協同

圖4-3 直連協同

消息隊列

第二種方法是,經過一個隊列系統接收來自消息分發器的消息,並把消息轉發給spout。更進一步的作法是,把隊列系統做爲spout和數據源之間的中間件,在許多狀況下,你能夠利用多隊列系統的重播能力加強隊列可靠性。這意味着你不須要知道有關消息分發器的任何事情,並且添加或移除分發器的操做比直接鏈接簡單的多。這個架構的問題在於隊列是一個故障點,另外你還要爲處理流程引入新的環節。

圖4-4展現了這一架構模型

圖4-4使用隊列系統

圖4-4 使用隊列系統

NOTE:你能夠經過輪詢隊列或哈希隊列(把隊列消息經過哈希發送給spouts或建立多個隊列使隊列spouts一一對應)在多個spouts之間實現並行性。

接下來咱們利用Redis和它的java庫Jedis建立一個隊列系統。在這個例子中,咱們建立一個日誌處理器從一個未知的來源收集日誌,利用lpush命令把消息插入隊列,利用blpop命令等待消息。若是你有不少處理過程,blpop命令採用了輪詢方式獲取消息。

咱們在spoutopen方法建立一個線程,用來獲取消息(使用線程是爲了不鎖定nextTuple在主循環的調用):

new Thread(new Runnable() {

    @Override

    public void run() {

        try{

           Jedis client= new Jedis(redisHost, redisPort);

           List res = client.blpop(Integer.MAX_VALUE, queues);

           messages.offer(res.get(1));

        }catch(Exception e){

            LOG.error("從redis讀取隊列出錯",e);

            try {

                Thread.sleep(100);

            }catch(InterruptedException e1){}

        }

    }}).start();

這個線程的唯一目的就是,建立redis鏈接,而後執行blpop命令。每當收到了一個消息,它就被添加到一個內部消息隊列,而後會被nextTuple消費。對於spout來講數據源就是redis隊列,它不知道消息分發者在哪裏也不知道消息的數量。

NOTE:咱們不推薦你在spout建立太多線程,由於每一個spout都運行在不一樣的線程。一個更好的替代方案是增長拓撲並行性,也就是經過Storm集羣在分佈式環境建立更多線程。

nextTuple方法中,要作的唯一的事情就是從內部消息隊列獲取消息並再次分發它們。

public void nextTuple(){

    while(!messages.isEmpty()){

        collector.emit(new Values(messages.poll()));

    }}

NOTE:你還能夠藉助redis在spout實現消息重發,從而實現可靠的拓撲。(譯者注:這裏是相對於開頭的可靠的消息VS不可靠的消息講的)

DRPC

DRPCSpout從DRPC服務器接收一個函數調用,並執行它(見第三章的例子)。對於最多見的狀況,使用backtype.storm.drpc.DRPCSpout就足夠了,不過仍然有可能利用Storm包內的DRPC類建立本身的實現。

小結

如今你已經學習了常見的spout實現模式,它們的優點,以及如何確保消息可靠性。不存在適用於全部拓撲的架構模式。若是你知道數據源,而且可以控制它們,你就可使用直接鏈接;然而若是你須要添加未知數據源或從多種數據源接收數據,就最好使用消息隊列。若是你要執行在線過程,你可使用DRPCSpout或相似的實現。

你已經學習了三種常見鏈接方式,不過依賴於你的需求仍然有無限的可能。

相關文章
相關標籤/搜索