Storm計算網站UV(去重計算模式)

(1)UVTopology

import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
/**
 * Entry point that assembles the UV topology and submits it to an in-process
 * {@link LocalCluster}.
 *
 * <p>Pipeline: {@code SourceSpout -> UVFmtBolt -> UVDeepVisitBolt -> UVSumBolt}.</p>
 */
public class UVTopology {

    public static final String SPOUT_ID = SourceSpout.class.getSimpleName();
    public static final String UVFMT_ID = UVFmtBolt.class.getSimpleName();
    public static final String UVDEEPVISITBOLT_ID = UVDeepVisitBolt.class.getSimpleName();
    public static final String UVSUMBOLT_ID = UVSumBolt.class.getSimpleName();

    /**
     * Builds the topology graph and runs it locally.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopologyBuilder topology = new TopologyBuilder();

        // Single spout instance feeding raw log lines.
        topology.setSpout(SPOUT_ID, new SourceSpout(), 1);

        // Stage 1: reformat each raw line into (date, sid); 4 executors, random distribution.
        topology.setBolt(UVFMT_ID, new UVFmtBolt(), 4)
                .shuffleGrouping(SPOUT_ID);

        // Stage 2: per-(date, sid) counting; grouping on both fields routes
        // equal (date, sid) pairs to the same task.
        Fields visitorKey = new Fields("date", "sid");
        topology.setBolt(UVDEEPVISITBOLT_ID, new UVDeepVisitBolt(), 4)
                .fieldsGrouping(UVFMT_ID, visitorKey);

        // Stage 3: a single executor merges the partial counts.
        topology.setBolt(UVSUMBOLT_ID, new UVSumBolt(), 1)
                .shuffleGrouping(UVDEEPVISITBOLT_ID);

        Map<String, Object> settings = createBufferConfig();
        LocalCluster localCluster = new LocalCluster();
        String topologyName = UVTopology.class.getSimpleName();
        localCluster.submitTopology(topologyName, settings, topology.createTopology());
    }

    /**
     * Creates the topology configuration holding the buffer-size settings.
     *
     * @return a fresh, mutable configuration map
     */
    private static Map<String, Object> createBufferConfig() {
        Map<String, Object> settings = new HashMap<String, Object>();
        settings.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
        settings.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        settings.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        settings.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        return settings;
    }
}

(2)SourceSpout 模擬數據

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
 
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
/**
 * Spout that simulates a message queue by emitting a small batch of randomly
 * chosen access-log lines.
 *
 * <p>Each line has the form {@code time <TAB> sid <TAB> url}. The batch is
 * generated once in {@link #open}; after it is drained {@link #nextTuple}
 * emits nothing.</p>
 */
public class SourceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /** Number of sample lines generated when the spout is opened. */
    private static final int SAMPLE_LINE_COUNT = 6;

    /** Pending lines waiting to be emitted. */
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
    private SpoutOutputCollector collector;

    /**
     * Stores the collector and fills the queue with sample lines.
     *
     * @param conf      the Storm configuration for this spout (unused)
     * @param context   information about this task's place in the topology (unused)
     * @param collector the collector used to emit tuples; kept as an instance field
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        String[] time = { "2015-10-12 08:40:50", "2015-10-12 08:40:50", "2015-10-12 08:40:50", "2015-10-12 08:40:50" };
        String[] sid = { "ABYH6Y4V4SCV00", "ABYH6Y4V4SCV01", "ABYH6Y4V4SCV01", "ABYH6Y4V4SCV03" };
        String[] url = { "http://www.jd.com/1.html", "http://www.jd.com/2.html", "http://www.jd.com/3.html", "http://www.jd.com/3.html" };

        // One generator for the whole batch instead of a new one per iteration.
        Random random = new Random();
        for (int i = 0; i < SAMPLE_LINE_COUNT; i++) {
            // The same index is used for all three arrays, so each line is one of four fixed records.
            int k = random.nextInt(time.length);
            queue.add(time[k] + "\t" + sid[k] + "\t" + url[k]);
        }
    }

    /**
     * Emits the next queued line, if any, as a single-field tuple ("line").
     * The tuple is emitted without a message id.
     */
    @Override
    public void nextTuple() {
        // poll() returns null on an empty queue; this avoids the O(n) size()
        // traversal of ConcurrentLinkedQueue and the separate check-then-poll step.
        String line = queue.poll();
        if (line != null) {
            this.collector.emit(new Values(line));
        }
    }

    /**
     * Declares the single output field {@code "line"}.
     *
     * @param declarer used to declare the output stream schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

(3)UVFmtBolt 爲一級bolt,進行格式轉換

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
/**
 * First-level bolt: converts a raw tab-separated log line into a
 * {@code (date, sid)} pair, with the timestamp truncated to {@code yyyy-MM-dd}.
 */
public class UVFmtBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector;

    /**
     * Stores the collector for later use in {@link #execute}.
     *
     * @param stormConf the Storm configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit, ack and fail tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Parses one input line and emits {@code (date, sid)}.
     *
     * <p>Expected input, e.g.:
     * {@code 2014-01-07 08:40:50 <TAB> ABYH6Y4V4SCV <TAB> http://www.jd.com/1.html}.
     * Lines with fewer than two fields, or whose first field cannot be parsed
     * as a date, cause the tuple to be failed rather than letting an exception
     * escape from this method.</p>
     *
     * @param input tuple carrying the raw line in field {@code "line"}
     */
    @Override
    public void execute(Tuple input) {
        try {
            String line = input.getStringByField("line");
            // Split once; all parsing stays inside the try so malformed input
            // fails the tuple instead of throwing out of execute().
            String[] parts = line.split("\t");
            if (parts.length < 2) {
                System.err.println("UVFmtBolt: expected at least 2 tab-separated fields but got: " + line);
                this.collector.fail(input);
                return;
            }

            // SimpleDateFormat is not thread-safe, so a local instance is used per call.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            // Parsing with the date-only pattern and re-formatting drops the time of day.
            String date = sdf.format(sdf.parse(parts[0]));
            String sid = parts[1];

            // Anchor the output to the input so downstream ack/fail is linked to this tuple.
            this.collector.emit(input, new Values(date, sid));
            this.collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            this.collector.fail(input);
        }
    }

    /**
     * Declares the output fields {@code "date"} and {@code "sid"}.
     *
     * @param declarer used to declare the output stream schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "sid"));
    }
}

 

(4)UVDeepVisitBolt 二級bolt,多線程統計每一個訪客對應的pv數

import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
/**
* 進行多線程局部彙總,統計天天UV數
*
*/
public class UVDeepVisitBolt extends BaseRichBolt {
     /**
     * 
     */
     private static final long serialVersionUID = 1L;
 
     OutputCollector collector;
     Map<String, Integer> counts = new HashMap<String, Integer>();// 每一個task實例都會輸出:
                                                                                     // k爲日期_sid,v
                                                                                     // 爲每一個用戶的PV
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
     }
 
     /**
     * 輸入內容: 2018-01-07 ABYH6Y4V4SCV 2019-01-07 ABYH6Y4V4SCV 2018-01-07
     * ACYH6Y4V4SCV
     */
     @Override
     public void execute(Tuple input) {
 
          String date = input.getStringByField("date");
          String sid = input.getStringByField("sid");
          String key = date + "_" + sid;
          Integer count = 0;
          try {
               count = counts.get(key);
               if (count == null) {
                    count = 0;
               }
               count++;
               counts.put(key, count);
               this.collector.emit(new Values(date + "_" + sid, count));//每一個訪客  PV數
               this.collector.ack(input);
          } catch (Exception e) {
               e.printStackTrace();
               System.err.println("UVDeepVisitBolt is failure.date:" + date + ",sid:" + sid + ",uvCount" + count); 
               this.collector.fail(input);
          }
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("date_sid", "count"));
     }

 

(5)UVSumBolt 三級bolt,單線程彙總pv和uv數據,並保存到hbase

package com.yun.storm.uv;
 
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import com.yun.hbase.HBaseUtils;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
 
/**
 * Third-level bolt: merges the per-visitor PV counts into daily PV and UV
 * totals for the current date and writes them through {@code HBaseUtils}.
 */
public class UVSumBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    OutputCollector collector;
    HBaseUtils hbase;

    /** Latest PV count received for each {@code date_sid} key. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Stores the collector and creates the HBase helper.
     *
     * <p>The helper is created here rather than in a field initializer; see
     * http://www.xuebuyuan.com/616276.html for the original author's rationale.</p>
     *
     * @param stormConf the Storm configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to ack and fail tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.hbase = new HBaseUtils();
    }

    /**
     * Records the visitor's latest PV count, recomputes the PV/UV totals for
     * the current date, and stores them.
     *
     * <p>Tuples with a missing field or an unparseable date are failed and
     * skipped, so they cannot end up in {@link #counts}.</p>
     *
     * @param input tuple with fields {@code "date_sid"} (String) and {@code "count"} (Integer)
     */
    @Override
    public void execute(Tuple input) {
        String dateSid = input.getStringByField("date_sid");
        Integer count = input.getIntegerByField("count");
        if (dateSid == null || count == null) {
            // A null count stored in the map would break every later summation.
            this.collector.fail(input);
            return;
        }

        Date now = new Date();
        String currDate = sdf.format(now);
        try {
            Date accessDate = sdf.parse(dateSid.split("_")[0]);
            // A tuple dated after the current time that is not for today resets the state.
            if (!dateSid.startsWith(currDate) && accessDate.after(now)) {
                counts.clear();
            }
        } catch (ParseException e1) {
            e1.printStackTrace();
            this.collector.fail(input);
            return;
        }

        // Keep the latest PV count per visitor; the map is what de-duplicates visitors.
        counts.put(dateSid, count);

        int pv = 0;
        int uv = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            // Filter on each stored key, not on the incoming tuple's key, so only
            // entries that belong to the current date are counted.
            if (e.getKey().startsWith(currDate)) {
                uv++;
                pv += e.getValue();
            }
        }

        System.out.println(currDate + "的pv數爲"+pv+",uv數爲"+uv);
        put(pv, uv, currDate);
        this.collector.ack(input);
    }

    /**
     * Writes the PV and UV totals for a date via {@code HBaseUtils.put}.
     *
     * <p>NOTE(review): the arguments are presumably table, row key, column
     * family, qualifier and value; confirm against {@code HBaseUtils}.</p>
     *
     * @param pv       total page views for {@code currDate}
     * @param uv       total unique visitors for {@code currDate}
     * @param currDate date string in {@code yyyy-MM-dd} form
     */
    public void put(int pv, int uv, String currDate) {
        hbase.put("pv_uv", currDate, "info", "pv", String.valueOf(pv));
        hbase.put("pv_uv", currDate, "info", "uv", String.valueOf(uv));
    }

    /**
     * Declares no output fields; this bolt does not emit tuples.
     *
     * @param declarer used to declare the output stream schema (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

}
相關文章
相關標籤/搜索