要實現的功能如上java
而後如今先寫幾個組件:數組
RandomWordSpout(採集數據,這裏爲了簡單一些,就隨機產生一些數據)併發
public class RandomWordSpout extends BaseRichSpout{ private SpoutOutputCollector collector; //模擬一些數組 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不斷向下一個組件發送 tuple 消息 //這裏面是該 spout 組件的核心邏輯 @Override public void nextTuple() { //能夠從 kafka 消息隊列中拿到數據,簡便起見,咱們從 words 數組中隨機挑選一個商品名發送出去 Random random = new Random(); int index = random.nextInt(words.length); //經過隨機數拿到一個商品名 String godName = words[index]; //將商品名封裝成 tuple ,發送消息給下一個組件 collector.emit(new Values(godName)); //無法送一個消息,休眠500ms Utils.sleep(500); } //初始化方法,在 spout 組件實例化時調用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //聲明本 spout 組件發送出去的 tuple 中的數據的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }
UpperBolt(轉換爲大寫)dom
public class UpperBolt extends BaseBasicBolt{ //業務邏輯 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先獲取上一個組件傳遞過來的數據,數據在 tiple 裏面 String godName = tuple.getString(0); //將商品名轉化成大寫 String godName_upper = godName.toUpperCase(); //將轉換完成的商品名發送出去 collector.emit(new Values(godName_upper)); } //聲明該 blot 組件要發送出去的 tuple 字段 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uppername")); } }
SuffixBolt(添加後綴,寫入文件)iphone
public class SuffixBolt extends BaseBasicBolt{ FileWriter fileWriter = null; //該 bolt 組件運行過程當中只會被調用一次 @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } //該 blot 組件的核心處理邏輯 //每收到一個 tuple 消息,就會被調用一次 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先拿到上一個組件發送過來的商品名稱 String upper_name = tuple.getString(0); String suffix_name = upper_name + "_itisok"; //爲上一個組件發送過來的商品名稱添加後綴 try { fileWriter.write(suffix_name); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } } //本 blot 已經不須要發送 tuple 消息到下一個組件,因此不須要再聲明 tuple 字段 @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } }
TopoMain(把上面三個組件串起來)ide
/** * 組織各個處理組件造成一個完整的處理流程,就是所謂的 topology(相似MapReduce中的 job ) * 而且將該 topology 提交給 storm 集羣去運行,topology 提交到集羣中,將無間隙的運行,除非人爲或者異常退出 * @author duanhaitao@itcast.cn * */ public class TopoMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //將咱們的 spout 組件設置到 topology 中去 //parallelism_hint :4 表示用 4 個 excutor 來執行這個組件 //setNumTasks(8) 設置的是該組件執行時,併發task 數量,也就是 1 個 excutor 會運行 2 個task builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); //將咱們的 spout 組件設置到 topology 中去,而且指定它接受 randomspout 組件的消息 //.shuffleGrouping("upperbolt")有兩層含義 //一、upperbolt 組件接受的 tuple 消息必定來自於 randomspout //二、randomspout 組件和 upperbolt 組件的大量併發 task 實例之間收發消息時,採用的分組策略是隨機分組shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //將添加後綴的 bolt 組件設置到 topology 去,而且指定它接受 upperblit 組件的消息 builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //用 builder 來建立一個 topology StormTopology demotop = builder.createTopology(); //配置一些 topology 在集羣中運行時的參數 Config conf = new Config(); //這裏設置的是整個 demotop 所佔用的槽位數,也就是 workor 數量 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //將這個 topology 提交給 strom 集羣運行 StormSubmitter.submitTopology("demotopo", conf, demotop); } }
能夠本地,也可提交到集羣oop
先打個包,傳到集羣中,運行便可ui