咱們知道storm的做用主要是進行流式計算,對於源源不斷的均勻數據流流入處理是很是有效的,而現實生活中大部分場景並非均勻的數據流,而是時而多時而少的數據流入,這種狀況下顯然用批量處理是不合適的,若是使用storm作實時計算的話可能由於數據擁堵而致使服務器掛掉,應對這種狀況,使用kafka做爲消息隊列是很是合適的選擇,kafka能夠將不均勻的數據轉換成均勻的消息流,從而和storm比較完善的結合,這樣才能夠實現穩定的流式計算,那麼咱們接下來開發一個簡單的案例來實現storm和kafka的結合html
storm和kafka結合,實質上無非是以前咱們說過的計算模式結合起來,就是數據先進入kafka生產者,而後storm做爲消費者進行消費,最後將消費後的數據輸出或者保存到文件、數據庫、分佈式存儲等等,具體框圖以下:java
這張圖片摘自博客地址:http://www.cnblogs.com/tovin/p/3974417.html 在此感謝做者的奉獻數據庫
首先咱們保證在服務器上zookeeper、kafka、storm正常運行,而後咱們開始寫程序,這裏使用eclipse for javaee IDEapache
和以前同樣,創建一個maven項目,在pom.xml寫入以下代碼:vim
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 5 <groupId>kafkastorm</groupId> 6 <artifactId>kafkastorm</artifactId> 7 <version>0.0.1-SNAPSHOT</version> 8 <packaging>jar</packaging> 9 10 <name>kafkastorm</name> 11 <url>http://maven.apache.org</url> 12 13 <properties> 14 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 15 </properties> 16 17 <dependencies> 18 <dependency> 19 <groupId>junit</groupId> 20 <artifactId>junit</artifactId> 21 <version>3.8.1</version> 22 <scope>test</scope> 23 </dependency> 24 <dependency> 25 <groupId>org.apache.storm</groupId> 26 <artifactId>storm-core</artifactId> 27 <version>0.9.6</version> 28 <scope>provided</scope> 29 </dependency> 30 <dependency> 31 <groupId>org.apache.kafka</groupId> 32 <artifactId>kafka_2.9.2</artifactId> 33 <version>0.8.2.2</version> 34 <exclusions> 35 <exclusion> 36 <groupId>org.apache.zookeeper</groupId> 37 <artifactId>zookeeper</artifactId> 38 </exclusion> 39 <exclusion> 40 <groupId>log4j</groupId> 41 <artifactId>log4j</artifactId> 42 </exclusion> 43 </exclusions> 44 </dependency> 45 <dependency> 46 <groupId>org.apache.storm</groupId> 47 <artifactId>storm-kafka</artifactId> 48 <version>0.9.6</version> 49 </dependency> 50 </dependencies> 51 52 <build> 53 <plugins> 54 <plugin> 55 <artifactId>maven-assembly-plugin</artifactId> 56 <configuration> 57 <descriptorRefs> 58 <descriptorRef>jar-with-dependencies</descriptorRef> 59 </descriptorRefs> 60 </configuration> 61 <executions> 62 <execution> 63 <id>make-assembly</id> 64 <phase>package</phase> 65 <goals> 66 <goal>single</goal> 67 </goals> 68 </execution> 69 </executions> 70 </plugin> 71 </plugins> 72 </build> 73 </project>
主要是導入的zookeeper、storm、kafka外部依賴這些疊加起來,還有<plugin>插件便於咱們後續對程序進程maven的打包windows
和以前同樣首先編寫storm消費kafka的邏輯,MessageScheme類,代碼以下:api
1 package net.zengzhiying; 2 3 import java.io.UnsupportedEncodingException; 4 import java.util.List; 5 6 import backtype.storm.spout.Scheme; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Values; 9 10 public class MessageScheme implements Scheme { 11 12 public List<Object> deserialize(byte[] arg0) { 13 try { 14 String msg = new String(arg0, "UTF-8"); 15 return new Values(msg); 16 } catch (UnsupportedEncodingException e) { 17 e.printStackTrace(); 18 } 19 return null; 20 } 21 22 public Fields getOutputFields() { 23 return new Fields("msg"); 24 } 25 26 }
邏輯很簡單,就是對kafka出來的數據轉換成字符串,接下來咱們想辦法來處理strom清洗以後的數據,咱們爲了簡單就把輸出保存到一個文件中,Bolt邏輯SenqueceBolt類的代碼以下:服務器
1 package net.zengzhiying; 2 3 import java.io.DataOutputStream; 4 import java.io.FileNotFoundException; 5 import java.io.FileOutputStream; 6 import java.io.IOException; 7 8 import backtype.storm.topology.BasicOutputCollector; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.topology.base.BaseBasicBolt; 11 import backtype.storm.tuple.Fields; 12 import backtype.storm.tuple.Tuple; 13 import backtype.storm.tuple.Values; 14 15 public class SenqueceBolt extends BaseBasicBolt { 16 17 public void execute(Tuple arg0, BasicOutputCollector arg1) { 18 String word = (String) arg0.getValue(0); 19 String out = "output:" + word; 20 System.out.println(out); 21 22 //寫文件 23 try { 24 DataOutputStream out_file = new DataOutputStream(new FileOutputStream("kafkastorm.out")); 25 out_file.writeUTF(out); 26 out_file.close(); 27 } catch (FileNotFoundException e) { 28 // TODO Auto-generated catch block 29 e.printStackTrace(); 30 } catch (IOException e) { 31 // TODO Auto-generated catch block 32 e.printStackTrace(); 33 } 34 35 arg1.emit(new Values(out)); 36 } 37 38 public void declareOutputFields(OutputFieldsDeclarer arg0) { 39 arg0.declare(new Fields("message")); 40 } 41 42 }
就是把輸出的消息放到文件kafkastorm.out中框架
而後咱們編寫主類,也就是配置kafka提交topology到storm的代碼,類名爲StormKafkaTopo,代碼以下:eclipse
1 package net.zengzhiying; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 import backtype.storm.Config; 7 import backtype.storm.LocalCluster; 8 import backtype.storm.StormSubmitter; 9 import backtype.storm.generated.AlreadyAliveException; 10 import backtype.storm.generated.InvalidTopologyException; 11 import backtype.storm.spout.SchemeAsMultiScheme; 12 import backtype.storm.topology.TopologyBuilder; 13 import backtype.storm.utils.Utils; 14 import storm.kafka.BrokerHosts; 15 import storm.kafka.KafkaSpout; 16 import storm.kafka.SpoutConfig; 17 import storm.kafka.ZkHosts; 18 import storm.kafka.bolt.KafkaBolt; 19 20 public class StormKafkaTopo { 21 public static void main(String[] args) { 22 BrokerHosts brokerHosts = new ZkHosts("192.168.1.216:2181/kafka"); 23 24 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/kafka", "kafkaspout"); 25 26 Config conf = new Config(); 27 Map<String, String> map = new HashMap<String, String>(); 28 29 map.put("metadata.broker.list", "192.168.1.216:9092"); 30 map.put("serializer.class", "kafka.serializer.StringEncoder"); 31 conf.put("kafka.broker.properties", map); 32 conf.put("topic", "topic2"); 33 34 spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); 35 36 TopologyBuilder builder = new TopologyBuilder(); 37 builder.setSpout("spout", new KafkaSpout(spoutConfig)); 38 builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 39 builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt"); 40 41 if(args != null && args.length > 0) { 42 //提交到集羣運行 43 try { 44 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 45 } catch (AlreadyAliveException e) { 46 e.printStackTrace(); 47 } catch (InvalidTopologyException e) { 48 e.printStackTrace(); 49 } 50 } else { 51 //本地模式運行 52 LocalCluster cluster = new LocalCluster(); 53 cluster.submitTopology("Topotest1121", conf, builder.createTopology()); 54 Utils.sleep(1000000); 55 cluster.killTopology("Topotest1121"); 56 cluster.shutdown(); 57 } 58 59 60 61 } 62 }
注意上面代碼的配置,和以前單獨運行storm和kafka代碼不太同樣,配置也很簡單,注意區別便可,若是細心的話會注意到這裏建了兩個topic一個是topic1,一個是topic2,topic1的含義kafka接收生產者過來的數據所須要的topic,topic2是KafkaBolt也就是storm中的bolt生成的topic,固然這裏topic2這行配置能夠省略,是沒有任何問題的,相似於一箇中轉的東西,另外咱們此次測試是上傳到服務器執行,本地模式的代碼沒有執行到,固然原理是同樣的
以前通常網上的教程到這裏就完畢了,這樣咱們會引發一種沒有生產者的誤區,注意:上面3個類實現的功能是kafka消費者輸出的數據被storm消費!生產者的代碼能夠當作獨立的其餘來源,能夠寫在其餘項目中,根據數據源的狀況來,下面咱們爲了示例,編寫一個類來進行生產,代碼和以前kafka單獨的同樣:
1 package net.zengzhiying; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Properties; 6 7 import kafka.javaapi.producer.Producer; 8 import kafka.producer.KeyedMessage; 9 import kafka.producer.ProducerConfig; 10 11 public class DataProducerInsert { 12 private static Producer<Integer,String> producer; 13 private final Properties props=new Properties(); 14 public DataProducerInsert(){ 15 //定義鏈接的broker list 16 props.put("metadata.broker.list", "192.168.1.216:9092"); 17 //定義序列化類 Java中對象傳輸以前要序列化 18 props.put("serializer.class", "kafka.serializer.StringEncoder"); 19 //props.put("advertised.host.name", "192.168.1.216"); 20 producer = new Producer<Integer, String>(new ProducerConfig(props)); 21 } 22 public static void main(String[] args) { 23 DataProducerInsert sp=new DataProducerInsert(); 24 //定義topic 25 String topic="topic1"; 26 //開始時間統計 27 long startTime = System.currentTimeMillis(); 28 //定義要發送給topic的消息 29 String messageStr = "This is a message"; 30 List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>(); 31 32 //構建消息對象 33 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr); 34 datalist.add(data); 35 36 //結束時間統計 37 long endTime = System.currentTimeMillis(); 38 KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用時" + (endTime-startTime)/1000.0); 39 datalist.add(data1); 40 41 //推送消息到broker 42 producer.send(data); 43 producer.close(); 44 } 45 }
注意,這裏咱們定義的topic是topic1,正好和前面的topic1數據源對應,是整個kafka保持一致的topic,也就是說kafka生產者topic和消費者topic是必須名稱相同才能夠響應,下面簡單添加了一點時間統計的代碼,也很簡單
另外還要注意kafka配置文件host.name儘可能改爲ip,和以前說過的同樣
到如今項目就編寫完成了,而後咱們使用maven命令對項目打包,首先得保證咱們windows上安裝好了maven,咱們運行cmd,進入到當前項目目錄下,執行命令: mvn assembly:assembly 進行打包,打包的前提就是以前pom.xml的全部配置,執行後maven會自動下載相應的依賴並完成打包,須要耐心等待一會:
看到如圖所示的BUILD SUCCESS返回以後,那麼打包就成功了,如今進入項目目錄下的target目錄中,會看到2個jar包
其中後面那個文件名較長的大小也比較大,是包含相關依賴的包,接下來咱們將這個包上傳到服務器,而後使用storm執行jar包將咱們的topology上傳到集羣中運行,注意是使用storm執行jar包,而不是java
/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.StormKafkaTopo kafkagostorm
前面是storm的絕對路徑,參數jar執行jar包,後面跟的是上傳topology的主類,最後kafkagostorm是咱們上傳拓撲的名稱
這裏執行完以後會回到命令行,如今就在後臺集羣中開始分發運行了,這就是集羣模式,以前咱們講的storm案例會不斷滾動大量數據,那個屬於本地模式,若是咱們如今開啓ui界面的話,那麼訪問咱們的地址http://192.168.1.216:8080/能夠看到正在運行的Topology
能夠看到狀態是active正在運行了,咱們上面代碼中kafkabolt建立了一個topic2的消息,咱們如今能夠測試一下,消費者這裏只是簡單地原樣輸出,咱們進入kafka目錄,執行下面命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic topic2 --from-beginning
後面參數--from-beginning不添加也是能夠的,添加包括舊信息,不添加就是新的輸出
如今界面卡住,待會咱們來觀察輸出,如今咱們新開一個窗口,仍是使用storm執行剛纔的生產者類DataProducerInsert來生產一條消息,命令以下:
/usr/storm/apache-storm-0.9.6/bin/storm jar kafkastorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar net.zengzhiying.DataProducerInsert
回車以後,等待界面滾動2s程序跑完以後,咱們看到另外一個窗口輸出了消息:
而後,咱們的輸出文件在哪呢,剛纔咱們使用storm執行的生產者代碼,因此輸出的kafkastorm.out就在storm的安裝目錄下,咱們使用cat或者vim均可以看到文件內容,若是有時間統計的話兩行內容顯示可能會有點問題,由於後續要進行簡單的轉換,去掉時間統計代碼只輸出消息的內容以下,這裏使用vim打開的:
另外注意,上傳拓撲時全部的代碼都加載到集羣了,因此修改代碼版本時,必定要先在storm目錄下執行 bin/storm kill topo_name 結束拓撲,修改代碼後從新上傳便可再次運行,不然可能會出現錯誤,在集羣上的時候kafka配置文件的host.name註釋便可,默認爲localhost,最後代碼中用到的參數比較多,很容易出錯,因此寫代碼時仍是要仔細點
這樣storm集成kafka的測試案例就完成了,而且實現了必定的功能,只要咱們靈活掌握了怎麼寫kafka和storm結合的總體拓撲結構,那麼主要的代碼就集中在數據源也就是kafka生產者的發送和storm消費後的存儲問題,這全部的代碼都是在storm和kafka給好的方法內寫邏輯,而不用關心底層,這樣使開發更加簡單快捷,好比咱們消費後的數據既能夠寫到文件、數據庫還能夠索引到solr,存到Hbase等,這樣就能夠靈活運用了;其實最關鍵的仍是要了解這些框架底層的實現原理,這樣遇到問題才能夠知其然知其因此然