你已經看到實現有且只有一次被執行的語義時的複雜性。Trident這樣作的好處把全部容錯想過的邏輯都放在了State裏面 -- 做爲一個用戶,你並不須要本身去處理複雜的txid,存儲多餘的信息到數據庫中,或者是任何其餘相似的事情。你只須要寫以下這樣簡單的code:java
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6); |
全部管理opaque transactional state所需的邏輯都在MemcachedState.opaque方法的調用中被涵蓋了,除此以外,數據庫的更新會自動以batch的形式來進行以免屢次訪問數據庫。State的基本接口只包含下面兩個方法:數據庫
public interface State { void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream void commit(Long txid); } |
當一個State更新開始時,以及當一個State更新結束時你都會被告知,而且會告訴你該次的txid。Trident並無對你的state的工做方式有任何的假定。微信
假定你本身搭了一套數據庫來存儲用戶位置信息,而且你想要在Trident中去訪問它。則在State的實現中應該有用戶信息的set、get方法:ide
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } } |
而後你還須要提供給Trident一個StateFactory來在Trident的task中建立你的State對象。LocationDB 的 StateFactory可能會以下所示:spa
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } } |
Trident提供了一個QueryFunction接口用來實現Trident中在一個state source上查詢的功能。同時還提供了一個StateUpdater來實現Trident中更新statesource的功能。好比說,讓咱們寫一個查詢地址的操做,這個操做會查詢LocationDB來找到用戶的地址。下面以以怎樣在topology中使用該功能開始,假定這個topology會接受一個用戶id做爲輸入數據流:code
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location")) |
接下來讓咱們一塊兒來看看QueryLocation 的實現應該是什麼樣的:orm
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
QueryFunction的執行分爲兩部分。首先Trident收集了一個batch的read操做並把他們統一交給batchRetrieve。在這個例子中,batchRetrieve會接受到多個用戶id。batchRetrieve應該返還一個大小和輸入tuple數量相同的result列表。result列表中的第一個元素對應着第一個輸入tuple的結果,result列表中的第二個元素對應着第二個輸入tuple的結果,以此類推。server
你能夠看到,這段代碼並無像Trident那樣很好的利用batch的優點,而是爲每一個輸入tuple去查詢了一次LocationDB。因此一種更好的操做LocationDB方式應該是這樣的:對象
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } } |
接着,你能夠這樣改寫上面的QueryLocation:接口
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
經過有效減小訪問數據庫的次數,這段代碼比上一個實現會高效的多。
若是你要更新State,你須要使用StateUpdater接口,下面是一個StateUpdater的例子,用來將新的地址信息更新到LocationDB當中。
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } } |
下面列出了你應該如何在Trident topology中使用上面聲明的LocationUpdater:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater()) |
partitionPersist 操做會更新一個State,其內部是將State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操做。
在這段代碼中,只是簡單的從輸入的tuple中提取出userid和對應的location,並一塊兒更新到State中。
partitionPersist 會返回一個TridentState對象來表示被這個Trident topoloy更新過的location db。 而後你就可使用這個state在topology的任何地方進行查詢操做了。
同時,你也能夠看到咱們傳了一個TridentCollector給StateUpdaters,collector發送的tuple就會去往一個新的stream。在這個例子中,咱們並無去往一個新的stream的須要,可是若是你在作一些事情,好比說更新數據庫中的某個count,你能夠emit更新的count到這個新的stream。而後你能夠經過調用TridentState#newValuesStream方法來訪問這個新的stream來進行其餘的處理。
更多精彩內容請關注:http://bbs.superwu.cn
關注超人學院微信二維碼:
關注超人學院java免費交流羣: