流聚合(stream join)是指將具備共同元組(tuple)字段的數據流(兩個或者多個)聚合造成一個新的數據流的過程。html
從定義上看,流聚合和SQL中表的聚合(table join)很像,可是兩者有明顯的區別:table join的輸入是有限的,而且join的語義是很是明確的;而流聚合的語義是不明確的而且輸入流是無限的。git
數據流的聚合類型跟具體的應用有關。一些應用把兩個流發出的全部的tuple都聚合起來——無論多長時間;而另一些應用則只會聚合一些特定的tuple。而另一些應用的聚合邏輯又可能徹底不同。而這些聚合類型裏面最多見的類型是把全部的輸入流進行同樣的劃分,這個在storm裏面用fields grouping在相同字段上進行grouping就能夠實現。github
下面是對storm-starter(代碼見:https://github.com/nathanmarz/storm-starter)中有關兩個流的聚合的示例代碼剖析:web
先看一下入口類SingleJoinExample。ide
(1)這裏首先建立了兩個發射源spout,分別是genderSpout和ageSpout:post
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("gender", genderSpout); builder.setSpout("age", ageSpout);
其中genderSpout包含兩個tuple字段:id和gender,ageSpout包含兩個tuple字段:id和age(這裏流聚合就是經過將相同id的tuple進行聚合,獲得一個新的輸出流,包含id、gender和age字段)。ui
(2)爲了避免同的數據流中的同一個id的tuple可以落到同一個task中進行處理,這裏使用了storm中的fileds grouping在id字段上進行分組劃分:url
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))) .fieldsGrouping("gender", new Fields("id")) .fieldsGrouping("age", new Fields("id"));
從中能夠看到,SingleJoinBolt就是真正進行流聚合的地方。下面咱們來看看:spa
(1)SingleJoinBolt構造時接收一個Fileds對象,其中傳進的是聚合後將要被輸出的字段(這裏就是gender和age字段),保存到變量_outFileds中。code
(2)接下來看看完成SingleJoinBolt的構造後,SingleJoinBolt在真正開始接收處理tuple以前所作的準備工做(代碼見prepare方法):
a)首先,將保存OutputCollector對象,建立TimeCacheMap對象,設置超時回調接口,用於tuple處理失敗時fail消息;緊接着記錄數據源的個數:
_collector = collector; int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); _numSources = context.getThisSources().size();
b)遍歷TopologyContext中不一樣數據源,獲得全部數據源(這裏就是genderSpout和ageSpout)中公共的Filed字段,保存到變量_idFields中(例子中就是id字段),同時將_outFileds中字段所在數據源記錄下來,保存到一張HashMap中_fieldLocations,以便聚合後獲取對應的字段值。
Set<String> idFields = null; for(GlobalStreamId source: context.getThisSources().keySet()) { Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); Set<String> setFields = new HashSet<String>(fields.toList()); if(idFields==null) idFields = setFields; else idFields.retainAll(setFields); for(String outfield: _outFields) { for(String sourcefield: fields) { if(outfield.equals(sourcefield)) { _fieldLocations.put(outfield, source); } } } } _idFields = new Fields(new ArrayList<String>(idFields)); if(_fieldLocations.size()!=_outFields.size()) { throw new RuntimeException("Cannot find all outfields among sources"); }
(3)好了,下面開始兩個spout流的聚合過程了(代碼見execute方法):
首先,從tuple中獲取_idFields字段,若是不存在於等待被處理的隊列_pending中,則加入一行,其中key是獲取到的_idFields字段,value是一個空的HashMap<GlobalStreamId, Tuple>對象,記錄GlobalStreamId到Tuple的映射。
List<Object> id = tuple.select(_idFields); GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); if(!_pending.containsKey(id)) { _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); }
從_pending隊列中,獲取當前GlobalStreamId streamId對應的HashMap對象parts中:
Map<GlobalStreamId, Tuple> parts = _pending.get(id);
若是streamId已經包含其中,則拋出異常,接收到同一個spout中的兩條同樣id的tuple,不然將該streamid加入parts中:
if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice"); parts.put(streamId, tuple);
若是parts已經包含了聚合數據源的個數_numSources時,從_pending隊列中移除這條記錄,而後開始構造聚合後的結果字段:依次遍歷_outFields中各個字段,從_fieldLocations中取到這些outFiled字段對應的GlobalStreamId,緊接着從parts中取出GlobalStreamId對應的outFiled,放入聚合後的結果中。
if(parts.size()==_numSources) { _pending.remove(id); List<Object> joinResult = new ArrayList<Object>(); for(String outField: _outFields) { GlobalStreamId loc = _fieldLocations.get(outField); joinResult.add(parts.get(loc).getValueByField(outField)); }
最後經過_collector將parts中存放的tuple和聚合後的輸出結果發射出去,並ack這些tuple已經處理成功。
_collector.emit( ArrayList<Tuple>
不然,繼續等待兩個spout流中這個streamid都到齊後再進行聚合處理。
(4)最後,聲明一下輸出字段(代碼見declareOutputFields方法):
declarer.declare(_outFields);