55. Storm Hello World (local mode)

Storm hello world

Overview

Let's look at how this is implemented in code:

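  1. Write the data source class: the Spout. There are two ways to do this:

    Extend the BaseRichSpout class

    Implement the IRichSpout interface

    The main methods to implement or override are open, nextTuple, and declareOutputFields

  2. Next, write the data processing class: the Bolt. There are two ways to do this:

    Extend the BaseBasicBolt class

    Implement the IRichBolt interface

    The key methods to implement or override are execute and declareOutputFields

  3. Finally, write the main function (the Topology) that submits the job

    For running a Topology, Storm provides two modes: local mode and cluster mode

    Local mode: (no Storm cluster needed; it runs directly in a Java process and is typically used during development and testing) just run the main function

    Cluster mode: (requires a Storm cluster) package the application as a jar, then use the storm command to submit the Topology to the cluster.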
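
The call order behind these three steps can be sketched without any Storm dependency. The following is a minimal plain-Java simulation of that order only. `MiniSpout`, `MiniBolt`, and `MiniPipeline` are hypothetical stand-ins invented for this illustration, not Storm classes, and a real topology runs these calls across threads and worker processes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MiniPipeline {

    /** Hypothetical stand-in for a spout: opened once, then polled repeatedly. */
    public interface MiniSpout {
        void open(Consumer<String> collector);
        void nextTuple();
    }

    /** Hypothetical stand-in for a bolt: called once per incoming value. */
    public interface MiniBolt {
        void execute(String input, Consumer<String> collector);
    }

    /** Wires spout -> first bolt -> second bolt and polls the spout a fixed number of times. */
    public static List<String> run(MiniSpout spout, MiniBolt first, MiniBolt second, int polls) {
        List<String> sink = new ArrayList<>();
        Consumer<String> toSink = sink::add;
        Consumer<String> toSecond = value -> second.execute(value, toSink);
        Consumer<String> toFirst = value -> first.execute(value, toSecond);

        spout.open(toFirst);              // initialize the source once
        for (int i = 0; i < polls; i++) {
            spout.nextTuple();            // the framework keeps asking for the next tuple
        }
        return sink;
    }

    /** Demo with a fixed word cycle: the first bolt passes values on, the second tags them. */
    public static List<String> demo() {
        final String[] words = {"java", "php", "groovy"};
        MiniSpout spout = new MiniSpout() {
            private Consumer<String> collector;
            private int next = 0;

            @Override
            public void open(Consumer<String> collector) {
                this.collector = collector;
            }

            @Override
            public void nextTuple() {
                collector.accept(words[next++ % words.length]);
            }
        };
        MiniBolt print = (input, out) -> out.accept(input);
        MiniBolt write = (input, out) -> out.accept("written:" + input);
        return run(spout, print, write, 4);
    }

    public static void main(String[] args) {
        // prints [written:java, written:php, written:groovy, written:java]
        System.out.println(demo());
    }
}
```

The sketch uses a fixed word cycle instead of random selection so that the result is predictable.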

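Hands-on

The code is structured so that data flows from PWSpout to PrintBolt and finally to WriteBolt, which writes it to a file. Here is the code for each class.

First, the local-mode version:

PWTopology1.java: building the topology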

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.PrintBolt;
import bhz.bolt.WriteBolt;
import bhz.spout.PWSpout;


public class PWTopology1 {

    public static void main(String[] args) throws Exception {
        // Topology configuration: two workers, debug logging enabled
        Config cfg = new Config();
        cfg.setNumWorkers(2);
        cfg.setDebug(true);
        
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new PWSpout());
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
        builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
        
        
        // 1. Local mode: run in-process for 10 seconds, then shut down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("top1", cfg, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("top1");
        cluster.shutdown();
        
        // 2. Cluster mode
//        StormSubmitter.submitTopology("top1", cfg, builder.createTopology());
        
    }
}

 

Code analysis:
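
The shuffleGrouping calls in the topology ask Storm to spread a component's output tuples randomly but evenly across the tasks of the downstream bolt. Below is a rough plain-Java sketch of that idea, assuming a shuffle-then-round-robin strategy. `ShuffleSketch` and its methods are hypothetical names for this illustration and are not Storm's implementation. PWTopology1 gives no parallelism hint, so each bolt there should run as a single task and receive every tuple; the sketch uses three tasks to make the distribution visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ShuffleSketch {
    private final List<Integer> tasks = new ArrayList<>();
    private final Random random;
    private int position = 0;

    public ShuffleSketch(int taskCount, long seed) {
        for (int i = 0; i < taskCount; i++) {
            tasks.add(i);
        }
        random = new Random(seed);
        Collections.shuffle(tasks, random);
    }

    /** Returns the index of the task that receives the next tuple. */
    public int chooseTask() {
        if (position == tasks.size()) {
            // Every task has been used once in this round: reshuffle and start over
            Collections.shuffle(tasks, random);
            position = 0;
        }
        return tasks.get(position++);
    }

    /** Counts how many of the given number of tuples each task receives. */
    public static int[] distribute(int taskCount, int tuples, long seed) {
        ShuffleSketch grouping = new ShuffleSketch(taskCount, seed);
        int[] counts = new int[taskCount];
        for (int i = 0; i < tuples; i++) {
            counts[grouping.chooseTask()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 12 tuples over 3 tasks is 4 full rounds, so this prints [4, 4, 4]
        System.out.println(Arrays.toString(distribute(3, 12, 42L)));
    }
}
```

The order in which tasks are picked varies with the seed, but whenever the tuple count is a multiple of the task count, each task receives the same number of tuples.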

 

Data source:

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PWSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    
    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }
    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Initialize the spout: keep the collector for use in nextTuple
        this.collector = collector;
        //System.out.println(this.collector);
    }
    
    /**
     * Polls for the next tuple; Storm calls this method repeatedly.
     * @see backtype.storm.spout.ISpout#nextTuple()
     */
    @Override
    public void nextTuple() {
        // Pick one of the five words at random
        final Random r = new Random();
        int num = r.nextInt(5);
        // Throttle emission to about one tuple every 500 ms
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * Declares the fields of the tuples this spout emits.
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Each emitted tuple has a single field named "print"
        declarer.declare(new Fields("print"));
    }



}

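Code walkthrough:

Overall structure

Detailed analysis

---------------------------- open method ----------------------------

Initializes the spout and stores the SpoutOutputCollector for later use.

---------------------------- nextTuple method ----------------------------

Sleeps for 500 ms, then emits one word chosen at random from the map.

---------------------------- declareOutputFields method ----------------------------

Declares that each emitted tuple has a single field named "print".

Data processing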

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PrintBolt extends BaseBasicBolt {

    private static final Log log = LogFactory.getLog(PrintBolt.class);
    
    private static final long serialVersionUID = 1L;
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the upstream component
        String print = input.getStringByField("print");
        log.info("【print】: " + print);
        //System.out.println("Name of input word is : " + print);
        // Pass the value on to the next bolt
        collector.emit(new Values(print));
        
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("write"));
    }

}

 

 

Code analysis
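
The names passed to declareOutputFields are what downstream components use to read values: PWSpout declares "print", so PrintBolt calls getStringByField("print"), and PrintBolt in turn declares "write" for the next bolt. A simplified sketch of this name-to-position lookup follows. `NamedTuple` is a hypothetical stand-in written for this illustration, not Storm's Tuple class, and the exception it throws for an undeclared name is a choice made for the sketch.

```java
import java.util.Arrays;
import java.util.List;

public class NamedTuple {
    private final List<String> fields;   // names declared by the emitting component
    private final List<Object> values;   // values emitted, in the same order

    public NamedTuple(List<String> fields, Object... values) {
        if (fields.size() != values.length) {
            throw new IllegalArgumentException("Field count and value count differ");
        }
        this.fields = fields;
        this.values = Arrays.asList(values);
    }

    /** Looks up a value by the field name under which it was declared. */
    public String getStringByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("Field not declared: " + field);
        }
        return (String) values.get(index);
    }

    public static void main(String[] args) {
        // The spout side: declare "print" and emit one value
        NamedTuple fromSpout = new NamedTuple(Arrays.asList("print"), "java");
        // The bolt side: read the value by the declared name
        System.out.println(fromSpout.getStringByField("print")); // prints java
    }
}
```

In this sketch, asking for a name that the upstream component did not declare fails immediately, which is why the field names in the spout and the bolts have to match exactly.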

 

import java.io.FileWriter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WriteBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(WriteBolt.class);
    
    private FileWriter writer;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the upstream component
        String text = input.getStringByField("write");
        try {
            if (writer == null) {
                // Choose the output directory by OS; the file is named after this bolt instance
                String osName = System.getProperty("os.name");
                if (osName.startsWith("Windows")) {
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if (osName.equals("Linux")) {
                    System.out.println("----:" + osName);
                    writer = new FileWriter("/usr/local/temp/" + this);
                } else {
                    // No output directory is configured for other systems
                    log.warn("Unsupported OS, skipping write: " + osName);
                    return;
                }
            }
            log.info("【write】: writing to file");
            writer.write(text);
            writer.write("\n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }
    


}

 

This class is very similar to PrintBolt: both process incoming data, so it needs no further explanation.
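
WriteBolt chooses its output directory by matching OS names. One possible alternative, shown here only as a sketch, is to resolve the directory from the JVM's java.io.tmpdir property so the same code runs on any platform. `OutputFiles`, its two methods, and the `storm-hello` subdirectory name are assumptions made for this illustration and do not appear in the original project.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class OutputFiles {

    /** Resolves a file under <java.io.tmpdir>/storm-hello, creating the directory if needed. */
    public static Path resolveOutputFile(String fileName) throws IOException {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "storm-hello");
        Files.createDirectories(dir);
        return dir.resolve(fileName);
    }

    /** Opens the file for appending, creating it on first use. */
    public static BufferedWriter openWriter(Path file) throws IOException {
        return Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path file = resolveOutputFile("write-bolt-demo.txt");
        Files.deleteIfExists(file);   // start from an empty file for the demo
        try (BufferedWriter writer = openWriter(file)) {
            writer.write("java");
            writer.newLine();
            writer.write("php");
            writer.newLine();
        }
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        System.out.println(lines);    // prints [java, php]
    }
}
```

A bolt could call a helper like this the first time execute runs, in the same place where WriteBolt currently creates its FileWriter.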
