概述java
在基於Hadoop平臺的不少應用場景中,咱們須要對數據進行離線和實時分析,離線分析能夠很容易地藉助於Hive或者mr來實現統計分析,可是對於實時的需求Hive和mr就不合適了。實時應用場景可使用Storm,它是一個實時處理系統,它爲實時處理類應用提供了一個計算模型,能夠很容易地進行編程處理。爲了統一離線和實時計算,通常狀況下,咱們都但願將離線和實時計算的數據源的集合統一塊兒來做爲輸入,而後將數據的流向分別經由實時系統和離線分析系統,分別進行分析處理,這時咱們能夠考慮將數據源(如使用Flume收集日誌)直接鏈接一個消息中間件,如Kafka,能夠整合Flume+Kafka,Flume做爲消息的Producer,生產的消息數據(日誌數據、業務請求數據等等)發佈到Kafka中,而後經過訂閱的方式,使用Storm的Topology做爲消息的Consumer,在Storm集羣中分別進行以下兩個需求場景的處理:
直接使用Storm的Topology對數據進行實時分析處理
整合Storm+HDFS,將消息處理後寫入HDFS進行離線分析處理mysql
flume+kafka+storm相結合,此時,flume做爲數據來源收集數據,kafka做爲消息隊列,起緩衝做用,storm從kafka拉取數據分析處理。作軟件開發的都知道模塊化思想,這樣設計的緣由有兩方面:
一方面是能夠模塊化,功能劃分更加清晰,從「數據採集--數據接入--流式計算--數據輸出/存儲」sql
1).數據採集
負責從各節點上實時採集數據,選用cloudera的flume來實現
2).數據接入
因爲採集數據的速度和數據處理的速度不必定同步,所以添加一個消息中間件來做爲緩衝,選用apache的kafka
3).流式計算
對採集到的數據進行實時分析,選用apache的storm
4).數據輸出
對分析後的結果持久化,暫定用mysql
另外一方面是模塊化以後,假如當Storm掛掉了以後,數據採集和數據接入仍是繼續在跑着,數據不會丟失,storm起來以後能夠繼續進行流式計算;apache
數據來源flume編程
Kafka生產的數據,是由Flume的Sink提供的,這裏咱們須要用到Flume集羣,經過Flume集羣將Agent的日誌收集分發到 Kafka。咱們根據狀況選擇合適的source,這裏我用的是exec,channel是memory,sink固然就是kafkasink。詳細配置以下:模塊化
flume到kafka工具
flume到kafka的傳輸過程以下圖:oop
kafka的配置跟以前搭建的沒有什麼改動。測試
測試flume到kafkaui
flume和kafka配置好之後,先啓動flume集羣,這裏是後臺運行:
flume-ng agent -n agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console &
而後啓動zookeeper:
./zkServer.sh start
接着啓動kafka集羣,這裏是後臺運行:
./kafka-server-start.sh ../config/server.properties &
而後向監控的文件裏輸入數據:
echo 'hello world' >> topic-test.txt
接着在kafka集羣上建立消費者,測試flume到kafka是否聯通,固然也可使用kafka監控工具查看:
咱們能夠事先建立好topic,固然咱們也能夠自動建立topic,設置kafka auto.create.topics.enable屬性爲true,默認就爲true。
./kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topic1
這邊輸出'hello world'則代表flume到kafka鏈接成功。
storm讀取kafka數據分析編程
首先搭建好storm集羣,啓動nimbus、supervisor、ui
而後topology編程,我這裏是java編程的一個小例子:
主類
package com.kafka_storm; import java.util.HashMap; import java.util.Map; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; public class StormKafkaTopo { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("master:2181"); // 配置Kafka訂閱的Topic,以及zookeeper中數據節點目錄和名字 //這裏須要注意的是,spout會根據config的後面兩個參數在zookeeper上爲每一個kafka分區建立保存讀取偏移的節點,如:/zkroot/topo/partition_0。 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout"); // 配置KafkaBolt中的kafka.broker.properties(能夠參考kafka java編程) Config conf = new Config(); Map<String, String> map = new HashMap<String, String>(); // 配置Kafka broker地址 map.put("metadata.broker.list", "master:9092"); // serializer.class爲消息的序列化類 map.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", map); // 配置KafkaBolt生成的topic conf.put("topic", "topic2"); //默認狀況下,spout下會發射域名爲bytes的binary數據,若是有須要,能夠經過設置schema進行修改。 spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig)); builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt"); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Topo", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("Topo"); cluster.shutdown(); } } }
消息處理
package com.kafka_storm; import java.io.UnsupportedEncodingException; import java.util.List; import backtype.storm.spout.Scheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * 使用KafkaSpout時須要子集實現Scheme接口,它主要負責從消息流中解析出須要的數據 * @author lenovo * */ public class MessageScheme implements Scheme { /* (non-Javadoc) * @see backtype.storm.spout.Scheme#deserialize(byte[]) */ public List<Object> deserialize(byte[] ser) { try { String msg = new String(ser, "UTF-8"); return new Values(msg); } catch (UnsupportedEncodingException e) { } return null; } /* (non-Javadoc) * @see backtype.storm.spout.Scheme#getOutputFields() */ public Fields getOutputFields() { // TODO Auto-generated method stub return new Fields("msg"); } }
bolt
package com.kafka_storm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SenqueceBolt extends BaseBasicBolt{ /* (non-Javadoc) * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String word = (String) input.getValue(0); String out = "I'm " + word + "!"; System.out.println("out=" + out); collector.emit(new Values(out)); } /* (non-Javadoc) * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer) */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } }
在集羣上運行
咱們要將引入的第三方包所有放到storm的lib包下面,包括kafka、zookeeper的,不然會報缺失jar包的錯
storm jar StormKafkaDemo.jar com.kafka_storm.StormKafkaTopo StormKafkaDemo
開始整體測試:
向flume監控的文件輸入數據,在storm的log日誌裏查看輸出,固然咱們也能夠在kafka裏查看,由於我將結果輸出到kafka裏了,topic爲topic2。
日誌裏結果以下:
到此,flume+kafka+storm結合使用結束