Storm bolt重複消費問題解決

最近碰到一個storm的坑, 兩個bolt都須要從kafkaSpout中獲取數據進行各自的業務處理, bolt1的處理是冪等的, bolt2的處理是非冪等的, 上線後發現非冪等的bolt處理老是會處理兩次, 代碼以下:html

//建立拓撲做業
        TopologyBuilder builder = new TopologyBuilder();

        //1. 建立Spout,負責時間調度
        builder.setSpout("timeSpout", new TimeScheduleSpout(60 * 60), 1);

        //2. 建立Spout,從Kafka中讀取信息,流ID:RcKafkaSpout

        builder.setSpout("RcKafkaSpout", new KafkaSpout(spoutConfig), RiskControllConfig.getInt(StormConfig.STORM_SPOUT_PARALLELISM_HINT, 1));

        //3. 建立Bolt,處理Kafka中讀取的信息, redis計數,流ID:RcAnalyzeBolt

        builder.setBolt("RcAnalyzeBolt", new RcAnalyzeBolt(), RiskControllConfig.getInt(StormConfig.STORM_BOLT1_PARALLELISM_HINT, 1)).allGrouping("RcKafkaSpout").allGrouping("timeSpout");//非冪等的疊加操做

        //4. 建立Bolt,將處理的結果存儲至Redis

        builder.setBolt("RcAggregateBolt", new RcAggregateBolt(), RiskControllConfig.getInt(StormConfig.STORM_BOLT2_PARALLELISM_HINT, 1)).shuffleGrouping("RcAnalyzeBolt");

        //5. 更新用戶已完成訂單金額的bolt
        builder.setBolt("LastOrderBolt", new LastOrderBolt(), RiskControllConfig.getInt(StormConfig.STORM_BOLT1_PARALLELISM_HINT, 1)).allGrouping("RcKafkaSpout");//冪等的hbase put操做

 

紅色位置即爲bug, 錯誤緣由是對 storm 消息分發策略的理解有問題redis

徐明明的博客在這一點上講的有點誤導: http://xumingming.sinaapp.com/117/twitter-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%85%B3%E9%94%AE%E6%A6%82%E5%BF%B5/apache

All Grouping: 廣播發送, 對於每個tuple, 全部的Bolts都會收到。

實際上, 官網的解釋以下:緩存

http://storm.apache.org/documentation/Concepts.htmlapp

All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care

應該是對於每一個tuple, 全部Bolt的全部task(也就是線程)都會收到, 也就意味着, 若是你的並行度設置>1, 則每一個tuple會被bolt處理N次ui

 

allgrouping, 通常用於全局的數據同步和共享才須要, 好比全局的配置更新等, 好比上面的用於定時更新緩存數據的timeSpout, 咱們就使用的是allgrouping方式this

相關文章
相關標籤/搜索