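Example: telecom call logs
Mobile calls and their durations are fed into Apache Storm as input. Storm processes the calls, groups them by caller and receiver pair, and counts the total number of calls for each pair.
The Storm programming pattern:
Storm abstracts data processing as a topology. A topology consists mainly of spouts and bolts, plus the data stream that travels between these components as tuples. One pass of the stream through the topology is one round of processing on the data.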
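As a rough illustration of what this topology computes, the same flow can be mimicked in plain Java without any Storm classes. This is only a sketch: the names CallFlowSketch, Call, toKey, and countCalls are made up for illustration, and a fixed list stands in for the spout's stream.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the spout -> bolt -> bolt flow; no Storm classes are used.
class CallFlowSketch {

    // Hypothetical type standing in for a (from, to, duration) tuple.
    static final class Call {
        final String from;
        final String to;
        final int duration;

        Call(String from, String to, int duration) {
            this.from = from;
            this.to = to;
            this.duration = duration;
        }
    }

    // Stage 2: combine caller and receiver into one key, as the first bolt would.
    static String toKey(Call call) {
        return call.from + "-" + call.to;
    }

    // Stage 3: count how often each key occurs, as the second bolt would.
    static Map<String, Integer> countCalls(List<Call> calls) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Call call : calls) {
            counts.merge(toKey(call), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Stage 1: a fixed list replaces the spout's random stream.
        List<Call> calls = Arrays.asList(
                new Call("1234123401", "1234123402", 12),
                new Call("1234123401", "1234123402", 30),
                new Call("1234123403", "1234123401", 5));
        // prints {1234123401-1234123402=2, 1234123403-1234123401=1}
        System.out.println(countCalls(calls));
    }
}
```

In a real topology each stage runs as a separate component, possibly on different machines, which is why the data has to travel between them as tuples.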
1. Create the Spout class
This part creates the source of the data stream.
Create a class that implements the IRichSpout interface and its methods. The key methods are:
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
nextTuple()
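nextTuple() is called periodically from the same loop as ack() and fail(). When there is no work to do, it must release control of the thread so that the other methods get a chance to be called. The first line of nextTuple should therefore check whether processing has finished; if it has, the method should sleep for at least one millisecond before returning, to reduce processor load.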
declareOutputFields(OutputFieldsDeclarer declarer)
declarer: used to declare output stream ids, output fields, and so on. This method specifies the output schema of the tuples.
ack(Object msgId)
This method acknowledges that a specific tuple has been processed.
fail(Object o)
This method reports that a specific tuple has not been fully processed, so that the tuple can be processed again.
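The interplay of nextTuple(), ack(), and fail() can be sketched in plain Java. This is a hypothetical illustration, not Storm's implementation: the class PendingTupleTracker and its methods are invented here, and the sketch assumes the spout itself keeps each emitted payload until it is acknowledged and re-queues it on failure. Note that the CallLogSpout in this article emits tuples without a message id and leaves ack() and fail() empty, so it does no such tracking.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java sketch of spout-side bookkeeping for ack/fail; no Storm classes are used.
class PendingTupleTracker {
    private final Map<Long, String> pending = new HashMap<>(); // emitted, not yet acked
    private final Queue<String> toSend = new ArrayDeque<>();   // waiting to be emitted
    private long nextId = 0;

    // Queue a payload for emission.
    void offer(String payload) {
        toSend.add(payload);
    }

    // Mimics nextTuple(): emit one payload under a new message id, or return -1 when idle.
    long emitNext() {
        String payload = toSend.poll();
        if (payload == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, payload);
        return id;
    }

    // Mimics ack(msgId): the tuple is done, so forget it.
    void ack(long id) {
        pending.remove(id);
    }

    // Mimics fail(msgId): put the payload back in the queue so it is emitted again.
    void fail(long id) {
        String payload = pending.remove(id);
        if (payload != null) {
            toSend.add(payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queuedCount() {
        return toSend.size();
    }
}
```

A failed payload gets a fresh message id when it is emitted again, which keeps the bookkeeping simple at the cost of losing the link to the original attempt.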
package com.jing.calllogdemo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/*
    Spout class: produces the data stream
 */
public class CallLogSpout implements IRichSpout {
    // Spout output collector
    private SpoutOutputCollector collector;
    // Whether processing has completed
    private boolean completed = false;
    // Context object
    private TopologyContext context;
    // Random generator
    private Random randomGenerator = new Random();
    // Index of the next call record
    private Integer idx = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // One-time setup on first run
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        // Generate call records: up to 100 per invocation, 1000 in total
        if (this.idx <= 1000) {
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");

            Integer localIdx = 0;
            while (localIdx++ < 100 && this.idx++ < 1000) {
                // Pick the caller
                String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                // Pick the callee
                String callee = mobileNumbers.get(randomGenerator.nextInt(4));
                // Compare string contents with equals(), not references with ==
                while (caller.equals(callee)) {
                    // Pick the callee again
                    callee = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                // Simulate the call duration
                Integer duration = randomGenerator.nextInt(60);
                // Emit the tuple
                this.collector.emit(new Values(caller, callee, duration));
            }
        }
    }

    @Override
    public void ack(Object o) {
    }

    @Override
    public void fail(Object o) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declare the output fields: defines the tuple structure and the field names
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
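2. Create the Bolt classes
This part processes the data stream. A bolt takes tuples as input, processes them, and produces new tuples. A topology can have multiple bolt classes.
Create a class that implements the IRichBolt interface and its methods.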
prepare(Map conf, TopologyContext context, OutputCollector collector)
execute(Tuple tuple)
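This is the bolt's core method; its argument is the input tuple to process. execute handles a single tuple at a time. Tuple data can be accessed through the getValue method of the Tuple class. The input tuple does not have to be processed immediately: multiple tuples can be processed and emitted as a single output tuple. Processed tuples are emitted through the OutputCollector class.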
cleanup()
declareOutputFields(OutputFieldsDeclarer declarer)
This method specifies the output schema of the tuples; the declarer parameter is used to declare output stream ids, output fields, and so on.
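The remark that several input tuples can be turned into one output tuple can be sketched in plain Java as follows. This is an illustrative assumption rather than code from this article: DurationBatcher is a made-up class, its execute method only mimics a bolt's execute, and a list stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: buffer several inputs and emit one combined output.
class DurationBatcher {
    private final int batchSize;
    private int seen = 0;   // inputs received in the current batch
    private int total = 0;  // sum of durations in the current batch
    private final List<Integer> emitted = new ArrayList<>(); // stands in for the output collector

    DurationBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Mimics execute(tuple): called once per input, emits only when a batch is full.
    void execute(int duration) {
        total += duration;
        seen++;
        if (seen == batchSize) {
            emitted.add(total); // one output for batchSize inputs
            seen = 0;
            total = 0;
        }
    }

    List<Integer> emitted() {
        return emitted;
    }
}
```

With a batch size of 3, seven inputs produce two outputs and leave one input buffered until the batch fills.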
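There are two bolts here.
The call log creator bolt receives call log tuples. Each tuple holds the caller number, the receiver number, and the call duration. The bolt simply creates a new value by combining the caller number and the receiver number, in the format "caller number-receiver number", and names the new field "call".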
package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/*
    Bolt that creates the call log entries
 */
public class CallLogCreatorBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Read the incoming call record
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);

        // Build the new tuple: "caller-receiver" plus the duration
        String fromTO = from + "-" + to;
        collector.emit(new Values(fromTO, duration));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Set the names of the output fields
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
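The call log counter bolt receives the "call" value and its duration as a tuple. It creates a map in prepare(). In execute() it adds an entry for each new "call" value and updates the existing entry otherwise, so the map holds a per-pair tally that cleanup() prints.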
package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/*
    Bolt that counts call records
 */
public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        // The duration is read but not used in the count
        Integer duration = tuple.getInteger(1);

        if (!counterMap.containsKey(call)) {
            // First call seen for this caller-receiver pair
            counterMap.put(call, 1);
        } else {
            // Increment the call count (the original added the duration here,
            // which mixed a count with a sum)
            Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // Print the call count of every pair when the bolt shuts down
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
3. Create the entry-point class and build the Topology
package com.jing.calllogdemo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class App {
    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout
        builder.setSpout("spout", new CallLogSpout());
        // Set the creator bolt; tuples from the spout are distributed randomly
        builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
        // Set the counter bolt; tuples are grouped by the "call" field
        builder.setBolt("counter-bolt", new CallLogCounterBolt()).
                fieldsGrouping("creator-bolt", new Fields("call"));

        Config config = new Config();
        config.setDebug(true);

        /* Local mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
        */
        // Cluster mode
        StormSubmitter.submitTopology("myTop", config, builder.createTopology());
    }
}
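The counter bolt is wired with fieldsGrouping on "call" so that every tuple with the same "call" value reaches the same counter task; otherwise the counts for one pair would be split across several maps once the bolt runs with more than one task. Conceptually this is a hash of the field value modulo the number of tasks. The sketch below shows only that idea in plain Java: FieldsGroupingSketch and chooseTask are hypothetical names, and Storm's actual hashing may differ in detail.

```java
// Plain-Java sketch of the idea behind fields grouping; no Storm classes are used.
class FieldsGroupingSketch {

    // Pick a task index in [0, numTasks) that depends only on the field value.
    static int chooseTask(String fieldValue, int numTasks) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        String[] calls = {
            "1234123401-1234123402",
            "1234123403-1234123404",
            "1234123401-1234123402"
        };
        for (String call : calls) {
            // The first and third lines show the same task index because the key is the same
            System.out.println(call + " -> task " + chooseTask(call, 4));
        }
    }
}
```

By contrast, shuffleGrouping between the spout and the creator bolt distributes tuples without regard to their content, which is fine there because the creator bolt keeps no per-key state.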
Running the topology on a production cluster
1) Change the submission method in the code
2) Build the jar with mvn
3) Run the topology on Linux
$> storm jar XXX.jar full.class.name