Twitter Storm, 數據流分組策略,fieldsGrouping

##Storm Groupingapp

  1. shuffleGrouping負載均衡

    將流分組定義爲混排。這種混排分組意味着來自Spout的輸入將混排,或隨機分發給此Bolt中的任務。shuffle grouping對各個task的tuple分配的比較均勻。學習

  2. fieldsGrouping測試

    這種grouping機制保證相同field值的tuple會去同一個task,這對於WordCount來講很是關鍵,若是同一個單詞不去同一個task,那麼統計出來的單詞次數就不對了。ui

  3. All grouping線程

    廣播發送, 對於每個tuple將會複製到每個bolt中處理。日誌

  4. Global groupingcode

    Stream中的全部的tuple都會發送給同一個bolt任務處理,全部的tuple將會發送給擁有最小task_id的bolt任務處理。orm

  5. None grouping字符串

    不關注並行處理負載均衡策略時使用該方式,目前等同於shuffle grouping,另外storm將會把bolt任務和他的上游提供數據的任務安排在同一個線程下。

  6. Direct grouping

    由tuple的發射單元直接決定tuple將發射給那個bolt,通常狀況下是由接收tuple的bolt決定接收哪一個bolt發射的Tuple。這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪一個task處理這個消息。 只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。並且這種消息tuple必須使用emitDirect方法來發射。消息處理者能夠經過TopologyContext來獲取處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)

##fieldsGrouping

上面的資料我摘抄自:http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

若是你瞭解Storm,我想你能明白其中的大多數Grouping。這裏的Grouping策略我想着重介紹一下fieldsGrouping,也最難理解的。

fieldsGrouping是按照數據中字段Field的值分組的。下面是個人測試代碼:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
	    .fieldsGrouping("words", new Fields("word"));

測試的例子Spout是Storm自帶的例子,Blot代碼以下:

public void execute(Tuple tuple) {
	log.info("rev a message: " + tuple.getString(0));
	collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

Storm自帶的例子Spout能隨機的返回<code>new String[] {"nathan", "mike", "jackson", "golda", "bertels"};</code>列表中的幾個字符串。這也是測試FieldGroup的好例子。

按照我最先作Storm開始前的理解,既然是按照Field分組,那麼是全部相同的Field值得數據都會到達一個Blot的。我測試不少次,其結果並非這樣,一個Blot會收到多個不一樣的值。我沒有仔細探究Storm這樣分組有什麼特別的地方,以致於本身對Storm的學習停滯了很長時間。

Storm能保證全部相同Field值的數據到達的是相同的Blot,可是不保證一個Blot只處理一個值域。

也就是說,全部值是nathan能到達到一個Blot,可是到達同一個Blot的值可能有多個,如"nathan", "mike"的數據都到達。

理解到這點上,fieldsGrouping就算是理解了。

下面是測試日誌:

9144 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9234 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9245 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9335 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9346 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9437 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9447 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9537 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9548 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9639 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9649 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9740 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9749 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9841 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9850 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda

由上面的日誌能夠看出,golda這個值的數據,的確歸併到一個Blot處理的。線程編號:Thread-26-exclaim2。 其它值也都是相同值都是在一個線程內被處理的。

相關文章
相關標籤/搜索