Storm是一個分佈式的流處理系統,利用anchor和ack機制保證全部tuple都被成功處理。若是tuple出錯,則能夠被重傳,可是如何保證出錯的tuple只被處理一次呢?Storm提供了一套事務性組件Transaction Topology,用來解決這個問題。 git
Transactional Topology目前已經再也不維護,由Trident來實現事務性topology,可是原理相同。 github
Storm如何實現即對tuple並行處理,又保證事務性。本節從簡單的事務性實現方法入手,逐步引出Transactional Topology的原理。 數據庫
保證tuple只被處理一次,最簡單的方法就是將tuple流變成強順序的,而且每次只處理一個tuple。從1開始,給每一個tuple都順序加上一個id。在處理tuple的時候,將處理成功的tuple id和計算結果存在數據庫中。下一個tuple到來的時候,將其id與數據庫中的id作比較。若是相同,則說明這個tuple已經被成功處理過了,忽略它;若是不一樣,根據強順序性,說明這個tuple沒有被處理過,將它的id及計算結果更新到數據庫中。 app
以統計消息總數爲例。每來一個tuple,若是數據庫中存儲的id 與當前tuple id不一樣,則數據庫中的消息總數加1,同時更新數據庫中的當前tuple id值。如圖: 框架
可是這種機制使得系統一次只能處理一個tuple,沒法實現分佈式計算。 分佈式
爲了實現分佈式,咱們能夠每次處理一批tuple,稱爲一個batch。一個batch中的tuple能夠被並行處理。 ide
咱們要保證一個batch只被處理一次,機制和上一節相似。只不過數據庫中存儲的是batch id。batch的中間計算結果先存在局部變量中,當一個batch中的全部tuple都被處理完以後,判斷batch id,若是跟數據庫中的id不一樣,則將中間計算結果更新到數據庫中。 ui
如何確保一個batch裏面的全部tuple都被處理完了呢?能夠利用Storm提供的CoordinateBolt。如圖: spa
可是強順序batch流也有侷限,每次只能處理一個batch,batch之間沒法並行。要想實現真正的分佈式事務處理,可使用storm提供的Transactional Topology。在此以前,咱們先詳細介紹一下CoordinateBolt的原理。 設計
CoordinateBolt具體原理以下:
整個過程如圖所示:
CoordinateBolt主要用於兩個場景:
CoordinatedBolt對於業務是有侵入的,要使用CoordinatedBolt提供的功能,你必需要保證你的每一個bolt發送的每一個tuple的第一個field是request-id。 所謂的「我已經處理完個人上游」的意思是說當前這個bolt對於當前這個request-id所須要作的工做作完了。這個request-id在DRPC裏面表明一個DRPC請求;在Transactional Topology裏面表明一個batch。
Storm提供的Transactional Topology將batch計算分爲process和commit兩個階段。Process階段能夠同時處理多個batch,不用保證順序性;commit階段保證batch的強順序性,而且一次只能處理一個batch,第1個batch成功提交以前,第2個batch不能被提交。
仍是以統計消息總數爲例,如下代碼來自storm-starter裏面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(「word「), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(「global-count「, 「spout「, spout, 3);
builder.setBolt(「partial-count「, new BatchCount(), 5).noneGrouping(「spout「);
builder.setBolt(「sum「, new UpdateGlobalCount()).globalGrouping(「partial-count「);
TransactionalTopologyBuilder共接收四個參數。
下面是BatchCount的定義:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(「id「, 「count「));
}
}
BatchCount的prepare方法的最後一個參數是batch id,在Transactional Tolpoloyg裏面這id是一個TransactionAttempt對象。
Transactional Topology裏發送的tuple都必須以TransactionAttempt做爲第一個field,storm根據這個field來判斷tuple屬於哪個batch。
TransactionAttempt包含兩個值:一個transaction id,一個attempt id。transaction id的做用就是咱們上面介紹的對於每一個batch中的tuple是惟一的,並且無論這個batch replay多少次都是同樣的。attempt id是對於每一個batch惟一的一個id, 可是對於同一個batch,它replay以後的attempt id跟replay以前就不同了, 咱們能夠把attempt id理解成replay-times, storm利用這個id來區別一個batch發射的tuple的不一樣版本。
execute方法會爲batch裏面的每一個tuple執行一次,你應該把這個batch裏面的計算狀態保持在一個本地變量裏面。對於這個例子來講, 它在execute方法裏面遞增tuple的個數。
最後, 當這個bolt接收到某個batch的全部的tuple以後, finishBatch方法會被調用。這個例子裏面的BatchCount類會在這個時候發射它的局部數量到它的輸出流裏面去。
下面是UpdateGlobalCount類的定義:
public static class UpdateGlobalCount extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(「id「, 「sum「));
}
}
UpdateGlobalCount實現了ICommitter接口,因此storm只會在commit階段執行finishBatch方法。而execute方法能夠在任何階段完成。
在UpdateGlobalCount的finishBatch方法中,將當前的transaction id與數據庫中存儲的id作比較。若是相同,則忽略這個batch;若是不一樣,則把這個batch的計算結果加到總結果中,並更新數據庫。
Transactional Topolgy運行示意圖以下:
下面總結一下Transactional Topology的一些特性
Trident是Storm之上的高級抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。若是你使用過Pig或Cascading,對這些接口就不會陌生。
Trident將stream中的tuples分紅batches進行處理,API封裝了對這些batches的處理過程,保證tuple只被處理一次。處理batches中間結果存儲在TridentState對象中。
Trident事務性原理這裏不詳細介紹,有興趣的讀者請自行查閱資料。
參考:http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/
http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/
https://github.com/nathanmarz/storm/wiki/Trident-tutorial