A Storm-Kafka Programming Guide

Part 1: Principles and key steps

The storm-kafka module that ships with Storm provides everything needed for Storm to interact with Kafka. See the official documentation: https://github.com/apache/storm/tree/master/external/storm-kafka

(1) Key steps for using storm-kafka

Step 1: Create a ZkHosts

When Storm reads a topic from Kafka, it needs to know how many partitions the topic has and which Kafka broker each partition lives on; ZkHosts provides exactly this information.
There are two ways to construct a ZkHosts:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

(1) By default, Kafka registers its broker metadata under /brokers in ZooKeeper, so the second constructor is enough:

new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181");

(2) If the broker metadata lives under /kafka/brokers (which is how our cluster is set up), use:

new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181", "/kafka")

Or, directly:

new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181/kafka」)

By default the partition information is re-read from Kafka every 60 seconds; this interval can be changed via the refreshFreqSecs field on the ZkHosts instance.
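For example, a minimal sketch (refreshFreqSecs is a public field on the ZkHosts class in the storm-kafka versions this guide targets; the addresses are simply the ones used elsewhere in this post):

ZkHosts hosts = new ZkHosts("192.168.172.117:2181,192.168.172.98:2181", "/kafka");
// re-read the topic's partition/broker mapping from ZooKeeper every 2 minutes
// instead of the default 60 seconds
hosts.refreshFreqSecs = 120;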

(3) Besides reading the partition information through ZkHosts, storm-kafka also lets you specify the brokers statically (not recommended), e.g.:

Broker brokerForPartition0 = new Broker("localhost");    // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);    // localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");    // localhost:9092 specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);    // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);    // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);    // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);

In other words, ZkHosts simply tells the spout which Kafka broker to read each partition of a topic from.

Step 2: Create a KafkaConfig

(1) There are two constructors for KafkaConfig:

public KafkaConfig(BrokerHosts hosts, String topic) 
public KafkaConfig(BrokerHosts hosts, String topic, String clientId) 


BrokerHosts is the instance created above, topic is the name of the topic to subscribe to, and clientId determines where the consumer offsets for this topic are stored; the id should be unique, otherwise multiple topologies will clash with each other.
(In fact, Trident does not store its offsets at this location; see below.)
In practice there are two subclasses of KafkaConfig, one for plain (core) Storm and one for Trident.
(2) Core Storm

Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer’s offset. The id should uniquely identify your spout. 
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); 
public SpoutConfig(BrokerHosts hosts, String topic, String id); 
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:

// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

// Exponential back-off retry settings.  These are used when retrying messages after a bolt
// calls OutputCollector.fail().
// Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
// resubmitting the message while still retrying.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;

KafkaSpout accepts only a SpoutConfig as its constructor argument.

(3) TridentKafkaConfig: TridentKafkaEmitter accepts only a TridentKafkaConfig.
The location of the offsets consumed by Trident is determined when the topology is built, e.g. with:

topology.newStream("test", kafkaSpout).

in which case the offsets are stored at:

/transactional/test/coordinator/currtx

(4) Some default values in KafkaConfig:

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

They can be overridden like so:

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

Step 3: Set the MultiScheme

MultiScheme determines how the bytes read from Kafka are turned into tuples, and it also controls the names of the output fields.

public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();

By default, RawMultiScheme emits the raw byte[] unchanged as a single field named "bytes".
This behavior can be changed with SchemeAsMultiScheme or KeyValueSchemeAsMultiScheme:

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

The line above converts the bytes into a String.
And when the topology is built:

topology.newStream("test", kafkaSpout).each(new Fields("str"), new FilterFunction(), new Fields("word"))...

the field emitted by the spout is named str (the field name declared by StringScheme).

Step 4: Create the spout

(1) Core Storm

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

(2) Trident

OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

Step 5: Build the topology

(1) Core Storm

builder.setSpout("kafka-reader",new KafkaSpout(spoutConf),12);

"kafka-reader" is the spout id and 12 is the parallelism hint.

(2) Trident

topology.newStream("test", kafkaSpout).each(new Fields("str"), new FilterFunction(), new Fields("word"))...

"test" determines where the offsets (that is, the txids) are stored, and "str" is the name of the field emitted by the spout.

Complete examples:
Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

(2) Resuming from the previous Kafka offset after a topology failure

1. KafkaConfig.startOffsetTime

KafkaConfig has a setting, KafkaConfig.startOffsetTime, that specifies where the topology starts consuming. It can take three kinds of values:
(1) kafka.api.OffsetRequest.EarliestTime(): start from the earliest message;
(2) kafka.api.OffsetRequest.LatestTime(): start from the newest message, i.e. the tail of the queue;
(3) a specific point in time:

kafkaConfig.startOffsetTime =  new SimpleDateFormat("yyyy.MM.dd-HH:mm:ss").parse(startOffsetTime).getTime();

See the Kafka FAQ entry "How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?" for how this works:

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?
Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.

2. Because running the topology specifies where the offset is kept in ZooKeeper, the offset can be recovered when something goes wrong.
When the topology is redeployed, the offset location must stay the same for it to read the offset back correctly.
(1) For core Storm, that location is determined by:

SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);

and the last two arguments (zkRoot and id) must not change, as sketched below.
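For instance, a minimal sketch (the topic, root and id strings below are made up for illustration):

// Keeping zkRoot and id fixed across redeployments lets the spout find the offsets written by
// the previous run. Contrast this with the earlier core-spout example, which used
// UUID.randomUUID() as the id and therefore gets a fresh ZooKeeper path on every deployment.
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "test-topic", "/kafkastorm", "my-topology-spout");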
(2) For Trident, it is the stream name in:

topology.newStream("test", kafkaSpout).

where the first argument must not change.
3. In other words, KafkaConfig.startOffsetTime only matters the first time the topology runs; every subsequent redeployment resumes from the stored offset.
Look at these two settings again:

public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

If forceFromStart (renamed to ignoreZkOffsets in newer storm-kafka releases) is set to true, the topology reads from the beginning of the topic every time it is restarted.
If it is false, then:
the first launch reads from the position given by startOffsetTime, and every subsequent restart resumes from the stored offset.
For ordinary use, the two values shown above are usually what you want; see the sketch below.
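As a rough sketch of the two usual combinations (the exact field name depends on the storm-kafka version, as noted above):

// resume from the offset stored in ZooKeeper on restart; the first run starts at startOffsetTime
spoutConfig.forceFromStart = false;   // newer releases: spoutConfig.ignoreZkOffsets = false;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

// or: deliberately reprocess the whole topic on every (re)start
// spoutConfig.forceFromStart = true; // newer releases: spoutConfig.ignoreZkOffsets = true;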

(3) Writing results back to Kafka

If you want to write results back to Kafka with transactional guarantees, use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.

The official documentation follows.

Writing to Kafka as part of your topology 
You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater. 
You need to provide implementation of following 2 interfaces

TupleToKafkaMapper and TridentTupleToKafkaMapper 
These interfaces have 2 methods defined:

K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);

as the name suggests these methods are called to map a tuple to a kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, the implementation always looks for fields named "key" and "message" if you use the default constructor of FieldNameBasedTupleToKafkaMapper, for backward compatibility reasons. Alternatively you can specify different key and message fields by using the non-default constructor. In the TridentKafkaState you must specify the field names for key and message, as there is no default constructor; these should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
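As an illustration only (not part of the official examples), a custom core-storm mapper might look like the sketch below; it assumes tuples carrying the "word" (String) and "count" (Long) fields from the word-count example later in this post:

import backtype.storm.tuple.Tuple;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;

// Hypothetical mapper: Kafka key = the "word" field, Kafka message = the "count" field as a String.
public class WordCountToKafkaMapper implements TupleToKafkaMapper<String, String> {
    @Override
    public String getKeyFromTuple(Tuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getMessageFromTuple(Tuple tuple) {
        return String.valueOf(tuple.getLongByField("count"));
    }
}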

KafkaTopicSelector and trident KafkaTopicSelector 
This interface has only one method

public interface KafkaTopicSelector {
    String getTopics(Tuple/TridentTuple tuple);
}

The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published. You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.
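For example, here is a sketch of a selector that picks the topic from a hypothetical "logLevel" field (note that in the storm-kafka source the core-storm interface method is getTopic(Tuple), even though the README snippet above spells it getTopics):

import backtype.storm.tuple.Tuple;
import storm.kafka.bolt.selector.KafkaTopicSelector;

// Hypothetical selector: route error records to one topic and everything else to another.
public class LogLevelTopicSelector implements KafkaTopicSelector {
    @Override
    public String getTopic(Tuple tuple) {
        String level = tuple.getStringByField("logLevel"); // assumed field, for illustration only
        return "ERROR".equals(level) ? "errorTopic" : "normalTopic";
    }
}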

Specifying Kafka producer properties
You can provide all the producer properties (see http://kafka.apache.org/documentation.html#producerconfigs, section "Important configuration properties for the producer") in your storm topology config by setting a properties map under the key kafka.broker.properties.

Two official examples follow.
For the bolt:

TopologyBuilder builder = new TopologyBuilder();

        Fields fields = new Fields("key", "message");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                    new Values("storm", "1"),
                    new Values("trident", "1"),
                    new Values("needs", "1"),
                    new Values("javadoc", "1")
        );
        spout.setCycle(true);
        builder.setSpout("spout", spout, 5);
        KafkaBolt bolt = new KafkaBolt()
                .withTopicSelector(new DefaultTopicSelector("test"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

For Trident:

Fields fields = new Fields("word", "count");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                new Values("storm", "1"),
                new Values("trident", "1"),
                new Values("needs", "1"),
                new Values("javadoc", "1")
        );
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout1", spout);

        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

Part 2: Complete examples

(1) Basic subscription

The basic scenario: subscribe to a Kafka topic, prepend a custom string to every message read, and write the result to another Kafka topic.

The spout that reads from Kafka is storm.kafka.KafkaSpout, the bolt that writes back to Kafka is storm.kafka.bolt.KafkaBolt, and the bolt that processes the data in between is our own TopicMsgBolt. Without further ado, the code:

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // the Kafka topic to subscribe to, plus the ZooKeeper node path and name used to store offsets
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
        // kafka.broker.properties used by KafkaBolt
        Config conf = new Config();
        Properties props = new Properties();
        // Kafka broker address
        props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
        // serializer.class is the message serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // the topic that KafkaBolt writes to
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}

The argument to the storm.kafka.ZkHosts constructor is a standard ZooKeeper connection string (see the ZooKeeper installation and deployment notes for setting up the environment). zk1, zk2 and zk3 are hostnames configured locally; because the servers run in pseudo-distributed mode, the port numbers are not all the default 2181.

The first argument of the storm.kafka.SpoutConfig constructor is the storm.kafka.ZkHosts object above, the second is the name of the topic to subscribe to, the third (zkRoot) is the ZooKeeper node under which the topic's read offsets are written, and the fourth is the name of the child node under that root (some sources call this the spout's id).

The backtype.storm.Config object holds the base configuration needed to set up the Storm topology.

The argument passed to the backtype.storm.spout.SchemeAsMultiScheme constructor controls how the data subscribed from Kafka is processed; MessageScheme here is our own implementation, shown below:

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

The getOutputFields method of MessageScheme names the tuples (Storm's smallest unit of transferred data) that KafkaSpout emits downstream; this name has to agree with what the receiving bolt uses (in this example they do not strictly have to match, because the bolt simply reads element 0, but in the word-count example below they must).

TopicMsgBolt is the bolt that receives data from storm.kafka.KafkaSpout, processes it, and passes it on to storm.kafka.bolt.KafkaBolt. The code:

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

Note in particular that the bolt must extend backtype.storm.topology.base.BaseBasicBolt; otherwise the offset is never recorded in ZooKeeper.
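The reason is acking: BaseBasicBolt acks every tuple automatically, and KafkaSpout only advances the stored offset for tuples that have been acked. If you do want to use BaseRichBolt, a sketch of what you would have to do yourself (using the same backtype.storm API as the rest of this post) looks like this:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualAckTopicMsgBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String out = "Message got is '" + input.getValue(0) + "'!";
        collector.emit(input, new Values(out)); // anchor the new tuple to the input tuple
        collector.ack(input);                   // without this ack the offset never advances
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}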

The code is done; next, test it on the Storm and Kafka clusters you have set up:

# create the two topics
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2

Next, start a producer for msgTopic1 and a consumer for msgTopic2:

# start a producer for msgTopic1, used to send test data
./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
# start a consumer for msgTopic2, used to watch the processed output
./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning

Then upload the packaged jar to the Storm nimbus (either remotely, or copy the jar to the nimbus host first and submit it there):

# ./bin/storm jar topology/TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology

Once the corresponding workers are up, type data into the msgTopic1 producer terminal and watch the processed output appear in the msgTopic2 consumer terminal.

A few things to note:

  1. The two topics msgTopic1 and msgTopic2 must be created first;
  2. The bolt must extend BaseBasicBolt, not BaseRichBolt, otherwise the offset will not be recorded;
  3. ZooKeeper is best run as a distributed or pseudo-distributed ensemble of at least three nodes, otherwise odd failures can occur;
  4. Spout and bolt ids must be unique across the Storm setup, otherwise exceptions are thrown;
  5. TopicMsgBolt, being the last bolt before storm.kafka.bolt.KafkaBolt, must name its output field "message", otherwise KafkaBolt cannot pick up the data.

(2) Consuming a topic and writing back: word count

With the simple in/out example done, here is a slightly more involved scenario: subscribe to a topic, split each message on spaces, count the words, and push the current word counts to another topic.

First, plan out the classes we will need:

  1. A backtype.storm.spout.Scheme subclass that receives and decodes data from KafkaSpout;
  2. The splitting bolt: SplitSentenceBolt;
  3. The counting bolt: WordCountBolt;
  4. The reporting bolt: ReportBolt;
  5. The topology definition: WordCountTopology;
  6. Plus one extra bolt that simply echoes the subscribed data as-is: SentenceBolt.

For the backtype.storm.spout.Scheme subclass we can reuse the MessageScheme defined above, so it is not repeated here.

SplitSentenceBolt splits the input with a plain String.split, names each resulting word "word", and passes it downstream:

public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}

SentenceBolt receives data from KafkaSpout and simply re-emits it. In the topology graph the input forks: one branch goes to SplitSentenceBolt and the other to SentenceBolt. This kind of structure is what you would use in a Lambda architecture. The code:

public class SentenceBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

WordCountBolt aggregates the received words and emits each word ("word") together with its running count ("count"):

public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        collector.emit(new Values(word, count));
    }
}

ReportBolt takes each word and count, formats them as a JSON-style string, and passes the result on:

public class ReportBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}

Finally, the topology definition, WordCountTopology:

public class WordCountTopology {
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String SENTENCE_BOLT_ID = "sentenceBolt";
    private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
    private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
    private static final String REPORT_BOLT_ID = "reportBolt";
    private static final String KAFKA_BOLT_ID = "kafkabolt";
    private static final String CONSUME_TOPIC = "sentenceTopic";
    private static final String PRODUCT_TOPIC = "wordCountTopic";
    private static final String ZK_ROOT = "/topology/root";
    private static final String ZK_ID = "wordCount";
    private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";

    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // the Kafka topic to subscribe to, plus the ZooKeeper node path and name used to store offsets
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID);

        Config config = new Config();
        Map<String, String> map = new HashMap<>();
        map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址
        map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class爲消息的序列化類
        config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties
        config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

Besides the points already mentioned, note that the zkRoot and id passed to storm.kafka.SpoutConfig here must differ from those in the first example (at the very least the id must differ, otherwise the two topologies would record their offsets on the same ZooKeeper node), as sketched below.
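For instance, a minimal sketch reusing the values from the two examples in this post:

// Each topology gets its own offset node in ZooKeeper because the ids differ.
SpoutConfig topicMsgConf  = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
SpoutConfig wordCountConf = new SpoutConfig(brokerHosts, "sentenceTopic", "/topology/root", "wordCount");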

References:

[1] storm-kafka programming guide: http://blog.csdn.net/lujinhong2/article/details/47132287

[2] Kafka cluster programming guide: http://blog.csdn.net/lujinhong2/article/details/47146693

[3] On the relationship between timestamps and offsets in Kafka: http://blog.csdn.net/lujinhong2/article/details/49661309

[4] Storm notes: a simple Storm + Kafka application: http://www.howardliu.cn/a-few-notes-about-storm/
