storm demo

 

要實現的功能如上java

而後如今先寫幾個組件:數組

RandomWordSpout(採集數據,這裏爲了簡單一些,就隨機產生一些數據)併發

public class RandomWordSpout extends BaseRichSpout{

	private SpoutOutputCollector collector;
	
	//模擬一些數組
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
	
	//不斷向下一個組件發送 tuple 消息
	//這裏面是該 spout 組件的核心邏輯
	@Override
	public void nextTuple() {

		//能夠從 kafka 消息隊列中拿到數據,簡便起見,咱們從 words 數組中隨機挑選一個商品名發送出去
		Random random = new Random();
		int index = random.nextInt(words.length);
		
		//經過隨機數拿到一個商品名
		String godName = words[index];
		
		
		//將商品名封裝成 tuple ,發送消息給下一個組件
		collector.emit(new Values(godName));
		
		//無法送一個消息,休眠500ms
		Utils.sleep(500);
		
		
	}

	//初始化方法,在 spout 組件實例化時調用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		
		
	}

	//聲明本 spout 組件發送出去的 tuple 中的數據的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("orignname"));
		
	}

}

UpperBolt(轉換爲大寫)dom

public class UpperBolt extends BaseBasicBolt{

	
	//業務邏輯
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		//先獲取上一個組件傳遞過來的數據,數據在 tiple 裏面
		String godName = tuple.getString(0);
		
		//將商品名轉化成大寫
		String godName_upper = godName.toUpperCase();
		
		//將轉換完成的商品名發送出去
		collector.emit(new Values(godName_upper));
		
	}

	
	
	//聲明該 blot 組件要發送出去的 tuple 字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
		declarer.declare(new Fields("uppername"));
	}

}

 

SuffixBolt(添加後綴,寫入文件)iphone

public class SuffixBolt extends BaseBasicBolt{
	
	FileWriter fileWriter = null;
	
	
	//該 bolt 組件運行過程當中只會被調用一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {

		try {
			fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
	}
	
	
	
	//該 blot 組件的核心處理邏輯
	//每收到一個 tuple 消息,就會被調用一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		//先拿到上一個組件發送過來的商品名稱
		String upper_name = tuple.getString(0);
		String suffix_name = upper_name + "_itisok";
		
		
		//爲上一個組件發送過來的商品名稱添加後綴
		
		try {
			fileWriter.write(suffix_name);
			fileWriter.write("\n");
			fileWriter.flush();
			
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
		
		
	}

	
	
	
	//本 blot 已經不須要發送 tuple 消息到下一個組件,因此不須要再聲明 tuple 字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {

		
	}

}

TopoMain(把上面三個組件串起來)ide

/**
 * 組織各個處理組件造成一個完整的處理流程,就是所謂的 topology(相似MapReduce中的 job )
 * 而且將該 topology 提交給 storm 集羣去運行,topology 提交到集羣中,將無間隙的運行,除非人爲或者異常退出
 * @author duanhaitao@itcast.cn
 *
 */
public class TopoMain {

	
	public static void main(String[] args) throws Exception {
		
		TopologyBuilder builder = new TopologyBuilder();
		
		//將咱們的 spout 組件設置到 topology 中去 
        //parallelism_hint :4 表示用 4 個 excutor 來執行這個組件
		//setNumTasks(8) 設置的是該組件執行時,併發task 數量,也就是 1 個 excutor 會運行 2 個task
		builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
		
		//將咱們的 spout 組件設置到 topology 中去,而且指定它接受 randomspout 組件的消息
        //.shuffleGrouping("upperbolt")有兩層含義
        //一、upperbolt 組件接受的 tuple 消息必定來自於 randomspout     
        //二、randomspout 組件和 upperbolt 組件的大量併發 task 實例之間收發消息時,採用的分組策略是隨機分組shuffleGrouping
		builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
		
		//將添加後綴的 bolt 組件設置到 topology 去,而且指定它接受 upperblit 組件的消息
		builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//用 builder 來建立一個 topology 
		StormTopology demotop = builder.createTopology();
		
		
		//配置一些 topology 在集羣中運行時的參數
		Config conf = new Config();
        //這裏設置的是整個 demotop 所佔用的槽位數,也就是 workor 數量
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);
		
		
		//將這個 topology 提交給 strom 集羣運行
		StormSubmitter.submitTopology("demotopo", conf, demotop);
		
	}
}

 

能夠本地,也可提交到集羣oop

先打個包,傳到集羣中,運行便可ui

相關文章
相關標籤/搜索