Trident 是對狀態化(stateful)數據源進行讀取和寫入操做的最好抽象。狀態(state)既能夠保存在拓撲內部(例如保存在內存中並備份到HDFS上),也能夠存入像Memcached或者Cassandra這樣的外部數據庫。而對於Trident API而言,這兩種機制沒有區別。html
Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This lets you reason about Trident topologies as if each message were processed exactly-once.java
Trident使用一種容錯性的方式來對state進行管理,這樣即便在面對失敗或重試時,狀態的更新還是冪等的。基於這個機制,每條消息被看做被剛好處理了一次(exactly-once)。git
State更新中存在多級容錯性機制。在討論這一點以前,咱們先來看一個例子,這個例子展現了實現剛好一次(exactly-once)語義時必需的訣竅。假如你正在對數據流進行一個count聚合操做,並打算將計數結果存入數據庫中。在這個例子裏,你在數據庫中存入一個單一值來表示count值,並且你每次計算一個新tuple來增長count。github
當出現失敗狀況,tuple 將被重發。這樣就給state更新帶來了一個問題(或者其餘反作用)—— 你沒法知道當前的這個tuple更新操做是否已經被處理過。也許你以前並無處理這個tuple,那麼你須要增長count;也許你以前已經處理了這個tuple而且成功增長了count,可是在另外一部執行操做過程當中tuple處理失敗了,這時你就不該該增長count;還有可能你以前在使用這個tuple,可是在更新數據庫時候出錯了,出現這種狀況時,你仍然須要更新數據庫。數據庫
在數據庫中只存儲count值時,你不知道這個tuple是否以前被處理過。因此你須要更多的信息來得到正確的決定。Trident提供了下面能夠知足剛好處理一次的語義。apache
基於這些原語,你的 State 實現就能夠檢測tuple的batch以前是否已經被處理,而後來選擇合適的方式來進行state更新操做。你具體的操做取決於你在每一個batch中輸入spout提供的語義。有三類支持容錯性的 spout:「非事務型」(non-transactional)、「事務型」(transactional)以及「模糊事務型」(opaque transactional)。接下來咱們來分析下每種 spout 類型的容錯性語義。編程
記住,Trident 是經過small batch來處理tuple的,並且每一個batch都會有一個惟一的txid。spout的特性是由他們所提供給每一個batch的保證機制決定的。事務型 spout 包含如下特性:緩存
這是一種容易理解的spout類型,stream被切分紅固定不變的batchs。storm-contrib提供一個關於Kafka的an implementation of a transactional spout 。安全
你可能會有疑問:爲何不所有使用事務型 spout 呢?緣由很好理解。一方面,有些spout並沒有必要去保障足夠可靠的容錯性。好比,TransactionalTridentKafkaSpout
的工做方式就是使得帶有某個 txid 的 batch 中包含有來自一個 Kafka topic 的全部 partition 的 tuple。一旦一個 batch 被髮送出去,在未來不管從新發送這個 batch 多少次,batch 中都會包含有徹底相同的 tuple 集,這是由事務型 spout 的語義決定的。如今假設 TransactionalTridentKafkaSpout
發送出的某個 batch 處理失敗了,而與此同時,Kafka 的某個節點由於故障下線了。這時你就沒法從新處理以前的 batch 了(由於 Kafka 的節點故障,Kafka topic 必然有一部分 partition 沒法獲取到),這個處理過程也會所以終止。app
這就是要有「模糊事務型」 spout 的緣由了 —— 模糊事務型 spout 支持在數據源節點丟失的狀況下仍然能夠實現exactly-once剛好一次的處理語義。咱們會在下一節討論這類 spout。
順便提一點,若是 Kafka 支持數據複製,那麼就能夠放心地使用事務型 spout 提供的容錯性機制了,由於這種狀況下某個節點的故障不會致使數據丟失。
在討論「模糊事務型」 spout 以前,讓咱們先來看看如何爲事務型 spout 設計一種支持exactly-once剛好一次語義的State。這個 State 就稱爲 「事務型 state」,它利用了特定的 txid 永遠只與同一組 tuple 相關聯這一事實。
假如你的拓撲須要計算單詞數,並且你準備將計數結果存入一個 K-V 型數據庫中。這裏的 key 就是單詞,value 對應於單詞數。從上面的討論中你應該已經明白了僅僅存儲計數結果是沒法肯定某個 batch 中的tuple 是否已經被處理過的。因此,如今你應該將 txid 做爲一種原子化的值與計數值一塊兒存入數據庫。隨後,在更新計數值的時候,你就能夠將數據庫中的 txid 與當前處理的 batch 的 txid 進行比對。若是二者相同,你就能夠跳過更新操做 —— 因爲 Trident 的強有序性處理機制,能夠肯定數據庫中的值是對應於當前的 batch 的。若是二者不一樣,你就能夠放心地增長計數值。因爲一個 batch 的 txid 永遠不會改變,並且 Trident 可以保證 state 的更新操做徹底是按照 batch 的順序進行的,因此,這樣的處理邏輯是徹底可行的。
下面來看一個例子。假如你正在處理 txid 3,其中包含有如下幾個 tuple:
["man"] ["man"] ["dog"] |
假如數據庫中有如下幾個 key-value 對:
man => [count=3, txid=1] dog => [count=4, txid=3] apple => [count=10, txid=2] |
其中與 「man」 相關聯的 txid 爲 1。因爲當前處理的 txid 爲 3,你就能夠肯定當前處理的 batch 與數據庫中存儲的值無關,這樣你就能夠放心地將 「man」 的計數值加上 2 並更新 txid 爲 3。另外一方面,因爲 「dog」 的 txid 與當前的 txid 相同,因此,「dog」 的計數是以前已經處理過的,如今不能再對數據庫中的計數值進行更新操做。這樣,在結束 txid3 的更新操做以後,數據庫中的結果就會變成這樣:
man => [count=5, txid=3] dog => [count=4, txid=3] apple => [count=10, txid=2] |
如今咱們再來討論一下「模糊事務型」 spout。
前面已經提到過,模糊事務型 spout 不能保證一個 txid 對應的 batch 中包含的 tuple 徹底一致。模糊事務型 spout 有如下的特性:
OpaqueTridentKafkaSpout 就具備這樣的特性,同時它對 Kafka 節點的丟失問題具備很好的容錯性。OpaqueTridentKafkaSpout
在發送一個 batch 的時候總會總上一個 batch 結束的地方開始發送新 tuple。這一點能夠保證 tuple 不會被遺漏,並且也不會被多個 batch 處理。
不過,模糊事務型 spout 的缺點就在於不能經過 txid 來識別數據庫中的 state 是不是已經處理過的。這是由於在 state 的更新的過程當中,batch 有可能會發生變化。
在這種狀況下,你應該在數據庫中存儲更多的 state 信息。除了一個結果值和 txid 以外,你還應該存入前一個結果值。咱們再以上面的計數值的例子來分析如下這個問題。假如你的 batch 的部分計數值是 「2」,如今你須要應用一個更新操做。假定如今數據庫中的值是這樣的:
{ value = 4, prevValue = 1, txid = 2 } |
假如當前處理的 txid 爲 3,這與數據庫中的 txid 不一樣。這時能夠將 「prevValue」 的值設爲 「value」 的值,再爲 「value」 的值加上部分計數的結果並更新 txid。執行完這一系列操做以後的數據庫中的值就會變成這樣:
{ value = 6, prevValue = 4, txid = 3 } |
若是當前處理的 txid 爲 2,也就是和數據庫中存儲的 txid 一致,這種狀況下的處理邏輯與上面的 txid 不一致的狀況又有所不一樣。由於此時你會知道數據庫中的更新操做是由上一個擁有相同 txid 的batch 作出的。不過那個 batch 有可能與當前的 batch 並不相同,因此你須要忽略它的操做。這個時候,你應該將 「prevValue」 加上 batch 中的部分計數值來計算新的 「value」。在這個操做以後數據庫中的值就會變成這樣:
{ value = 3, prevValue = 1, txid = 2 } |
這種方法之因此可行是由於 Trident 具備強順序性處理的特性。一旦 Trident 開始處理一個新的 batch 的狀態更新操做,它永遠不會回到過去的 batch 的處理上。同時,因爲模糊事務型 spout 會保證 batch 之間不會存在重複 —— 每一個 tuple 只會被某一個 batch 完成處理 —— 因此你能夠放心地使用 prevValue 來更新 value。
非事務型 spout 不能爲 batch 提供任何的安全性保證。非事務型 spout 有可能提供一種「至多一次」的處理模型,在這種狀況下 batch 處理失敗後 tuple 並不會從新處理;也有可能提供一種「至少一次」的處理模型,在這種狀況下可能會有多個 batch 分別處理某個 tuple。總之,此類 spout 不能提供「剛好一次」的語義。
下圖顯示了不一樣的 spout/state 的組合是否支持剛好一次的消息處理語義:
模糊事務型 state 具備最好的容錯性特徵,不過這是以在數據庫中存儲更多的內容爲代價的(一個 txid 和兩個 value)。事務型 state 要求的存儲空間相對較小,可是它的缺點是隻對事務型 spout 有效。相對的,非事務型要求的存儲空間最少,可是它也不能提供任何的剛好一次的消息執行語義。
你選擇 state 與 spout 的時候必須在容錯性與存儲空間佔用之間權衡。能夠根據你的應用的需求來肯定哪一種組合最適合你。
從上文的描述中你已經瞭解到了exactly-once剛好一次的消息執行語義的原理是多麼的複雜。不過做爲用戶你並不須要處理這些複雜的 txid 比對、多值存儲等操做,Trident 已經在 State 中封裝了全部的容錯性處理邏輯,你只須要像下面這樣寫代碼便可:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6); |
全部處理模糊事務型 state 的邏輯已經封裝在 MemcachedState.opaque
的調用中了。另外,狀態更新都會自動調整爲批處理操做,這樣能夠減少與數據庫的反覆交互的資源損耗。
基本的 State
接口只有兩個方法:
public interface State { void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream void commit(Long txid); } |
前面已經說過,state 更新操做的開始時和結束時都會獲取一個 txid。對於你的 state 怎麼工做,你在其中使用什麼樣的方法執行更新操做,或者使用什麼樣的方法從 state 中讀取數據,Trident 並不關心。
假如你有一個包含有用戶的地址信息的定製數據庫,你須要使用 Trident 與該數據庫交互。你的 State 的實現就會包含有用於獲取與設置用戶信息的方法,好比下面這樣:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } } |
接着你就能夠爲 Trident 提供一個 StateFactory 來建立 Trident 任務內部的 State 對象的實例。對應於你的數據庫(LocationDB)的 StateFactory 大概是這樣的:
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } } |
Trident 提供了一個用於查詢 state 數據源的 QueryFunction
接口,以及一個用於更新 state 數據源的 StateUpdater
接口。例如,咱們能夠寫一個查詢 LocationDB 中的用戶地址信息的 「QueryLocation」。讓咱們從你在拓撲中使用這個操做的方式開始。假如在拓撲中須要讀取輸入流中的 userid 信息:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location")) |
這裏的 QueryLocation
的實現多是這樣的:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
QueryFunction
的執行包含兩個步驟。首先,Trident 會將讀取的一些數據中彙總爲一個 batch 傳入 batchRetrieve 方法中。在這個例子中,batchRetrieve 方法會收到一些用戶 id。而後 batchRetrieve 會返回一個與輸入 tuple 列表大小相同的隊列。結果隊列的第一個元素與第一個輸入 tuple 對應,第二個元素與第二個輸入 tuple 相對應,以此類推。
你會發現這段代碼並無發揮出 Trident 批處理的優點,由於這段代碼僅僅一次查詢一下 LocationDB。因此,實現 LocationDB 的更好的方式應該是這樣的:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } } |
而後,你能夠這樣實現 QueryLocation
方法:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
這段代碼大幅減小了域數據庫的IO,具備更高的執行效率。
你須要使用 StateUpdater
接口來更新 state。下面是一個更新 LocationDB 的地址信息的 StateUpdater 實現:
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } } |
而後你就能夠在 Trident 拓撲中這樣使用這個操做:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater()) |
partitionPersist
操做會更新 state 數據源。StateUpdater
接收 State 和一批 tuple 做爲輸入,而後更新這個 State。上面的代碼僅僅從輸入 tuple 中抓取 userid 和 location 信息,而後對 State 執行一個批處理更新操做。
在 Trident 拓撲更新 LocationDB 以後,partitionPersist
會返回一個表示更新後狀態的 TridentState
對象。隨後你就能夠在拓撲的其餘地方使用 stateQuery
方法對這個 state 執行查詢操做。
你也許注意到了 StateUpdater 中有一個 TridentCollector 參數。發送到這個 collector 的 tuple 會進入一個「新的數值流」中。在這個例子裏向這個新的流發送 tuple 並無意義,不過若是你須要處理相似於更新數據庫中的計數值這樣的操做,你能夠考慮將更新後的技術結果發送到這個流中。能夠經過 TridentState.newValuesStream
方法來獲取新的流的數據。
Trident 使用一個稱爲 persistentAggregate
的方法來更新 State。你已經在前面的數據流單詞統計的例子裏見過了這個方法,這裏再寫一遍:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) |
partitionPersist是一個接收Trident聚合器做爲參數並對state數據源進行更新的方法,persistentAggregate 就是構建於partitionPersist上層的一個編程抽象。在這個例子裏,因爲是一個分組數據流(grouped stream),Trident 須要你提供一個實現 MapState
接口的 state。被分組的域就是 state 中的 key,而聚合的結果就是 state 中的 value。MapState
接口是這樣的:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); } |
而當你在非分組數據流上執行聚合操做時(全局聚合操做),Trident須要你提供一個實現了 Snapshottable
接口的對象:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); } |
MemoryMapState and MemcachedState each implement both of these interfaces.
實現 MapState
接口很是簡單,Trident 幾乎已經爲你作好了全部的準備工做。OpaqueMap
、TransactionalMap
、與NonTransactionalMap
類都分別實現了各自的容錯性語義。你只須要爲這些類提供一個用於對不一樣的 key/value 進行 multiGets 與 multiPuts 處理的 IBackingMap 實現類。IBackingMap
接口是這樣的:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); } |
OpaqueMap 會使用 OpaqueValue做爲 vals 參數來調用 multiPut 方法,TransactionalMap 會使用TransactionalValue做爲參數,而 NonTransactionalMap 則直接將拓撲中的對象傳入。
Trident 也提供了一個 CachedMap用於實現 K-V map 的自動 LRU 緩存功能。
最後,Trident 還提供了一個 SnapshottableMap 類,該類經過將全局聚合結果存入一個固定的 key 中的方法將 MapState 對象轉化爲一個 Snapshottable 對象。
能夠參考MemcachedState的實現來了解如何將這些工具結合到一塊兒來提供一個高性能的 MapState。MemcachedState
支持選擇模糊事務型、事務型或者非事務型語義。