【Storm】Storm實戰之頻繁二項集挖掘(附源碼)

1、前言html

  針對大叔據實時處理的入門,除了使用WordCount示例以外,還須要相對更深刻點的示例來理解Storm,所以,本篇博文利用Storm實現了頻繁項集挖掘的案例,以方便更好的入門Storm。java

2、基礎知識git

  2.1 頻繁二項集挖掘github

  如顧客去超市購物時,牙膏和牙刷基本上都是擺放在一塊兒,由於購買牙膏時,頗有可能會購買牙刷。另外,「啤酒與尿布」的案例則是對訂單進行分析挖掘後發現的規律,將啤酒和尿布一塊兒擺放會促進啤酒的銷量。redis

  2.2 算法設計算法

  本示例中不考慮太複雜的挖掘算法,只考慮將兩個商品組合後的挖掘,設計以下數據庫

    · 將每筆訂單的商品按照兩兩分組。編程

    · 將每一個分組的頻度進行統計(不考慮商品的次序)。json

    · 根據頻度計算支持度(每一個組合出現的頻率越高,更有多是頻繁組合)和置信度(商品組合出現的置信程度)。服務器

    · 設置支持度和置信度閾值,過濾不達標的數據。

  2.3 Storm設計思路

    · 使用Redis做爲存儲訂單數據的數據庫。

    · 使用Spout從Redis中讀取訂單數據。

    · 使用Bolt計算分組頻度。

    · 使用Bolt計算支持度和置信度。

    · 使用Bolt篩選結果並存儲到Redis中。

  2.4 拓撲結構圖

  根據程序思路設計以下所示的拓撲結構,其組件在以後進行介紹。

  

3、設計實現

  3.1 實現步驟

  1. 產生訂單數據

  經過模擬程序產生訂單數據,並存儲Redis中,即便用OrderGenerator來生成訂單數據並存入Redis中,每一個訂單有四種不一樣商品及其數量組成。

  2. 接入訂單數據

  經過OrderSpout讀取Redis中的訂單數據,以供拓撲結構下游的Bolt使用。

  3. 對訂單中商品進行分組

  經過SplitBolt對訂單中的商品進行分組,兩兩分組並構建商品對,發送元組至下游Bolt。

  4. 統計商品對總數

  使用PairTotalCountBolt對全部商品對數量進行統計(用於計算支持度),併發送元組至下游Bolt。

  5. 統計商品對及其出現次數

  使用PairCountBolt對商品對出現的次數進行統計,併發送元組至下游Bolt。

  6. 計算商品對支持度

  使用SupportComputeBolt對商品對的支持度進行計算,併發送元組至下游Bolt。

  7. 計算商品對置信度

  使用ConfidenceComputeBolt對商品對的置信度進行計算,併發送元組至下游Bolt。

  8. 過濾符合條件的商品對

  使用FilterBolt對符合條件的商品對進行過濾並存入redis,併發送元組至下游Bolt。

     3.1 源碼分析

  下面給出拓撲結構中的各組件的源碼並進行分析。

  1. OrderSpout   

package com.hust.grid.leesf.ordertest.spout;

import java.util.Map;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * 數據源,從redis讀取訂單
 * 
 * @author leesf
 *
 */
public class OrderSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    public void nextTuple() {
        String content = jedis.rpop("orders"); // 獲取一條訂單數據

        if (null == content || "nil".equals(content)) { // 若無,則等待300ms
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else { // 對訂單數據進行轉化
            JSONObject object = (JSONObject) JSONValue.parse(content);
            String id = object.get(FieldNames.ID).toString(); // 獲取ID
            JSONArray items = (JSONArray) object.get(FieldNames.ITEMS); // 獲取訂單中的商品

            for (Object obj : items) { // 遍歷訂單中的商品
                JSONObject item = (JSONObject) obj;
                String name = item.get(FieldNames.NAME).toString(); // 商品名稱
                int count = Integer.parseInt(item.get(FieldNames.COUNT).toString()); // 商品數量
                collector.emit(new Values(id, name, count)); // 發射訂單號、商品名稱、商品數量

                if (jedis.hexists("itemCounts", name)) { // redis中存在name字段
                    jedis.hincrBy("itemCounts", name, 1); // 商品對應數量(訂單中多個商品看成1個)增長1
                } else { // redis中不存在name字段
                    jedis.hset("itemCounts", name, "1"); // 將name字段的值(商品數量)設置爲1
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明發射元組字段
        declarer.declare(new Fields(FieldNames.ID, FieldNames.NAME, FieldNames.COUNT));
    }
}
OrderSpout

  說明:OrderSpout會從redis中讀取訂單數據,並遍歷訂單中每一個商品併發射,同時會統計商品數據並存入redis。

  2. CommandSpout

package com.hust.grid.leesf.ordertest.spout;

import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * 統計支持度和置信度
 * 
 * @author leesf
 */
public class CommandSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // 休眠5S後發射「statistics」
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        collector.emit(new Values("statistics"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.COMMAND));
    }
}
CommandSpout

  說明:下游Bolt根據其發射的元組信息來統計支持度和置信度,其每5秒發射一次統計信號。

  3. SplitBolt 

package com.hust.grid.leesf.ordertest.bolt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 對訂單中的商品進行兩兩組合併發送
 * 
 * @author leesf
 *
 */
public class SplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private Map<String, List<String>> orderItems; // 存儲訂單及其商品

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        orderItems = new HashMap<String, List<String>>();
    }

    public void execute(Tuple tuple) {
        // 獲取訂單號和商品名稱
        String id = tuple.getStringByField(FieldNames.ID);
        String newItem = tuple.getStringByField(FieldNames.NAME);

        if (!orderItems.containsKey(id)) { // 不包含該訂單
            // 新生商品鏈表
            ArrayList<String> items = new ArrayList<String>();
            // 添加商品
            items.add(newItem);

            orderItems.put(id, items);

            return;
        }
        // 包含訂單,取出訂單中包含的商品
        List<String> items = orderItems.get(id);
        for (String existItem : items) { // 遍歷商品
            // 將元組中提取的商品與訂單中已存在的商品組合後發射
            collector.emit(createPair(newItem, existItem));
        }
        // 添加新的商品
        items.add(newItem);
    }

    private Values createPair(String item1, String item2) { // 按照指定順序生成商品對
        if (item1.compareTo(item2) > 0) {
            return new Values(item1, item2);
        }

        return new Values(item2, item1);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2));
    }
}
SplitBolt

  說明:其將每一個訂單的兩兩商品進行組合,而後發射。

  4. PairTotalCountBolt 

package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 計算商品對總數
 * 
 * @author leesf
 *
 */
public class PairTotalCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private int totalCount; // 商品對總數

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        totalCount = 0;
    }

    public void execute(Tuple tuple) {
        totalCount++; // 每收到一個元組,便增長商品對總數
        collector.emit(new Values(totalCount)); // 發射商品對總數
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.TOTAL_COUNT));
    }
}
PairTotalCountBolt

  說明:其用於統計全部商品對的數量(用於後面支持度的計算)。

  5. PairCountBolt

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 計算商品對出現的次數
 * 
 * @author leesf
 *
 */
public class PairCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pairCounts = new HashMap<ItemPair, Integer>();
    }

    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);

        ItemPair itemPair = new ItemPair(item1, item2);
        int pairCount = 0;

        if (pairCounts.containsKey(itemPair)) { // 包含商品對
            // 取出商品對出現的次數
            pairCount = pairCounts.get(itemPair);
        }
        // 更新出現次數
        pairCount++;

        pairCounts.put(itemPair, pairCount);

        collector.emit(new Values(item1, item2, pairCount));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.PAIR_COUNT));
    }
}
PairCountBolt

  說明:其用於統計每一個商品對出現的次數,而後發射。

  6. SupportComputeBolt  

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 計算商品對的支持度
 * 
 * @author leesf
 *
 */
public class SupportComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數
    private int pairTotalCount; // 商品對總數

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
        pairTotalCount = 0;
    }

    /**
     * 因爲SupportComputeBolt訂閱了多個流,其須要根據不一樣的字段作出不一樣的行爲
     */
    public void execute(Tuple tuple) {
        if (tuple.getFields().get(0).equals(FieldNames.TOTAL_COUNT)) { // 對應PairTotalCountBolt
            // 取出商品對總數量
            pairTotalCount = tuple.getIntegerByField(FieldNames.TOTAL_COUNT);
        } else if (tuple.getFields().size() == 3) { // 對應PairCountBolt
            // 取出商品及其商品對出現的次數
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            // 存儲商品對及其次數
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) { // 對應CommandSpout
            for (ItemPair itemPair : pairCounts.keySet()) { // 遍歷商品對
                // 計算商品支持度,使用商品對出現的次數除以商品對總數量
                double itemSupport = (double) (pairCounts.get(itemPair).intValue()) / pairTotalCount;

                collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemSupport));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 定義元組字段
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT));
    }

}
SupportComputeBolt

  說明:計算每一個商品對的支持度,而且發射支持度。

  7. ConfidenceComputeBolt 

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * 計算商品對的置信度
 * 
 * @author leesf
 */
public class ConfidenceComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private Map<ItemPair, Integer> pairCounts; // 存儲商品對及其出現的次數

    private String host;
    private int port;
    private Jedis jedis;

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        pairCounts = new HashMap<ItemPair, Integer>();
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    /**
     * 因爲ConfidenceComputeBolt訂閱了多個流,其須要根據元組不一樣的字段作出不一樣的行爲
     */
    public void execute(Tuple tuple) {
        if (tuple.getFields().size() == 3) { // 對應PairCountBolt
            // 取出商品對及其出現次數
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);

            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) { // 對應CommandSpout,須要進行統計
            for (ItemPair itemPair : pairCounts.keySet()) { // 遍歷商品對
                // 從redis中取出商品對中商品出現的次數
                double item1Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem1()));
                double item2Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem2()));
                double itemConfidence = pairCounts.get(itemPair).intValue();

                // 計算商品對置信度
                if (item1Count < item2Count) {
                    itemConfidence /= item1Count;
                } else {
                    itemConfidence /= item2Count;
                }

                collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemConfidence));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.CONFIDENCE));
    }
}
View Code

  說明:計算商品對的置信度,而且發射置信度。

  8. FilterBolt

package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import org.json.simple.JSONObject;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * 過濾符合條件的商品對並存入redis
 * 
 * @author leesf
 *
 */
public class FilterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    // 商品對的支持度和置信度
    private static final double SUPPORT_THRESHOLD = 0.01;
    private static final double CONFIDENCE_THRESHOLD = 0.01;

    private OutputCollector collector;

    private Jedis jedis;
    private String host;
    private int port;

    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    @SuppressWarnings("unchecked")
    public void execute(Tuple tuple) {
        // 取出商品並構造商品對
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        ItemPair itemPair = new ItemPair(item1, item2);
        String pairString = itemPair.toString();

        double support = 0;
        double confidence = 0;

        if (tuple.getFields().get(2).equals(FieldNames.SUPPORT)) { // 對應SupportComputeBolt
            // 獲取支持度並存入redis
            support = tuple.getDoubleByField(FieldNames.SUPPORT);
            jedis.hset("supports", pairString, String.valueOf(support));
        } else if (tuple.getFields().get(2).equals(FieldNames.CONFIDENCE)) { // 對應ConfidenceComputeBolt
            // 獲取置信度並存入redis
            confidence = tuple.getDoubleByField(FieldNames.CONFIDENCE);
            jedis.hset("confidences", pairString, String.valueOf(confidence));
        }

        if (!jedis.hexists("supports", pairString) || !jedis.hexists("confidences", pairString)) { // 商品對的支持度和置信度還未計算完成,返回
            return;
        }
        // 商品對的支持度和置信度已經計算完成
        support = Double.parseDouble(jedis.hget("supports", pairString));
        confidence = Double.parseDouble(jedis.hget("confidences", pairString));

        if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) { // 支持度和置信度超過閾值
            // 將該商品對信息存入redis中
            JSONObject pairValue = new JSONObject();
            pairValue.put(FieldNames.SUPPORT, support);
            pairValue.put(FieldNames.CONFIDENCE, confidence);

            jedis.hset("recommendedPairs", pairString, pairValue.toJSONString());

            collector.emit(new Values(item1, item2, support, confidence));
        } else { // 不高於閾值,則從redis中刪除
            jedis.hdel("recommendedPairs", pairString);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 聲明元組字段
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT, FieldNames.CONFIDENCE));
    }
}
FilterBolt

  說明:判斷支持度和置信度是否超過了閾值,若超過則須要存入redis,不然,從redis中刪除。

4、程序運行

  4.1. 環境依賴

  打開redis服務器、客戶端(方便觀看結果)和zookeeper。

  4.2. 寫入訂單數據

  運行OrderGenerator,生成並寫入訂單數據,經過redis查看,結果以下

  

  表示已經成功寫入了訂單數據。

  4.3. 運行任務拓撲

  運行OrderTopology,其會根據訂單中的商品數據,生成並寫入推薦商品對,經過redis查看,結果以下

  

  能夠看到運行完成後,已經成功生成了推薦商品方案。

5、總結

  經過本篇Storm案例的學習,對於Storm的編程有了更深刻的認識,同時,本項目的源代碼已經上傳至github,歡迎star,謝謝各位園友的觀看~  

 

參考連接:http://www.jikexueyuan.com/course/1437.html

相關文章
相關標籤/搜索