Storm 系列(九)—— Storm 集成 Kafka

1、整合說明

Storm 官方對 Kafka 的整合分爲兩個版本,官方說明文檔分別以下:html

這裏我服務端安裝的 Kafka 版本爲 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文檔進行整合,不適用於 0.8.x 版本的 Kafka。java

2、寫入數據到Kafka

2.1 項目結構

2.2 項目主要依賴

<properties>
    <storm.version>1.2.2</storm.version>
    <kafka.version>2.2.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

2.3 DataSourceSpout

/**
 * 產生詞頻樣本的數據源
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模擬產生數據
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模擬數據
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

產生的模擬數據格式以下:git

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm

2.4 WritingToKafkaApp

/**
 * 寫入數據到 Kafka 中
 */
public class WritingToKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {


        TopologyBuilder builder = new TopologyBuilder();

        // 定義 Kafka 生產者屬性
        Properties props = new Properties();
        /*
         * 指定 broker 的地址清單,清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找其餘 broker 的信息。
         * 不過建議至少要提供兩個 broker 的信息做爲容錯。
         */
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        /*
         * acks 參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。
         * acks=0 : 生產者在成功寫入消息以前不會等待任何來自服務器的響應。
         * acks=1 : 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
         * acks=all : 只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。
         */
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());

        builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
        builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");


        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWritingToKafkaApp",
                    new Config(), builder.createTopology());
        }
    }
}

2.5 測試準備工做

進行測試前須要啓動 Kakfa:github

1. 啓動Kakfa

Kafka 的運行依賴於 zookeeper,須要預先啓動,能夠啓動 Kafka 內置的 zookeeper,也能夠啓動本身安裝的:shell

# zookeeper啓動命令
bin/zkServer.sh start

# 內置zookeeper啓動命令
bin/zookeeper-server-start.sh config/zookeeper.properties

啓動單節點 kafka 用於測試:apache

# bin/kafka-server-start.sh config/server.properties

2. 建立topic

# 建立用於測試主題
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic

# 查看全部主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 啓動消費者

啓動一個消費者用於觀察寫入狀況,啓動命令以下:bootstrap

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning

2.6 測試

能夠用直接使用本地模式運行,也能夠打包後提交到服務器集羣運行。本倉庫提供的源碼默認採用 maven-shade-plugin 進行打包,打包命令以下:服務器

# mvn clean package -D maven.test.skip=true

啓動後,消費者監聽狀況以下:app

3、從Kafka中讀取數據

3.1 項目結構

3.2 ReadingFromKafkaApp

/**
 * 從 Kafka 中讀取數據
 */
public class ReadingFromKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
        builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");

        // 若是外部傳參 cluster 則表明線上環境啓動,不然表明本地啓動
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalReadingFromKafkaApp",
                    new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                // 除了分組 ID,如下配置都是可選的。分組 ID 必須指定,不然會拋出 InvalidGroupIdException 異常
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // 定義重試策略
                .setRetry(getRetryService())
                // 定時提交偏移量的時間間隔,默認是 15s
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    // 定義重試策略
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
}

3.3 LogConsoleBolt

/**
 * 打印從 Kafka 中獲取的數據
 */
public class LogConsoleBolt extends BaseRichBolt {


    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            System.out.println("received from kafka : "+ value);
            // 必須 ack,不然會重複消費 kafka 中的消息
            collector.ack(input);
        }catch (Exception e){
            e.printStackTrace();
            collector.fail(input);
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

這裏從 value 字段中獲取 kafka 輸出的值數據。dom

在開發中,咱們能夠經過繼承 RecordTranslator 接口定義了 Kafka 中 Record 與輸出流之間的映射關係,能夠在構建 KafkaSpoutConfig 的時候經過構造器或者 setRecordTranslator() 方法傳入,並最後傳遞給具體的 KafkaSpout

默認狀況下使用內置的 DefaultRecordTranslator,其源碼以下,FIELDS 中 定義了 tuple 中全部可用的字段:主題,分區,偏移量,消息鍵,值。

public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
    private static final long serialVersionUID = -5782462870112305750L;
    public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

    @Override
    public List<String> streams() {
        return DEFAULT_STREAM;
    }
}

3.4 啓動測試

這裏啓動一個生產者用於發送測試數據,啓動命令以下:

# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic

本地運行的項目接收到從 Kafka 發送過來的數據:


用例源碼下載地址:storm-kafka-integration

參考資料

  1. Storm Kafka Integration (0.10.x+)

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索