配置Java環境變量 JAVA_HOME、Path、CLASSPATH三個值分別爲(按照本身安裝情況設置,此處供參考):
D:\java\jdk1.8
%JAVA_HOME%/bin;%JAVA_HOME%/jre/bin
.;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar (要加.表示當前路徑)
這是爲了測試安裝效果,咱們將部署 storm-starter project案例中word count程序,用的是python寫的multi-lang bolt,使用python 2.7.11,安裝路徑在:
C:\Python27\
Download Apache Zookeeper 3.4.8 ,解壓配置:
> cd zookeeper-3.4.8
> copy conf\zoo_sample.cfg conf\zoo.cfg
> .\bin\zkServer.cmd
Storm的windows官方版尚未釋放,在here下載,源碼在here下載。
注意1:
源碼必定要用這個版本,不然啓動會報各類錯誤,而這些錯誤和 jdk、python、zookeeper、eclipse 版本都無關。
http://dl.dropboxusercontent.com/s/iglqz73chkul1tu/storm-0.9.1-incubating-SNAPSHOT-12182013.zip
配置Storm環境變量
C:\storm-0.9.1-incubating-SNAPSHOT-12182013\
%STORM_HOME%\bin;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\
此處與參考文章略有不一樣,下圖是參考文章給出的配置
JAVA_HOME已經在安裝JDK時手動配置了環境變量,而Python好像是默認自動就會配置好環境變量的,
個人Python目錄下沒有Scripts文件夾,暫時這樣配置就能夠了,不影響下面的使用。
注意2:
必定要在 STORM_HOME 目錄下執行後續命令,不然會報錯。
ERROR backtype.storm.event - Error when processing event
java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 8648225932767613808, local class serialVersionUID = 270281984708184947
at backtype.storm.utils.Utils.deserialize(Utils.java:86) ~[storm-core-0.9.1-incubating-SNAPSHOT-12182013.jar:na]
> cd %STORM_HOME%
> storm nimbus
> cd %STORM_HOME%
> storm supervisor
> cd %STORM_HOME%
> storm ui
瀏覽器打開http://localhost:8080/ 可看到Storm運行。
部署這個jar在本地:
> storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost
若是你刷新 Storm UI頁面,會看到 「WordCount」 topology顯示列出,點按連接確認它處理數據。
storm 求 csdn 密碼庫中密碼出現的 topN,並直接在 eclipse 中調試運行:
package com.bj.test.top10;

/**
 * Spout acting as the data source: reads a password dump file (one record per
 * line, fields separated by '#') and emits the second field of each line to
 * the downstream bolt as field "word".
 *
 * @Author: tester
 * @DateTime: 2016-06-21 19:58:45
 * @Version: 1.0
 */
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PasswdSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("==============OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("++++++++++++++FAIL:" + msgId);
    }

    /**
     * Main spout method: on the first invocation reads the whole file and
     * emits one tuple per well-formed line. Storm calls nextTuple() in a
     * tight loop, so after the file is exhausted we sleep briefly to avoid
     * burning CPU.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
            return;
        }
        String line;
        // BUGFIX: use try-with-resources so the reader (and the underlying
        // FileReader) is closed even on failure; the original leaked it.
        try (BufferedReader reader = new BufferedReader(fileReader)) {
            while ((line = reader.readLine()) != null) {
                String[] words = line.split("#");
                if (words.length < 2) {
                    // BUGFIX: skip malformed lines instead of crashing on words[1]
                    continue;
                }
                String passwd = words[1].trim();
                collector.emit(new Values(passwd));
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true; // never re-read, even after an error
        }
    }

    /**
     * Called once when the spout is initialized.
     *
     * @param conf      topology configuration; must contain key "wordsFile",
     *                  the path of the file to read
     * @param context   topology runtime information (unused here)
     * @param collector emitter used to send tuples to the bolts
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            // Path of the input file, as supplied when the topology was built.
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            // BUGFIX: the original message looked up the wrong key
            // ("wordFile"), always printing [null]; also preserve the cause.
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]", e);
        }
        this.collector = collector;
    }

    /** Declare the single output field "word". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import static com.bj.test.top10.SortMapByValue.*;

/**
 * Terminal bolt: counts occurrences of each incoming word and, when the
 * topology is shut down, prints the top-10 words by count (via the
 * SortMapByValue helpers).
 */
public class Top10Bolt extends BaseBasicBolt {

    Integer id;
    String name;
    NavigableMap<String, Integer> counters;

    /**
     * Cleanup hook run when the topology is killed (only reliable in local
     * mode). Used here to print the top-10 counters for this demo.
     */
    @Override
    public void cleanup() {
        System.out.println(">>>>>>>>>>>> Word Counter [" + name + "-" + id + "] <<<<<<<<<<<");
        printMap(list2Map(sortMapByValuesTopN(counters, 10)));
    }

    /** On create: initialize the counter map and remember our identity. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new TreeMap<String, Integer>().descendingMap();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** Called once per tuple: increment the counter for the received word. */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        // Single lookup instead of the original containsKey + get pair.
        Integer count = counters.get(word);
        counters.put(word, count == null ? 1 : count + 1);
    }
}

/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;

/**
 * Entry point: wires PasswdSpout and Top10Bolt into a topology and runs it
 * on an in-process LocalCluster.
 *
 * @Author: tester
 * @DateTime: 2016-06-21 19:52:32
 * @Version: 1.0
 */
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {

    public static void main(String[] args) throws InterruptedException {
        // Build the topology: spout -> bolt, fields-grouped on "word" so the
        // same word is always routed to the same bolt task.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("PasswdSpout", new PasswdSpout(), 1);
        builder.setBolt("Top10Bolt", new Top10Bolt(), 1)
                .setNumTasks(1)
                .fieldsGrouping("PasswdSpout", new Fields("word"));

        // Configuration: "wordsFile" is the input file read by PasswdSpout.
        Config conf = new Config();
        conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.100.sql");
        // conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.sql");
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        // use two worker processes
        // conf.setNumWorkers(4);

        // Run in local (in-process) mode, wait briefly, then shut down —
        // shutdown triggers Top10Bolt.cleanup(), which prints the top-10.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
}
[1] windows安裝storm
http://blog.csdn.net/jiutianhe/article/details/41211403
[2] storm異常集錦
http://ganliang13.iteye.com/blog/2117722
http://bimoziyan.iteye.com/blog/1981116
[3] storm教程2、安裝部署
http://www.cnblogs.com/jinhong-lu/p/4634912.html
[4] Storm實戰之WordCount
http://m635674608.iteye.com/blog/2221179
[5] Storm的並行度、Grouping策略以及消息可靠處理機制簡介
http://m635674608.iteye.com/blog/2232221
[6] Storm的滑動窗口
http://zqhxuyuan.github.io/2015/09/10/2015-09-10-Storm-Window/
[7] [Storm中文文檔]Trident教程
http://blog.csdn.net/lujinhong2/article/details/47132313
[8] Storm Trident API 實踐
http://blog.csdn.net/suifeng3051/article/details/41118721
[9] flume+kafka+storm運行實例
http://my.oschina.net/u/2000675/blog/613747
[10] Kafka+Storm+HDFS整合實踐