storm源碼分析---State APIs

你已經看到實現有且只有一次被執行的語義時的複雜性。Trident這樣作的好處把全部容錯想過的邏輯都放在了State裏面 -- 做爲一個用戶,你並不須要本身去處理複雜的txid,存儲多餘的信息到數據庫中,或者是任何其餘相似的事情。你只須要寫以下這樣簡單的codejava

  TridentTopology topology = new  TridentTopology();          TridentState wordCounts =         topology.newStream("spout1", spout)           .each(new Fields("sentence"), new Split(), new  Fields("word"))           .groupBy(new Fields("word"))           .persistentAggregate(MemcachedState.opaque(serverLocations), new  Count(), new Fields("count"))                           .parallelismHint(6);  

全部管理opaque transactional state所需的邏輯都在MemcachedState.opaque方法的調用中被涵蓋了,除此以外,數據庫的更新會自動以batch的形式來進行以免屢次訪問數據庫。State的基本接口只包含下面兩個方法:數據庫

  public interface State {       void beginCommit(Long txid); // can be null for things like  partitionPersist occurring off a DRPC stream       void commit(Long txid);  }  

當一個State更新開始時,以及當一個State更新結束時你都會被告知,而且會告訴你該次的txid。Trident並無對你的state的工做方式有任何的假定。微信

假定你本身搭了一套數據庫來存儲用戶位置信息,而且你想要在Trident中去訪問它。則在State的實現中應該有用戶信息的set、get方法:ide

  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocation(long userId, String location) {         // code to access database and set location       }          public String getLocation(long userId) {         // code to get location from database       }  }  

而後你還須要提供給Trident一個StateFactory來在Trident的task中建立你的State對象。LocationDB 的 StateFactory可能會以下所示:spa

  public class LocationDBFactory implements  StateFactory {      public State makeState(Map conf, int partitionIndex, int  numPartitions) {         return new LocationDB();     }    }  

Trident提供了一個QueryFunction接口用來實現Trident中在一個state source上查詢的功能。同時還提供了一個StateUpdater來實現Trident中更新statesource的功能。好比說,讓咱們寫一個查詢地址的操做,這個操做會查詢LocationDB來找到用戶的地址。下面以以怎樣在topology中使用該功能開始,假定這個topology會接受一個用戶id做爲輸入數據流:code

  TridentTopology topology = new  TridentTopology();  TridentState locations =  topology.newStaticState(new LocationDBFactory());  topology.newStream("myspout",  spout)           .stateQuery(locations, new Fields("userid"), new  QueryLocation(), new Fields("location"))  

接下來讓咱們一塊兒來看看QueryLocation 的實現應該是什麼樣的:orm

  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<String> ret = new ArrayList();           for(TridentTuple input: inputs) {               ret.add(state.getLocation(input.getLong(0)));           }           return ret;       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  

QueryFunction的執行分爲兩部分。首先Trident收集了一個batch的read操做並把他們統一交給batchRetrieve。在這個例子中,batchRetrieve會接受到多個用戶id。batchRetrieve應該返還一個大小和輸入tuple數量相同的result列表。result列表中的第一個元素對應着第一個輸入tuple的結果,result列表中的第二個元素對應着第二個輸入tuple的結果,以此類推。server

你能夠看到,這段代碼並無像Trident那樣很好的利用batch的優點,而是爲每一個輸入tuple去查詢了一次LocationDB。因此一種更好的操做LocationDB方式應該是這樣的:對象

  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocationsBulk(List<Long> userIds,  List<String> locations) {         // set locations in bulk       }          public List<String> bulkGetLocations(List<Long> userIds) {         // get locations in bulk       }  }  

接着,你能夠這樣改寫上面的QueryLocation:接口

  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<Long> userIds = new ArrayList<Long>();           for(TridentTuple input: inputs) {               userIds.add(input.getLong(0));           }           return state.bulkGetLocations(userIds);       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  

經過有效減小訪問數據庫的次數,這段代碼比上一個實現會高效的多。

若是你要更新State,你須要使用StateUpdater接口,下面是一個StateUpdater的例子,用來將新的地址信息更新到LocationDB當中。

  public class LocationUpdater extends  BaseStateUpdater<LocationDB> {       public void updateState(LocationDB state, List<TridentTuple>  tuples, TridentCollector collector) {           List<Long> ids = new ArrayList<Long>();           List<String> locations = new ArrayList<String>();           for(TridentTuple t: tuples) {               ids.add(t.getLong(0));               locations.add(t.getString(1));          }           state.setLocationsBulk(ids, locations);       }  }  

下面列出了你應該如何在Trident topology中使用上面聲明的LocationUpdater:

  TridentTopology topology = new  TridentTopology();  TridentState locations =        topology.newStream("locations", locationsSpout)           .partitionPersist(new LocationDBFactory(), new  Fields("userid", "location"), new LocationUpdater())  

partitionPersist 操做會更新一個State,其內部是將State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操做。

在這段代碼中,只是簡單的從輸入的tuple中提取出userid和對應的location,並一塊兒更新到State中。

partitionPersist 會返回一個TridentState對象來表示被這個Trident topoloy更新過的location db。 而後你就可使用這個state在topology的任何地方進行查詢操做了。

同時,你也能夠看到咱們傳了一個TridentCollector給StateUpdaters,collector發送的tuple就會去往一個新的stream。在這個例子中,咱們並無去往一個新的stream的須要,可是若是你在作一些事情,好比說更新數據庫中的某個count,你能夠emit更新的count到這個新的stream。而後你能夠經過調用TridentState#newValuesStream方法來訪問這個新的stream來進行其餘的處理。

更多精彩內容請關注:http://bbs.superwu.cn

關注超人學院微信二維碼:

關注超人學院java免費交流羣:

相關文章
相關標籤/搜索