通過前面Kafka實戰系列的學習,咱們經過學習《Kafka實戰-入門》瞭解Kafka的應用場景和基本原理,《Kafka實戰-Kafka Cluster》一文給你們分享了Kafka集羣的搭建部署,讓你們掌握了集羣的搭建步驟,《Kafka實戰-實時日誌統計流程》一文給你們講解一個項目(或者說是系統)的總體流程,《Kafka實戰-Flume到Kafka》一文給你們介紹了Kafka的數據生產過程,《Kafka實戰-Kafka到Storm》一文給你們介紹了Kafka的數據消費,經過Storm來實時計算處理。今天進入Kafka實戰的最後一個環節,那就是Kafka實戰的結果的數據持久化。下面是今天要分享的內容目錄:html
下面開始今天的分享內容。前端
通常,咱們在進行實時計算,將結果統計處理後,須要將結果進行輸出,供前端工程師去展現咱們統計的結果(所說的報表)。結果的存儲,這裏咱們選擇的是Redis+MySQL進行存儲,下面用一張圖來展現這個持久化的流程,以下圖所示:java
從途中能夠看出,實時計算的部分由Storm集羣去完成,而後將計算的結果輸出到Redis和MySQL庫中進行持久化,給前端展現提供數據源。接下來,我給你們介紹如何實現這部分流程。mysql
首先,咱們去實現Storm的計算結果輸出到Redis庫中,代碼以下所示:redis
package cn.hadoop.hdfs.storm; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import redis.clients.jedis.Jedis; import cn.hadoop.hdfs.util.JedisFactory; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; /** * @Date Jun 10, 2015 * * @Author dengjie * * @Note Calc WordsCount eg. */ public class WordsCounterBlots implements IRichBolt { /** * */ private static final long serialVersionUID = -619395076356762569L; OutputCollector collector; Map<String, Integer> counter; @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counter = new HashMap<String, Integer>(); } public void execute(Tuple input) { String word = input.getString(0); Integer integer = this.counter.get(word); if (integer != null) { integer += 1; this.counter.put(word, integer); } else { this.counter.put(word, 1); } for (Entry<String, Integer> entry : this.counter.entrySet()) { // write result to redis Jedis jedis = JedisFactory.getJedisInstance("real-time"); jedis.set(entry.getKey(), entry.getValue().toString()); // write result to mysql // ... } this.collector.ack(input); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
注:這裏關於輸出到MySQL就不贅述了,你們能夠按需處理便可。sql
在實現持久化到Redis的代碼實現後,接下來,咱們經過提交Storm做業,來觀察是否將計算後的結果持久化到了Redis集羣中。結果以下圖所示:前端工程師
經過Redis的Client來瀏覽存儲的Key值,能夠觀察統計的結果持久化到來Redis中。oop
咱們在提交做業到Storm集羣的時候須要觀察做業運行情況,有可能會出現異常,咱們能夠經過Storm UI界面來觀察,會有提示異常信息的詳細描述。如果出錯,你們能夠經過Storm UI的錯誤信息和Log日誌打印的錯誤信息來定位出緣由,從而找到對應的解決辦法。post
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!學習