Storm guarantees data processing by providing an at least once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"html
Storm 0.7.0 introduces transactional topologies, which enable you to get exactly once messaging semantics for pretty much any computation. So you can do things like counting in a fully-accurate, scalable, and fault-tolerant way.java
Storm默認的reliable特性支持at least once processing guarantee.
這個在某些場景下明顯是不夠的, 好比計數, 不斷的replay必然致使計數不許, 那麼須要支持exactly once semantics.git
Storm 0.7就提供transactional topology特性來支持, 其實這個和DRPC同樣, Storm只是提供一種特殊的topology的封裝, 固然transactional topology更復雜.github
這裏說transactional topologies爲了提供strong ordering, 這個要求是要強於以前說的exactly once semantics.數據庫
對於每一個transaction有惟一的transaction id來標識, 對於第一種design, 每一個transaction就是一個tuple
拿計數做爲例子, 每一個tuple產生的number, 最終須要累加到數據庫裏面
不使用transactional, 重複replay一個tuple, 必然會致使該tuple的number被反覆累加到數據庫apache
怎麼處理? 其實想法很簡單, 引入transaction的概念, 並在累加number到數據庫的同時記下該transactioin id.
這樣若是replay該tuple, 只須要對比transaction id就知道該transaction已經累加過, 能夠直接ignore緩存
看到這裏, 就知道保持strong ordering的重要性, 強順序意味着, 若是當前的transaction失敗, 會反覆被replay, 直到成功才繼續下一個transaction.
這意味着, 在數據庫咱們只須要記錄latest的transaction id, 而不是累加過的全部transaction id, 實現上會簡單許多.併發
可是design1的問題是效率過低, 徹底線性的處理tuple, 沒法利用storm的併發能力, 並且數據庫的負載很高, 每一個tuple都須要去操做數據庫app
The core idea behind transactional topologies is to provide a strong ordering on the processing of data.
The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.jvm
Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id 1
, the second id 2
, and so on.
There is a significant problem though with this design of processing one tuple at time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge amount of database calls (at least one per tuple), and this design makes very little use of the parallelization capabilities of Storm. So it isn't very scalable.
Design2的想法很簡單, 用batch tuple來做爲transaction的單位, 而不是一個tuple.
這樣帶來的好處是, batch內部的tuple能夠實現並行, 而且以batch爲單位去更新數據庫, 大大減小數據庫負載.
但本質上和Design1沒有區別, batch之間仍然是串行的, 因此效率仍然比較低
Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction.
So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed.
Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered. Here's a diagram of this design:
這個設計體現出storm的創意, 將topology的過程分爲processing和commit, processing就是進行局部的計算和統計, 只有commit時纔會把計算的結果更新到全局數據集(數據庫)
那麼對於processing階段徹底沒有必要限制, 只要保證在commit的時候按照順序一個個commit就ok.
好比對於計數, 不一樣的batch的局部計數過程沒有任何限制, 能夠徹底並行的完成, 可是當須要將計數結果累加到數據庫的時候, 就須要用transaction來保證只被累加一次
processing和commit階段合稱爲transaction, 任何階段的失敗都會replay整個transaction
A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there's two parts to the computation:
Computing the partial count for the batch
Updating the global count in the database with the partial count
The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.
Storm accomplishes this distinction by breaking the computation of a batch into two phases:
The processing phase: this is the phase that can be done in parallel for many batches
The commit phase: The commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful.
The two phases together are called a "transaction".
Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase.
If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).
爲了實現上面的Design3, storm在transactional topologies裏面默默的作了不少事
管理狀態, 經過Zookeeper去記錄全部transaction相關的狀態信息
協調transactions, 決定應該執行那個transaction的那個階段
Fault 檢測, 使用storm acker機制來detect batch是否被成功執行, 而且storm在transactional topology上 對acker機制作了比較大的優化, 用戶不用本身去acking或anchoring, 方便許多
提供batch bolt接口, 在bolt接口中提升對batch的支持, 好比提供finishbatch接口
最後, transactional topology要求source queue具備replay an exact batch的能力, 這兒說kafka是很好的選擇
不過我很好奇, 爲何要由source queue來提供batch replay的功能, 好的設計應該是batch對source queue透明, spout自身控制batch的劃分和replay, 這樣不能夠嗎?
When using transactional topologies, Storm does the following for you:
Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies.
This includes the current transaction id as well as the metadata defining the parameters for each batch.
Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point.
Fault detection: Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed.
Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples.
Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction.
Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).
Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka in storm-contrib contains a transactional spout implementation for Kafka.
You build transactional topologies by using TransactionalTopologyBuilder. Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from TransactionalGlobalCount in storm-starter.
MemoryTransactionalSpout spout = MemoryTransactionalSpout(DATA, Fields(""), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = TransactionalTopologyBuilder("", "", spout, 3); builder.setBolt("", BatchCount(), 5) .shuffleGrouping(""); builder.setBolt("", UpdateGlobalCount()) .globalGrouping("");
首先須要使用TransactionalSpout, MemoryTransactionalSpout
被用來從一個內存變量裏面讀取數據(DATA), 第二個參數制定數據的fields, 第三個參數指定每一個batch的最大tuple數量
接着, 須要使用TransactionalTopologyBuilder, 其餘和普通的topology看上去沒有不一樣, storm的封裝作的很好
下面經過processing和commit階段的bolt來了解對batch和transaction的支持
首先看看BatchCount, processing階段的bolt, 用於統計局部的tuple數目
BatchCount BaseBatchBolt { Object _id; BatchOutputCollector _collector; _count = 0; @Override prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override execute(Tuple tuple) { _count++; } @Override finishBatch() { _collector.emit( Values(_id, _count)); } @Override declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( Fields("", "")); } }
BatchCount繼承自BaseBatchBolt, 代表其對batch的支持, 主要反應在finishBatch函數, 而普通的bolt的不一樣在於, 只有在finishBatch的時候纔會去emit結果, 而不是每次execute都emit結果
在prepare時, 多出個id, a TransactionAttempt object, 而且從output定義看出, 全部emit的tuple第一個參數必須是id(TransactionAttempt
)
The
TransactionAttempt
contains two values: the "transaction id" and the "attempt id"(表示被replay次數).
The "transaction id" is the unique id chosen for this batch and is the same no matter how many times the batch is replayed.
The "attempt id" is a unique id for this particular batch of tuples and lets Storm distinguish tuples from different emissions of the same batch. Without the attempt id, Storm could confuse a replay of a batch with tuples from a prior time that batch was emitted.
All tuples emitted within a transactional topology must have the TransactionAttempt
as the first field of the tuple. This lets Storm identify which tuples belong to which batches. So when you emit tuples you need to make sure to meet this requirement.
其實這裏的BaseBatchBolt, 是通用的batch基類, 也能夠用於其餘的須要batch支持的場景, 好比DRPC, 只不過此處的id類型變爲RPC id
若是隻是要support tansactional topology場景, 能夠直接使用BaseTransactionalBolt
BaseTransactionalBolt BaseBatchBolt<TransactionAttempt> { }
繼續看, commit階段的bolt, UpdateGlobalCount, 將統計的結果累加到全局數據庫中
UpdateGlobalCount之間繼承自BaseTransactionalBolt, 因此此處prepare的參數直接是TransactionAttempt attempt(而不是object id)
而且比較重要的是實現ICommitter接口, 代表這個bolt是個commiter, 意味着這個blot的finishBatch函數須要在commit階段被調用
另外一種把bolt標識爲committer的方法是, 在topology build的時候使用setCommitterBolt來替代setBolt
First, notice that this bolt implements the ICommitter
interface. This tells Storm that the finishBatch
method of this bolt should be part of the commit phase of the transaction.
So calls to finishBatch
for this bolt will be strongly ordered by transaction id (calls to execute
on the other hand can happen during either the processing or commit phases).
An alternative way to mark a bolt as a committer is to use the setCommitterBolt
method in TransactionalTopologyBuilder
instead of setBolt
.
UpdateGlobalCount BaseTransactionalBolt ICommitter { TransactionAttempt _attempt; BatchOutputCollector _collector; _sum = 0; @Override prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { _collector = collector; _attempt = attempt; } @Override execute(Tuple tuple) { _sum+=tuple.getInteger(1); } @Override finishBatch() { Value val = DATABASE.get(GLOBAL_COUNT_KEY); Value newval; (val == || !val.txid.equals(_attempt.getTransactionId())) { newval = Value(); newval.txid = _attempt.getTransactionId(); (val==) { newval.count = _sum; } { newval.count = _sum + val.count; } DATABASE.put(GLOBAL_COUNT_KEY, newval); } { newval = val; } _collector.emit( Values(_attempt, newval.count)); } @Override declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( Fields("", "")); } }
storm會保證commiter裏面的finishBatch被順序執行, 而且在finishBatch裏面, 須要check transaction id, 確保只有新的transaction的結果才被更新到全局數據庫.
The code for finishBatch
in UpdateGlobalCount
gets the current value from the database and compares its transaction id to the transaction id for this batch. If they are the same, it does nothing. Otherwise, it increments the value in the database by the partial count for this batch.
A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the TransactionalWords class.
There are three kinds of bolts possible in a transactional topology:
BasicBolt: This bolt doesn't deal with batches of tuples and just emits tuples based on a single tuple of input.
BatchBolt: This bolt processes batches of tuples. execute
is called for each tuple, and finishBatch
is called when the batch is complete.
BatchBolt's that are marked as committers: The only difference between this bolt and a regular batch bolt is when finishBatch
is called. A committer bolt has finishedBatch
called during the commit phase. The commit phase is guaranteed to occur only after all prior batches have successfully committed, and it will be retried until all bolts in the topology succeed the commit for the batch.
上面列出可能遇到的3種bolt, 下面的例子給出不一樣blot的區別,
紅線標出的都是commiter, 這裏有兩個commiter, 分別是B和D
A的輸出分別輸出到B和C
B能夠先執行execute(processing), 但不能直接執行finishBatch, 由於須要等待storm調度, 必須等前面的batch commit完後, 才能進行commit
因此C也沒法馬上執行finishBatch, 由於須要等從B過來的tuple
對於D, 原文說它會在commit階段接收全部的batch tuple, 因此能夠直接commit, 這個怎麼保證?
Notice that even though Bolt D is a committer, it doesn't have to wait for a second commit message when it receives the whole batch. Since it receives the whole batch during the commit phase, it goes ahead and completes the transaction.
Committer bolts act just like batch bolts during the commit phase.
The only difference between committer bolts and batch bolts is that committer bolts will not call finishBatch
during the processing phase of a transaction.
Notice that you don't have to do any acking or anchoring when working with transactional topologies. Storm manages all of that underneath the hood. The acking strategy is heavily optimized.
因爲封裝的比較好, 不須要用戶去ack或fail tuple, 那麼怎麼去fail一個batch?
拋出FailedException, Storm捕獲這個異常會replay Batch, 而不會crash
When using regular bolts, you can call the fail
method on OutputCollector
to fail the tuple trees of which that tuple is a member.
Since transactional topologies hide the acking framework from you, they provide a different mechanism to fail a batch (and cause the batch to be replayed).
Just throw a FailedException. Unlike regular exceptions, this will only cause that particular batch to replay and will not crash the process.
Transactional spout和普通的spout徹底不一樣的實現, 自己就是一個mini的topology, 分爲coordinator spout和emitter bolt
The TransactionalSpout
interface is completely different from a regular Spout
interface. A TransactionalSpout
implementation emits batches of tuples and must ensure that the same batch of tuples is always emitted for the same transaction id.
A transactional spout looks like this while a topology is executing:
The coordinator on the left is a regular Storm spout that emits a tuple whenever a batch should be emitted for a transaction. The emitters execute as a regular Storm bolt and are responsible for emitting the actual tuples for the batch. The emitters subscribe to the "batch emit" stream of the coordinator using an all grouping.
The need to be idempotent with respect to the tuples it emits requires a TransactionalSpout
to store a small amount of state. The state is stored in Zookeeper.
下面是transactional spout的工做流程,
首先coordinator spout只會有一個task, 並會產生兩種stream, batch stream和commit stream
它會決定什麼時候開始某transaction processing階段, 此時就往batch stream裏面發送包含TransactionAttempt的tuple
它也決定什麼時候開始某transaction commit階段(當經過acker知道processing階段已經完成的時候, 而且全部prior transaction都已經被commit), 此時就往commit steam裏面發送一個包含TransactionAttempt的tuple做爲通知, 全部commtting bolt都會預訂(經過setBolt的all grouping方式)commit stream, 並根據收到的通知完成commit階段.
對於commit階段和processing階段同樣, 經過acker來判斷是成功仍是fail, 前面說了transactional topology對acker機制作了較大的優化, 因此全部acking和anchoring都由storm自動完成了.
對於emitter bolt, 能夠併發的, 而且以all grouping的方式訂閱coordinator的batch stream, 即全部emitter都會獲得同樣的batch stream, 使用幾個emitter取決於場景.
對於topology而言, emitter bolt是真正產生數據的地方, 當coordinator開始某batch的processing過程, 並往batch steam放tuple數據時, emitter bolt就會從batch stream收到數據, 並轉發給topology
Here's how transactional spout works:
Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt
The coordinator is a regular spout with a parallelism of 1
The emitter is a bolt with a parallelism of P, connected to the coordinator's "batch" stream using an all grouping
When the coordinator determines it's time to enter the processing phase for a transaction, it emits a tuple containing the TransactionAttempt and the metadata for that transaction to the "batch" stream
Because of the all grouping, every single emitter task receives the notification that it's time to emit its portion of the tuples for that transaction attempt
Storm automatically manages the anchoring/acking necessary throughout the whole topology to determine when a transaction has completed the processing phase. The key here is that *the root tuple was created by the coordinator, so the coordinator will receive an "ack" if the processing phase succeeds, and a "fail" if it doesn't succeed for any reason (failure or timeout).
If the processing phase succeeds, and all prior transactions have successfully committed, the coordinator emits a tuple containing the TransactionAttempt to the "commit" stream.
All committing bolts subscribe to the commit stream using an all grouping, so that they will all receive a notification when the commit happens.
Like the processing phase, the coordinator uses the acking framework to determine whether the commit phase succeeded or not. If it receives an "ack", it marks that transaction as complete in zookeeper.
從後面的討論, 能夠知道transactional spout的batch replay是依賴於source queue的
好比, 對於kafka這種數據是分佈在partition上的queue, 須要使用partitioned transactional spout, 用於封裝對從不一樣partition讀數據的過程
Partitioned Transactional Spout
A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how TransactionalKafkaSpout works. An
IPartitionedTransactionalSpout
automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability.
對於Transactional spout, 並不會象普通tuple同樣由spout緩存和負責replay, 只會記下該batch數據在source queue的位置(應該是zookeeper), 當須要replay的時候, Transactional spout會重新去source queue去讀batch而後replay.
這樣的問題是過於依賴source queue, 並且會致使transaction batch沒法被replay(好比因爲某個partition fail)
這個問題如何解決? 能夠參考原文, 比較好的方法, 是fail當前和後續全部的transaction, 而後從新產生transaction的batch數據, 並跳過失敗部分
我的決定這個設計不太好, 過於依賴source queue
爲什麼不在spout緩存batch數據, 雖然這樣對於比較大的batch可能有效率問題, 或者會限制同時處理的batch數目, 但從新從source queue讀數據來replay也會有不少問題...