6、Storm的高級原語之Transactional Topology

一、什麼是Transactional Topology?

    ○ 是一個每一個tuple僅被處理一次的框架java

    ○ 由Storm0.7引入,於Storm0.9被棄用,被triden取而代之數據庫

    ○ 底層依靠spout\bolt\topology\stream抽象的一個特性框架

二、Transactional Topology設計思路

    ○ 一次只處理一次tuple

        基於Storm處理tuple失敗時會重發(replay),如何確保replay的記錄不被重複記錄,換句話說就是如何保證tuple僅被處理一次,這就依賴於一個稱做強順序性的思想。ide

        強順序性:每一個tuple與一個transaction id相關聯,transaction id實際就是一個數字,每個tuple都有一個按照順序的transaction id(例如:tuple1的transaction id 爲 1,tuple2的transaction id 爲 2,...以此類推),只有當前的tuple處理並存儲完畢,下一個tuple(處於等待狀態)才能進行存儲,tuple被存儲時連同transaction id一併存儲,此時考慮兩種狀況:優化

                        tuple處理失敗時:從新發送一個和原來如出一轍的transaction idui

                        tuple處理成功時:發送的transaction id會和存儲的transaction id對比,若是不存在transaction id,表示第一次記錄,直接存儲;若是發現存在,則忽略該tuple。spa

        這一思想是由Kafka開發者提出來的。設計

    ○ 一次處理一批tuple

        基於上面的一個優化,將一批tuple直接打包成一個batch,而後分配一個transaction id ,讓batch與batch之間保證強順序性,且batch內部的tuples能夠並行。code

    ○ Storm是如何採用的?

        兩個步驟:orm

            一、並行計算batch中的tuple數量

            二、batch強順序性存儲

            在batch強順序性存儲的同時讓其餘等待存儲的batch內部進行並行運算,沒必要等到下一個batch存儲時才進行內部運算。

        在Storm上面的兩個步驟表現爲processing階段commit階段

三、一些設計細節

使用Transactional Topology時,storm提供以下操做:

    ○ 管理狀態

        將須要處理的狀態如:transaction id 、batch meta等狀態信息放在zookeeper

    ○ 協調事務

        指定某個時間段執行processing操做和commit操做

    ○ 錯誤檢測

        storm使用acking框架自動檢測batch被成功或失敗處理,而後相應的重發(replay)

    ○ 內置批處理API

        經過對普通的bolt進行包裝,提供一套對batch處理的API、協調工做(即某個時刻處理某個processing或者commit),而且storm會自動清除中間結果

Transactional Topology是能夠徹底重發一個特定batch的消息隊列系統,在 Kakfa中正是有這樣的需求,爲此Storm在storm-contrib裏面的Storm-Kafka中爲Kafka實現了一個事務性的spout。

四、來自Storm-Starter.jar的例子

    計算來自輸入流中tuple的個數

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

    ○ 經過TransactionalTopologyBuilder類構建Transactional

        參數:

        Transaction ID:transactional topology的ID,在zookeeper中用於保存進度狀態,重啓topology時能夠直接從執行的進度開始執行而不用重頭到尾又執行一遍

        Spout ID:位於整個Topology的Spout的ID

        Spout Object:Transactional中的Spout對象

        Spout:Trasactional中的Spout的並行數

    ○ MemoryTransactionalSpout用於從一個內存變量中讀取數據

        DATA:數據

        tuple fields:字段

        tupleNum:在batch中最大的tuple數

    ○ Bolts

        第一個Bolt採用隨機分組的方式隨機分發到各個task

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
    int _count = 0;
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }
    @Override
    public void execute(Tuple tuple) {
        _count++;
    }
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

        BatchBolt對象運行在BatchBoltExecutor中,BatchBoltExecutor負責BatchBolt對象的建立和清理 

        BatchBolt的ID在context對象中,該ID是一個TransactionAttempt對象.

        BatchBolt在DRPC中也可使用,只是txid類型不同,若是在Transactional Topology中使用BatchBolt,能夠繼承BaseTransactionalBolt.

        在Tranasctional Topology中全部的Tuple都必須以TransactionAttempt做爲第一個field,而後storm才能根據該field判斷Tuple所屬的BatchBolt,因此在發射Tuple必須知足此條件。

        TransactionAttempt對象中有兩個屬性:

            transaction id:強順序性,不管重發多少次都是同樣的數字

            attempt id:對每個Batch標識的ID,每次重發都其值不一致,經過該ID能夠區分每次重發的Tuple的不一樣版本

第二個Bolt使用GlobalGrouping彙總batch中的tuple數

 public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
    int _sum = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }
 
    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}

    ICommitter接口:實現該接口的Bolt會在commit階段調用finishBatch方法,該方法的調用會按照強順序性,此外還可使用TransactionalTopologyBuilder的setCommiterBolt來添加Bolt實現和該接口同樣的功能。

    executor方法:在processing階段和commit階段均可以執行。

    關於更多的transactional topology例子能夠看看storm-starter中的TransactionalWords類,該例子會在一個事務中更新多個數據庫

 

五、Transaction Topology API

    Bolt類

    BaiscBolt:該Bolt不跟batch中的tuples交互,僅基於單個傳來的tuple和產生新的tuple

    BatchBolt:該Bolt處理batch中的tuples,對於每個tuple調用executor方法,整個batch完成時調用finishBatch方法

    被Committer標記的Bolt:在commit階段才調用finishBatch方法,commit具備強順序性,標記Bolt爲commit階段執行finishBatch的方法有兩種:一、實現ICommiter接口。二、TransactionalTopologyBuilder的setCommiterBolt來添加Bolt。

    Processing階段和Commit階段

    

    紅色輪廓的Bolt被標記過爲commit

    Spout向Bolt A發送整個Batch

    Bolt A處理完整個Batch以後調用finishBatch方法分別向Bolt B 和 Bolt C發送Batch

    Bolt B接收到Bolt A傳遞過來的tuple進行處理(此時還還沒有處理完畢)不會調用finishBatch方法

    Bolt C接口Bolt A傳遞的tuple,儘管處理完Bolt A傳遞來的tuple,可是因爲Bolt B還還沒有commit,因此Bolt C處於等待Bolt B commit的狀態,不會調用finishBatch方法

    Bolt D接收來自Bolt C調用executor方法時發送的全部tuple

    此時一旦Bolt B進行commit進行finishBatch操做,那麼Bolt C就會確認接收到全部Bolt B的tuple,Bolt C也調用finishBatch方法,最終Bolt D也接收到全部來自Bolt C的batch。

    在這裏儘管Bolt D是一個committer,它在接收到整個batch的tuple以後不須要等待第二個commit信號。由於它是在commit階段接收到的整個batch,它會調用finishBatch來完成整個事務。

    Acking

注意,當使用transactional topology的時候你不須要顯式地去作任何的acking或者anchoring,storm在背後都作掉了。(storm對transactional topolgies裏面的acking機制進行了高度的優化)

    Failing a transaction

在使用普通bolt的時候, 你能夠經過調用OutputCollector的fail方法來fail這個tuple所在的tuple樹。Transactional Topology對用戶隱藏了acking框架, 它提供一個不一樣的機制來fail一個batch(從而使得這個batch被replay):只要拋出一個FailedException就能夠了。跟普通的異常不同, 這個異常只會致使當前的batch被replay, 而不會使整個進程崩潰掉。

    Transactional spout

TransactionalSpout接口跟普通的Spout接口徹底不同。一個TransactionalSpout的實現會發送一批一批(batch)的tuple, 並且必須保證同一批次tuples的transaction id始終同樣。

在transactional topology運行的時候, transactional spout看起來是這樣的一個結構:

coordinator是一個普通的storm的spout——它一直爲事務的batch發射tuple。

Emitter則像一個普通的storm bolt,它負責爲每一個batch實際發射tuple,emitter以all grouping的方式訂閱coordinator的」batch emit」流。
關於如何實現一個TransactionalSpout的細節能夠參見Javadoc

    Partitioned Transactional Spout

一種常見的TransactionalSpout是那種從多個queue broker讀取數據而後再發射的tuple。好比TransactionalKafkaSpout就是這樣工做的。IPartitionedTransactionalSpout把這些管理每一個分區的狀態以保證能夠replay的冪等性的工做都自動化掉了。更多能夠參考Javadoc。

    配置

Transactional Topologies有兩個重要的配置:

Zookeeper:默認狀況下,transactional topology會把狀態信息保存在一個zookeeper裏面(協調集羣的那個)。你能夠經過這兩個配置來指定其它的zookeeper:」transactional.zookeeper.servers」 和 「transactional.zookeeper.port「。

同時活躍的batch數量:你必須設置同時處理的batch數量,你能夠經過」topology.max.spout.pending」 來指定, 若是你不指定,默認是1。

六、實現

Transactional Topologies的實現是很是優雅的。管理提交協議,檢測失敗而且串行提交看起來很複雜,可是使用storm的原語來進行抽象是很是簡單的。

一、transactional spout是一個子topology, 它由一個coordinator spout和一個emitter bolt組成。

二、coordinator是一個普通的spout,並行度爲1;emitter是一個bolt,並行度爲P,使用all分組方式鏈接到coordinator的「batch」流上。

三、coordinator使用一個acking框架決定何時一個batch被成功執行(process)完成,而後去決定一個batch何時被成功提交(commit)。

相關文章
相關標籤/搜索