1、前言html
針對大叔據實時處理的入門,除了使用WordCount示例以外,還須要相對更深刻點的示例來理解Storm,所以,本篇博文利用Storm實現了頻繁項集挖掘的案例,以方便更好的入門Storm。java
2、基礎知識git
2.1 頻繁二項集挖掘github
如顧客去超市購物時,牙膏和牙刷基本上都是擺放在一塊兒,由於購買牙膏時,頗有可能會購買牙刷。另外,「啤酒與尿布」的案例則是對訂單進行分析挖掘後發現的規律,將啤酒和尿布一塊兒擺放會促進啤酒的銷量。redis
2.2 算法設計算法
本示例中不考慮太複雜的挖掘算法,只考慮將兩個商品組合後的挖掘,設計以下數據庫
· 將每筆訂單的商品按照兩兩分組。編程
· 將每一個分組的頻度進行統計(不考慮商品的次序)。json
· 根據頻度計算支持度(每一個組合出現的頻率越高,更有多是頻繁組合)和置信度(商品組合出現的置信程度)。服務器
· 設置支持度和置信度閾值,過濾不達標的數據。
2.3 Storm設計思路
· 使用Redis做爲存儲訂單數據的數據庫。
· 使用Spout從Redis中讀取訂單數據。
· 使用Bolt計算分組頻度。
· 使用Bolt計算支持度和置信度。
· 使用Bolt篩選結果並存儲到Redis中。
2.4 拓撲結構圖
根據程序思路設計以下所示的拓撲結構,其組件在以後進行介紹。
3、設計實現
3.1 實現步驟
1. 產生訂單數據
經過模擬程序產生訂單數據,並存儲Redis中,即便用OrderGenerator來生成訂單數據並存入Redis中,每一個訂單有四種不一樣商品及其數量組成。
2. 接入訂單數據
經過OrderSpout讀取Redis中的訂單數據,以供拓撲結構下游的Bolt使用。
3. 對訂單中商品進行分組
經過SplitBolt對訂單中的商品進行分組,兩兩分組並構建商品對,發送元組至下游Bolt。
4. 統計商品對總數
使用PairTotalCountBolt對全部商品對數量進行統計(用於計算支持度),併發送元組至下游Bolt。
5. 統計商品對及其出現次數
使用PairCountBolt對商品對出現的次數進行統計,併發送元組至下游Bolt。
6. 計算商品對支持度
使用SupportComputeBolt對商品對的支持度進行計算,併發送元組至下游Bolt。
7. 計算商品對置信度
使用ConfidenceComputeBolt對商品對的置信度進行計算,併發送元組至下游Bolt。
8. 過濾符合條件的商品對
使用FilterBolt對符合條件的商品對進行過濾並存入redis,併發送元組至下游Bolt。
3.1 源碼分析
下面給出拓撲結構中的各組件的源碼並進行分析。
1. OrderSpout
package com.hust.grid.leesf.ordertest.spout; import java.util.Map; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import com.hust.grid.leesf.ordertest.common.ConfKeys; import com.hust.grid.leesf.ordertest.common.FieldNames; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import redis.clients.jedis.Jedis; /** * 數據源,從redis讀取訂單 * * @author leesf * */ public class OrderSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; private Jedis jedis; private String host; private int port; public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.host = conf.get(ConfKeys.REDIS_HOST).toString(); this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString()); connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); jedis.connect(); } public void nextTuple() { String content = jedis.rpop("orders"); // 獲取一條訂單數據 if (null == content || "nil".equals(content)) { // 若無,則等待300ms try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 對訂單數據進行轉化 JSONObject object = (JSONObject) JSONValue.parse(content); String id = object.get(FieldNames.ID).toString(); // 獲取ID JSONArray items = (JSONArray) object.get(FieldNames.ITEMS); // 獲取訂單中的商品 for (Object obj : items) { // 遍歷訂單中的商品 JSONObject item = (JSONObject) obj; String name = item.get(FieldNames.NAME).toString(); // 商品名稱 int count = Integer.parseInt(item.get(FieldNames.COUNT).toString()); // 商品數量 collector.emit(new Values(id, name, count)); // 發射訂單號、商品名稱、商品數量 if (jedis.hexists("itemCounts", name)) { // redis中存在name字段 jedis.hincrBy("itemCounts", name, 1); // 商品對應數量(訂單中多個商品看成1個)增長1 } else { // redis中不存在name字段 jedis.hset("itemCounts", name, "1"); // 將name字段的值(商品數量)設置爲1 } } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明發射元組字段 declarer.declare(new Fields(FieldNames.ID, FieldNames.NAME, FieldNames.COUNT)); } }
說明:OrderSpout會從redis中讀取訂單數據,並遍歷訂單中每一個商品併發射,同時會統計商品數據並存入redis。
2. CommandSpout
package com.hust.grid.leesf.ordertest.spout; import java.util.Map; import com.hust.grid.leesf.ordertest.common.FieldNames; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * 統計支持度和置信度 * * @author leesf */ public class CommandSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { // 休眠5S後發射「statistics」 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } collector.emit(new Values("statistics")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.COMMAND)); } }
說明:下游Bolt根據其發射的元組信息來統計支持度和置信度,其每5秒發射一次統計信號。
3. SplitBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.hust.grid.leesf.ordertest.common.FieldNames; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 對訂單中的商品進行兩兩組合併發送 * * @author leesf * */ public class SplitBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private Map<String, List<String>> orderItems; // 存儲訂單及其商品 public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; orderItems = new HashMap<String, List<String>>(); } public void execute(Tuple tuple) { // 獲取訂單號和商品名稱 String id = tuple.getStringByField(FieldNames.ID); String newItem = tuple.getStringByField(FieldNames.NAME); if (!orderItems.containsKey(id)) { // 不包含該訂單 // 新生商品鏈表 ArrayList<String> items = new ArrayList<String>(); // 添加商品 items.add(newItem); orderItems.put(id, items); return; } // 包含訂單,取出訂單中包含的商品 List<String> items = orderItems.get(id); for (String existItem : items) { // 遍歷商品 // 將元組中提取的商品與訂單中已存在的商品組合後發射 collector.emit(createPair(newItem, existItem)); } // 添加新的商品 items.add(newItem); } private Values createPair(String item1, String item2) { // 按照指定順序生成商品對 if (item1.compareTo(item2) > 0) { return new Values(item1, item2); } return new Values(item2, item1); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2)); } }
說明:其將每一個訂單的兩兩商品進行組合,而後發射。
4. PairTotalCountBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.Map; import com.hust.grid.leesf.ordertest.common.FieldNames; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 計算商品對總數 * * @author leesf * */ public class PairTotalCountBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private int totalCount; // 商品對總數 public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; totalCount = 0; } public void execute(Tuple tuple) { totalCount++; // 每收到一個元組,便增長商品對總數 collector.emit(new Values(totalCount)); // 發射商品對總數 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.TOTAL_COUNT)); } }
說明:其用於統計全部商品對的數量(用於後面支持度的計算)。
5. PairCountBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.HashMap; import java.util.Map; import com.hust.grid.leesf.ordertest.common.FieldNames; import com.hust.grid.leesf.ordertest.common.ItemPair; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 計算商品對出現的次數 * * @author leesf * */ public class PairCountBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數 public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.pairCounts = new HashMap<ItemPair, Integer>(); } public void execute(Tuple tuple) { String item1 = tuple.getStringByField(FieldNames.ITEM1); String item2 = tuple.getStringByField(FieldNames.ITEM2); ItemPair itemPair = new ItemPair(item1, item2); int pairCount = 0; if (pairCounts.containsKey(itemPair)) { // 包含商品對 // 取出商品對出現的次數 pairCount = pairCounts.get(itemPair); } // 更新出現次數 pairCount++; pairCounts.put(itemPair, pairCount); collector.emit(new Values(item1, item2, pairCount)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.PAIR_COUNT)); } }
說明:其用於統計每一個商品對出現的次數,而後發射。
6. SupportComputeBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.HashMap; import java.util.Map; import com.hust.grid.leesf.ordertest.common.FieldNames; import com.hust.grid.leesf.ordertest.common.ItemPair; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 計算商品對的支持度 * * @author leesf * */ public class SupportComputeBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數 private int pairTotalCount; // 商品對總數 public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; pairCounts = new HashMap<ItemPair, Integer>(); pairTotalCount = 0; } /** * 因爲SupportComputeBolt訂閱了多個流,其須要根據不一樣的字段作出不一樣的行爲 */ public void execute(Tuple tuple) { if (tuple.getFields().get(0).equals(FieldNames.TOTAL_COUNT)) { // 對應PairTotalCountBolt // 取出商品對總數量 pairTotalCount = tuple.getIntegerByField(FieldNames.TOTAL_COUNT); } else if (tuple.getFields().size() == 3) { // 對應PairCountBolt // 取出商品及其商品對出現的次數 String item1 = tuple.getStringByField(FieldNames.ITEM1); String item2 = tuple.getStringByField(FieldNames.ITEM2); int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT); // 存儲商品對及其次數 pairCounts.put(new ItemPair(item1, item2), pairCount); } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) { // 對應CommandSpout for (ItemPair itemPair : pairCounts.keySet()) { // 遍歷商品對 // 計算商品支持度,使用商品對出現的次數除以商品對總數量 double itemSupport = (double) (pairCounts.get(itemPair).intValue()) / pairTotalCount; collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemSupport)); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定義元組字段 declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT)); } }
說明:計算每一個商品對的支持度,而且發射支持度。
7. ConfidenceComputeBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.HashMap; import java.util.Map; import com.hust.grid.leesf.ordertest.common.ConfKeys; import com.hust.grid.leesf.ordertest.common.FieldNames; import com.hust.grid.leesf.ordertest.common.ItemPair; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import redis.clients.jedis.Jedis; /** * 計算商品對的置信度 * * @author leesf */ public class ConfidenceComputeBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數 private String host; private int port; private Jedis jedis; public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.host = conf.get(ConfKeys.REDIS_HOST).toString(); this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString()); pairCounts = new HashMap<ItemPair, Integer>(); connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); jedis.connect(); } /** * 因爲ConfidenceComputeBolt訂閱了多個流,其須要根據元組不一樣的字段作出不一樣的行爲 */ public void execute(Tuple tuple) { if (tuple.getFields().size() == 3) { // 對應PairCountBolt // 取出商品對及其出現次數 String item1 = tuple.getStringByField(FieldNames.ITEM1); String item2 = tuple.getStringByField(FieldNames.ITEM2); int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT); pairCounts.put(new ItemPair(item1, item2), pairCount); } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) { // 對應CommandSpout,須要進行統計 for (ItemPair itemPair : pairCounts.keySet()) { // 遍歷商品對 // 從redis中取出商品對中商品出現的次數 double item1Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem1())); double item2Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem2())); double itemConfidence = pairCounts.get(itemPair).intValue(); // 計算商品對置信度 if (item1Count < item2Count) { itemConfidence /= item1Count; } else { itemConfidence /= item2Count; } collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemConfidence)); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.CONFIDENCE)); } }
說明:計算商品對的置信度,而且發射置信度。
8. FilterBolt
package com.hust.grid.leesf.ordertest.bolt; import java.util.Map; import org.json.simple.JSONObject; import com.hust.grid.leesf.ordertest.common.ConfKeys; import com.hust.grid.leesf.ordertest.common.FieldNames; import com.hust.grid.leesf.ordertest.common.ItemPair; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import redis.clients.jedis.Jedis; /** * 過濾符合條件的商品對並存入redis * * @author leesf * */ public class FilterBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; // 商品對的支持度和置信度 private static final double SUPPORT_THRESHOLD = 0.01; private static final double CONFIDENCE_THRESHOLD = 0.01; private OutputCollector collector; private Jedis jedis; private String host; private int port; public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.host = conf.get(ConfKeys.REDIS_HOST).toString(); this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString()); connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); jedis.connect(); } @SuppressWarnings("unchecked") public void execute(Tuple tuple) { // 取出商品並構造商品對 String item1 = tuple.getStringByField(FieldNames.ITEM1); String item2 = tuple.getStringByField(FieldNames.ITEM2); ItemPair itemPair = new ItemPair(item1, item2); String pairString = itemPair.toString(); double support = 0; double confidence = 0; if (tuple.getFields().get(2).equals(FieldNames.SUPPORT)) { // 對應SupportComputeBolt // 獲取支持度並存入redis support = tuple.getDoubleByField(FieldNames.SUPPORT); jedis.hset("supports", pairString, String.valueOf(support)); } else if (tuple.getFields().get(2).equals(FieldNames.CONFIDENCE)) { // 對應ConfidenceComputeBolt // 獲取置信度並存入redis confidence = tuple.getDoubleByField(FieldNames.CONFIDENCE); jedis.hset("confidences", pairString, String.valueOf(confidence)); } if (!jedis.hexists("supports", pairString) || !jedis.hexists("confidences", pairString)) { // 商品對的支持度和置信度還未計算完成,返回 return; } // 商品對的支持度和置信度已經計算完成 support = Double.parseDouble(jedis.hget("supports", pairString)); confidence = Double.parseDouble(jedis.hget("confidences", pairString)); if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) { // 支持度和置信度超過閾值 // 將該商品對信息存入redis中 JSONObject pairValue = new JSONObject(); pairValue.put(FieldNames.SUPPORT, support); pairValue.put(FieldNames.CONFIDENCE, confidence); jedis.hset("recommendedPairs", pairString, pairValue.toJSONString()); collector.emit(new Values(item1, item2, support, confidence)); } else { // 不高於閾值,則從redis中刪除 jedis.hdel("recommendedPairs", pairString); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 聲明元組字段 declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT, FieldNames.CONFIDENCE)); } }
說明:判斷支持度和置信度是否超過了閾值,若超過則須要存入redis,不然,從redis中刪除。
4、程序運行
4.1. 環境依賴
打開redis服務器、客戶端(方便觀看結果)和zookeeper。
4.2. 寫入訂單數據
運行OrderGenerator,生成並寫入訂單數據,經過redis查看,結果以下
表示已經成功寫入了訂單數據。
4.3. 運行任務拓撲
運行OrderTopology,其會根據訂單中的商品數據,生成並寫入推薦商品對,經過redis查看,結果以下
能夠看到運行完成後,已經成功生成了推薦商品方案。
5、總結
經過本篇Storm案例的學習,對於Storm的編程有了更深刻的認識,同時,本項目的源代碼已經上傳至github,歡迎star,謝謝各位園友的觀看~
參考連接:http://www.jikexueyuan.com/course/1437.html