Storm系列(六)storm和kafka集成

使用kafka-client jar進行Storm Apache Kafka集成

這包括新的Apache Kafka消費者API。兼容性 Apache Kafka版本0.10起 引入jar包git

<dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka-client</artifactId>
      <version>1.2.0</version>
    </dependency>
複製代碼

原文連接:a870439570.github.io/interview-d…github

從kafka中訂閱消息讀取

經過使用KafkaSpoutConfig類來配置spout實現。此類使用Builder模式,能夠經過調用其中一個Builders構造函數或經過調用KafkaSpoutConfig類中的靜態方法構建器來啓動。apache

用法示例

建立一個簡單的不kafka數據源 如下將使用發佈到「topic」的全部事件,並將它們發送到MyBolt,其中包含「topic」,「partition」,「offset」,「key」,「value」字段。bash

TopologyBuilder tp = new TopologyBuilder();
            tp.setSpout("kafka_spout", new KafkaSpout(KafkaSpoutConfig.builder("localhost:9092" , "qxw").build()), 1);
            tp.setBolt("bolt", new MyBolt()).shuffleGrouping("kafka_spout");
            Config cfg=new Config();
            cfg.setNumWorkers(1);//指定工做進程數  (jvm數量,分佈式環境下可用,本地模式設置無心義)
            cfg.setDebug(true);
            LocalCluster locl=new LocalCluster();
             locl.submitTopology("kkafka-topo",cfg,tp.createTopology());
複製代碼
public static  class MyBolt extends BaseBasicBolt{
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                System.err.println("接受訂閱kafka消息: "+tuple.getStringByField("topic"));
                System.err.println("接受訂閱kafka消息: "+tuple.getStringByField("value"));
            }
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            }
        }
複製代碼
相關文章
相關標籤/搜索