storm+kafka集成

POM 配置(pom.xml):

<properties>
        <storm.version>1.1.0</storm.version>
        <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- Provided by the Storm runtime, so it must not be packed into the final topology jar -->
            <scope>provided</scope>
        </dependency>

        <!-- Legacy Kafka spout API (superseded by storm-kafka-client below) -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <!-- FIX: was ${provided.scope}, a property never defined in <properties>;
                 Maven would fail to interpolate it. Use the literal scope instead. -->
            <scope>provided</scope>
        </dependency>

        <!-- New Kafka spout used by KafkaTopology (KafkaSpout / KafkaSpoutConfig) -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

Topology 代碼:

public class KafkaTopology {

    /**
     * Wires a Kafka spout into a filter -> analysis -> HBase bolt pipeline.
     *
     * Cluster submission:
     *   storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2
     * With no arguments the topology runs in an in-process LocalCluster instead.
     */
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        // Kafka spout with parallelism 200; broker/topic settings come from getKafkaSpoutConfig().
        builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()), 200);

        /*
         * Available stream groupings (shuffleGrouping is used throughout this topology):
         * 1) shuffleGrouping        - random, evenly distributed
         * 2) fieldsGrouping         - tuples with the same field value always go to the same task
         * 3) allGrouping            - broadcast: every task receives every tuple
         * 4) globalGrouping         - all tuples go to the task with the lowest task id
         * 5) noneGrouping           - currently equivalent to shuffle
         * 6) directGrouping         - the emitter chooses the receiving task explicitly
         * 7) localOrShuffleGrouping - prefer tasks in the same worker process, else shuffle
         * 8) customGrouping         - user-defined grouping strategy
         */
        builder.setBolt("filter", new FilterBolt(), 200).shuffleGrouping("spout");

        builder.setBolt("analysis", new AnalysisBlot(), 200).shuffleGrouping("filter");

        builder.setBolt("HBase", new HBaseBlot(), 200).shuffleGrouping("analysis");

        Config conf = new Config();
//        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name.
            conf.setNumWorkers(5);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // Local mode for testing. FIX: the original called shutdown() immediately
            // after submitTopology(), tearing the topology down before it could process
            // a single tuple. Let it run for a bounded interval first.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("x2", conf, builder.createTopology());
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status, then shut down
            }
            cluster.shutdown();
        }
    }

    /**
     * Builds the Kafka spout configuration: bootstrap brokers, topic "test2",
     * consumer group "kafka", and a record translator that emits only the
     * record value as the single output field "str".
     */
    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2")
                .setGroupId("kafka")
                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .build();
    }

    // final: the translator is stateless and shared; it must never be reassigned.
    private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();

    /**
     * Maps a Kafka record to a one-element tuple holding only the record's value.
     * Must be Serializable because Storm ships it to worker JVMs.
     */
    private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new Values(record.value());
        }
    }
}

Bolt 代碼:

public class FilterBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    /**
     * One-time initialisation: keep the collector for later use in execute().
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    /**
     * Filters the stream: tuples whose first field parses as an integer are
     * forwarded downstream as that integer; everything else is dropped.
     * Every input tuple is acked regardless of outcome.
     */
    @Override
    public void execute(Tuple tuple) {
        String value = tuple.getString(0);

        if (value != null && !value.isEmpty()) {
            try {
                // FIX: catch only NumberFormatException (all parseInt can throw here)
                // instead of the original blanket catch (Exception).
                outputCollector.emit(new Values(Integer.parseInt(value)));
            } catch (NumberFormatException ignored) {
                // Non-numeric input is expected here and deliberately skipped.
                System.out.println(value + "不是數字!略過!!!");
            }
        }
        // Ack unconditionally so the spout never replays filtered-out tuples.
        outputCollector.ack(tuple);
    }

    /**
     * Declares the single output field "value" (the parsed integer).
     * Downstream bolts read it via input.getIntegerByField("value").
     * (The original comment referenced fields "str"/"int", which this bolt
     * never declares.)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }
}

 

其它 Bolt 代碼略。

代碼寫好後,使用 Maven 打成 jar 包。

放在集羣 Nimbus 節點的某個目錄下,然後提交:

storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2
相關文章
相關標籤/搜索