Kafka in Action: Kafka to Storm

1. Overview

  In "Kafka in Action: Flume to Kafka" I covered how data is produced into Kafka. Today I will show how to consume that data from Kafka in real time, using the real-time computation framework Storm. The main topics of this post are listed below:

  • Data consumption
  • Storm computation
  • Preview screenshots

  With that, let's get into today's content.

2. Data Consumption

  The data in Kafka is consumed by Storm: a KafkaSpout feeds the data into Storm, and Storm then processes the received data in real time according to the business requirements. The data consumption flow is shown in the figure below:

  As the figure shows, Storm pulls data from the Kafka cluster through the KafkaSpout, and after Storm has processed it, the results are persisted to a database.

3. Storm Computation

  Next, we use Storm to do the computation. This requires a Storm cluster to be deployed beforehand; if you have not set one up yet, you can refer to my earlier post "Kafka in Action: Storm Cluster". I will not repeat the setup process here. The code that implements this part is given below, starting with the KafkaSpout class:

  • KafkaSpout class:
package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Spout that consumes messages from Kafka and emits them into the topology
 */
public class KafkaSpout implements IRichSpout {

    private static final long serialVersionUID = -7107773519958260350L;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSpout.class);

    SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    private static ConsumerConfig createConsumerConfig() {
        // High-level consumer settings: the ZooKeeper address and consumer group come from KafkaProperties.
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void close() {
        // nothing to clean up in this example
    }

    public void activate() {
        // Create the Kafka high-level consumer and subscribe to the topic with a single stream.
        // Note that this example consumes inside activate() in a blocking loop and emits each
        // message directly, instead of emitting from nextTuple().
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            LOGGER.info("(consumer)==>" + value);
            // Use the message content as the message id so it is passed back to ack()/fail().
            collector.emit(new Values(value), value);
        }
    }

    public void deactivate() {
        // nothing to do when the spout is deactivated
    }

    public void nextTuple() {
        // unused here: tuples are emitted from the consumer loop in activate()
    }

    public void ack(Object msgId) {
        // no replay logic in this example
    }

    public void fail(Object msgId) {
        // no replay logic in this example
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("KafkaSpout"));
    }

    public Map<String, Object> getComponentConfiguration() {
        // no component-specific configuration
        return null;
    }

}
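
  One thing worth noting about this spout: it consumes Kafka inside activate() in a blocking loop and leaves nextTuple() empty. A more conventional Storm spout lets a background thread buffer messages into an in-memory queue and emits them from nextTuple(), so Storm's own thread controls emission. Below is a minimal sketch of that variant, reusing the same Kafka 0.8 high-level consumer settings; the class name QueueingKafkaSpout is hypothetical, and this is an illustration rather than the implementation used in this series.

package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Sketch of a queue-based alternative: a background thread drains Kafka into an
 * in-memory queue, and Storm pulls from that queue in nextTuple().
 */
public class QueueingKafkaSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private final String topic;
    private SpoutOutputCollector collector;
    private transient ConsumerConnector consumer;
    private transient LinkedBlockingQueue<String> queue;

    public QueueingKafkaSpout(String topic) {
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        // Same high-level consumer settings as the spout above.
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<String>(10000);
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        final KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicMap).get(topic).get(0);
        // Consume on a background thread so open() returns and Storm can drive nextTuple().
        new Thread(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    queue.offer(new String(it.next().message()));
                }
            }
        }).start();
    }

    public void nextTuple() {
        // Non-blocking poll: emit one buffered message per call, if any.
        String value = queue.poll();
        if (value != null) {
            collector.emit(new Values(value), value);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("KafkaSpout"));
    }
}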
  • KafkaTopology class:
package cn.hadoop.hdfs.storm.client;

import cn.hadoop.hdfs.storm.FileBlots;
import cn.hadoop.hdfs.storm.KafkaSpout;
import cn.hadoop.hdfs.storm.WordsCounterBlots;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Builds and submits the Kafka word-count topology
 */
public class KafkaTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Spout "testGroup" reads the Kafka topic "test"; FileBlots splits messages into words,
        // and two WordsCounterBlots tasks count them, grouped by the "words" field.
        builder.setSpout("testGroup", new KafkaSpout("test"));
        builder.setBolt("file-blots", new FileBlots()).shuffleGrouping("testGroup");
        builder.setBolt("words-counter", new WordsCounterBlots(), 2).fieldsGrouping("file-blots", new Fields("words"));
        Config config = new Config();
        config.setDebug(true);
        if (args != null && args.length > 0) {
            // Remote mode: submit the topology to the cluster, using the first argument as the Nimbus host.
            config.put(Config.NIMBUS_HOST, args[0]);
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopologyWithProgressBar(KafkaTopology.class.getSimpleName(), config,
                        builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // Local mode: run the topology in-process for one minute, then shut down.
            LocalCluster local = new LocalCluster();
            local.submitTopology("counter", config, builder.createTopology());
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            local.shutdown();
        }
    }
}
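
  The topology above wires in two bolts, FileBlots and WordsCounterBlots, whose source is not shown in this post. As a rough guide to what they need to do, here is a minimal sketch (each class in its own file), assuming FileBlots splits each Kafka message into words, emitting a "words" field that matches the fieldsGrouping above, and WordsCounterBlots keeps a per-task running count for each word; the actual classes in the project may look different.

// FileBlots.java (sketch)
package cn.hadoop.hdfs.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Splits each raw Kafka message coming from the spout into individual words. */
public class FileBlots extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        // The spout declares a single field named "KafkaSpout" holding the raw message text.
        String line = input.getString(0);
        for (String word : line.split("\\s+")) {
            if (word.length() > 0) {
                collector.emit(new Values(word.toLowerCase()));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // "words" matches the fieldsGrouping used in KafkaTopology.
        declarer.declare(new Fields("words"));
    }
}

// WordsCounterBlots.java (sketch)
package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Keeps a per-task running count of each word and emits (word, count) pairs. */
public class WordsCounterBlots extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    private Map<String, Long> counts = new HashMap<String, Long>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("words");
        Long count = counts.get(word);
        count = (count == null) ? 1L : count + 1;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}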

4. Preview Screenshots

  First, we start the Kafka cluster; at this point no messages have been produced yet, as shown below:

  Next, we start the Flume cluster to collect log data and deliver it to the Kafka cluster, as shown below:

  Then we open the Storm UI to check the status of the submitted topology, as shown below:

  Finally, the aggregated results are persisted to a database such as Redis or MySQL, with the result shown below:
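
  The persistence step itself is covered in a later post. Purely as an illustration of what it can look like, below is a minimal sketch of a terminal bolt that writes the (word, count) pairs into a Redis hash using the Jedis client; the class name, Redis address, key name, and field names here are assumptions for the example, not the configuration used in this series. Wiring it in would just be one more builder.setBolt("redis-persist", new RedisPersistBolt()).shuffleGrouping("words-counter") line in KafkaTopology.

// RedisPersistBolt.java (hypothetical sketch, not the implementation from the follow-up post)
package cn.hadoop.hdfs.storm;

import java.util.Map;

import redis.clients.jedis.Jedis;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class RedisPersistBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private transient Jedis jedis;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // "redis-host" is a placeholder; point this at your own Redis instance.
        this.jedis = new Jedis("redis-host", 6379);
    }

    public void execute(Tuple input) {
        // Assumes the upstream counter bolt emits fields named "word" and "count".
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        jedis.hset("wordcount", word, String.valueOf(count));
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output fields to declare
    }

    public void cleanup() {
        if (jedis != null) {
            jedis.disconnect();
        }
    }
}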

5. Summary

  This post covered the data consumption flow and previewed the persisted results. The details of the persistence step will be explained in a separate post later on; for now it is enough to get familiar with the overall flow and the expected results.

6. Closing Remarks

  That is all for this post. If you run into any problems while studying this material, feel free to join the discussion group or send me an email, and I will do my best to answer your questions. Good luck, and keep learning!
