本章咱們將會建立一個Storm工程和咱們的第一個Storm topology。html
提示:下述假設你已經安裝JRE1.6或者更高級版本。推薦使用Oracle提供的JRE:http://www.java.com/downloads/.java
在開始建立項目以前,瞭解Storm的操做模式(operation modes)是很重要的。Storm有兩種運行方式:git
本地模式github
在本地模式下,Storm topologies 運行在本地機器的一個JVM中。由於本地模式下查看全部topology組件共同工做最爲簡單,因此這種模式被用於開發、測試和調試。例如,咱們能夠調整參數,這使得咱們能夠看到咱們的topology在不一樣的Storm配置環境中是如何運行的。爲了以本地模式運行topologies,咱們須要下載Storm的開發依賴包,這是咱們開發、測試topologies所需的全部東西。當咱們創建本身的第一個Storm工程的時候咱們很快就能夠看到是怎麼回事了。web
提示:本地模式下運行一個topology同Storm集羣中運行相似。然而,確保全部組件線程安全很是重要,由於當它們被部署到遠程模式時,它們可能運行在不一樣的JVM或者不一樣的物理機器上,此時,它們之間不能直接交流或者共享內存。apache
在本章的全部示例中,咱們都以本地模式運行。安全
遠程模式架構
在遠程模式下,咱們將topology提交到Storm集羣中,Storm集羣由許多進程組成,這些進程一般運行在不一樣的機器上。遠程模式下不顯示調試信息,這也是它被認爲是產品模式的緣由。然而,在一臺機器上建立一個Storm集羣也是可能的,而且在部署至產品前這樣作仍是一個好方法,它能夠確保未來在一個成熟的產品環境中運行topology不會出現任何問題。併發
譯者的話:所謂產品環境/模式,指的是代碼比較成熟,能夠當成產品發佈了,與開發環境相對。app
在第六章中能夠了解到更多關於遠程模式的內容,我會在附錄B中展現如何安裝一個集羣。
在這個項目中,咱們會創建一個簡單的topology來統計單詞個數,咱們能夠將它當作是Storm topologies中的「Hello World」。然而,它又是一個很是強大的topology,由於它幾乎能夠擴展到無限大小,而且通過小小的修改,咱們甚至可使用它建立一個統計系統。例如,咱們能夠修改本項目來找到Twitter上的熱門話題。
爲了創建這個topology,咱們將使用一個spout來負責從文件中讀取單詞,第一個bolt來標準化單詞,第二個bolt去統計單詞個數,如圖2-1所示:
你能夠在https://github.com/storm-book/examples-ch02-getting_started/zipball/master下載本例源碼的ZIP文件。
譯者的話:本站有備份:http://www.flyne.org/example/storm/storm-book-examples-ch02-getting_started-8e42636.zip
提示:若是你使用git(一個分佈式的版本控制和源碼管理工具),則能夠運行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git進入你想要下載的源碼所在的目錄。
檢查Java安裝
搭建環境的第一步就是檢查正在運行的Java版本。運行java -version命令,咱們能夠看到相似以下信息:
java -version
java version 「1.6.0_26″
Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)
首先,建立一個文件夾,用於存放這個應用(就像對於任何Java應用同樣),該文件夾包含了整個項目的源代碼。
接着咱們須要下載Storm的依賴包——添加到本應用classpath的jar包集合。能夠經過下面兩種方式完成:
下載依賴包,解壓,並將它們加入classpath路徑
使用Apache Maven
提示:Maven是一個軟件項目管理工具,能夠用於管理一個項目開發週期中的多個方面(從從依賴包到發佈構建過程),在本書中咱們會普遍使用Maven。可使用mvn命令檢查maven是否安裝,若是未安裝,能夠從http://maven.apache.org/download.html下載。
下一步咱們須要新建一個pom.xml文件(pom:project object model,項目的對象模型)去定義項目的結構,該文件描述了依賴包、封裝、源碼等等。這裏咱們將使用由nathanmarz構建的依賴包和Maven庫,這些依賴包能夠在https://github.com/nathanmarz/storm/wiki/Maven找到。
提示:Storm的Maven依賴包引用了在本地模式下運行Storm所需的全部函數庫。
使用這些依賴包,咱們能夠寫一個包含運行topology基本的必要組件的pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> <compilerVersion>1.6</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <!-- Storm Dependency --> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.6.0</version> </dependency> </dependencies> </project>
前幾行指定了項目名稱、版本;而後咱們添加了一個編譯器插件,該插件告訴Maven咱們的代碼應該用Java1.6編譯;接着咱們定義庫(repository)(Maven支持同一個項目的多個庫),clojars是Storm依賴包所在的庫,Maven會自動下載本地模式運行Storm須要的全部子依賴包。
本項目的目錄結構以下,它是一個典型的Maven Java項目。
java目錄下的文件夾包含了咱們的源代碼,而且咱們會將咱們的單詞文件放到resources文件夾中來處理。
爲創建咱們第一個topology,咱們要建立運行本例(統計單詞個數)的全部的類。本階段例子中的有些部分不清楚很正常,咱們將在接下來的幾個章節中進一步解釋它們。
Spout(WordReader類)
WordReader類實現了IRichSpout接口,該類負責讀取文件並將每一行發送到一個bolt中去。
提示:spout發送一個定義字段(field)的列表,這種架構容許你有多種bolt讀取相同的spout流,而後這些bolt能夠定義字段(field)供其餘bolt消費。
例2-1包含WordReader類的完整代碼(後面會對代碼中的每一個部分進行分析)
例2-1.src/main/java/spouts/WordReader.java
package spouts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader implements IRichSpout{ private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed=false; private TopologyContext context; public boolean isDistributed(){return false;} public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close(){} public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } /** * 該方法用於讀取文件併發送文件中的每一行 */ public void nextTuple() { /** * The nextuple it is called forever, so if we have beenreaded the file * we will wait and then return */ if(completed){ try { Thread.sleep(1000); } catch(InterruptedException e) { //Do nothing } return; } String str; //Open the reader BufferedReader reader =new BufferedReader(fileReader); try{ //Read all lines while((str=reader.readLine())!=null){ /** * By each line emmit a new value with the line as a their */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Errorreading tuple",e); }finally{ completed = true; } } /** * We will create the file and get the collector object */ public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { try { this.context=context; this.fileReader=new FileReader(conf.get("wordsFile").toString()); } catch(FileNotFoundException e) { throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]"); } this.collector=collector; } /** * 聲明輸出字段「line」 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
在任何spout中調用的第一個方法都是open()方法,該方法接收3個參數:
TopologyContext:它包含了全部的topology數據
conf對象:在topology定義的時候被建立
SpoutOutputCollector:該類的實例可讓咱們發送將被bolts處理的數據。
下面的代碼塊是open()方法的實現:
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { try { this.context=context; this.fileReader=new FileReader(conf.get("wordsFile").toString()); } catch(FileNotFoundException e) { throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]"); } this.collector=collector; }
在open()方法中,咱們也建立了reader,它負責讀文件。接着,咱們須要實現nextTuple()方法,在該方法中發送要被bolt處理的值(values)。在咱們的例子中,這個方法讀文件而且每行發送一個值。
public void nextTuple() { if(completed){ try { Thread.sleep(1000); } catch(InterruptedException e) { //Do nothing } return; } String str; //Open the reader BufferedReader reader =new BufferedReader(fileReader); try{ //Read all lines while((str=reader.readLine())!=null){ /** * By each line emmit a new value with the line as a their */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Errorreading tuple",e); }finally{ completed = true; } }
提示:Values類是ArrayList的一個實現,將列表中的元素傳遞到構造方法中。
nextTuple()方法被週期性地調用(和ack()、fail()方法相同的循環),當沒有工做要作時,nextTuple()方法必須釋放對線程的控制,以便其餘的方法有機會被調用。所以必須在nextTuple()第一行檢查處理是否完成,若是已經完成,在返回前至少應該休眠1秒來下降處理器的負載,若是還有工做要作,則將文件中的每一行讀取爲一個值併發送出去。
提示:元組(tuple)是一個值的命名列表,它能夠是任何類型的Java對象(只要這個對象是能夠序列化的)。默認狀況下,Storm能夠序列化的經常使用類型有strings、byte arrays、ArrayList、HashMap和HashSet。
Bolt(WordNormalizer&WordCounter類)
上面咱們設計了一個spout來讀取文件,而且每讀取一行發送一個元組(tuple)。如今,咱們須要建立兩個bolt處理這些元組(見圖2-1)。這些bolt實現了IRichBolt接口。
在bolt中,最重要的方法是execute()方法,每當bolt收到一個元組,該方法就會被調用一次,對於每一個收到的元組,該bolt處理完以後又會發送幾個bolt。
提示:一個spout或bolt能夠發送多個tuple,當nextTuple()或execute()方法被調用時,它們能夠發送0、1或者多個元組。在第五章中你將會了解到更多。
第一個bolt,WordNormalizer,負責接收每一行,而且將行標準化——它將行分解爲一個個的單詞後轉化成小寫,而且消除單詞先後的空格。
首先,咱們須要聲明bolt的輸出參數:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
這兒,咱們聲明bolt發送一個命名爲「word」的字段。
接着,咱們實現execute方法,輸入的tuple將會在這個方法中被處理:
public void execute(Tuple input) { String sentence = input.getString(0); String[]words= sentence.split(" "); for(String word:words){ word =word.trim(); if(!word.isEmpty()){ word =word.toLowerCase(); //Emit the word List a =new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } // Acknowledge the tuple collector.ack(input); }
第一行讀取元組中的值,能夠按照位置或者字段命名讀取。值被處理後使用collector對象發送出去。當每一個元組被處理完以後,就會調用collector的ack()方法,代表該tuple成功地被處理。若是tuple不能被處理,則應該調用collector的fail()方法。
例2-2包含這個類的完整代碼。
例2-2.src/main/java/bolts/WordNormalizer.java