2、Storm入門之Hello Storm

本章咱們將會建立一個Storm工程和咱們的第一個Storm topology。html

提示:下述假設你已經安裝JRE1.6或者更高級版本。推薦使用Oracle提供的JRE:http://www.java.com/downloads/.java

操做模式

在開始建立項目以前,瞭解Storm的操做模式(operation modes)是很重要的。Storm有兩種運行方式:git

本地模式github

在本地模式下,Storm topologies 運行在本地機器的一個JVM中。由於本地模式下查看全部topology組件共同工做最爲簡單,因此這種模式被用於開發、測試和調試。例如,咱們能夠調整參數,這使得咱們能夠看到咱們的topology在不一樣的Storm配置環境中是如何運行的。爲了以本地模式運行topologies,咱們須要下載Storm的開發依賴包,這是咱們開發、測試topologies所需的全部東西。當咱們創建本身的第一個Storm工程的時候咱們很快就能夠看到是怎麼回事了。web

提示:本地模式下運行一個topology同Storm集羣中運行相似。然而,確保全部組件線程安全很是重要,由於當它們被部署到遠程模式時,它們可能運行在不一樣的JVM或者不一樣的物理機器上,此時,它們之間不能直接交流或者共享內存。apache

在本章的全部示例中,咱們都以本地模式運行。安全

遠程模式架構

在遠程模式下,咱們將topology提交到Storm集羣中,Storm集羣由許多進程組成,這些進程一般運行在不一樣的機器上。遠程模式下不顯示調試信息,這也是它被認爲是產品模式的緣由。然而,在一臺機器上建立一個Storm集羣也是可能的,而且在部署至產品前這樣作仍是一個好方法,它能夠確保未來在一個成熟的產品環境中運行topology不會出現任何問題。併發

譯者的話:所謂產品環境/模式,指的是代碼比較成熟,能夠當成產品發佈了,與開發環境相對。app

在第六章中能夠了解到更多關於遠程模式的內容,我會在附錄B中展現如何安裝一個集羣。

Hello World Storm

在這個項目中,咱們會創建一個簡單的topology來統計單詞個數,咱們能夠將它當作是Storm topologies中的「Hello World」。然而,它又是一個很是強大的topology,由於它幾乎能夠擴展到無限大小,而且通過小小的修改,咱們甚至可使用它建立一個統計系統。例如,咱們能夠修改本項目來找到Twitter上的熱門話題。

爲了創建這個topology,咱們將使用一個spout來負責從文件中讀取單詞,第一個bolt來標準化單詞,第二個bolt去統計單詞個數,如圖2-1所示:

topology入門圖2-1.topology入門

你能夠在https://github.com/storm-book/examples-ch02-getting_started/zipball/master下載本例源碼的ZIP文件。

譯者的話:本站有備份:http://www.flyne.org/example/storm/storm-book-examples-ch02-getting_started-8e42636.zip

提示:若是你使用git(一個分佈式的版本控制和源碼管理工具),則能夠運行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git進入你想要下載的源碼所在的目錄。

檢查Java安裝

搭建環境的第一步就是檢查正在運行的Java版本。運行java -version命令,咱們能夠看到相似以下信息:

java -version
java version 「1.6.0_26″
Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)

建立項目

首先,建立一個文件夾,用於存放這個應用(就像對於任何Java應用同樣),該文件夾包含了整個項目的源代碼。

接着咱們須要下載Storm的依賴包——添加到本應用classpath的jar包集合。能夠經過下面兩種方式完成:

  • 下載依賴包,解壓,並將它們加入classpath路徑

  • 使用Apache Maven

提示:Maven是一個軟件項目管理工具,能夠用於管理一個項目開發週期中的多個方面(從從依賴包到發佈構建過程),在本書中咱們會普遍使用Maven。可使用mvn命令檢查maven是否安裝,若是未安裝,能夠從http://maven.apache.org/download.html下載。

下一步咱們須要新建一個pom.xml文件(pom:project object model,項目的對象模型)去定義項目的結構,該文件描述了依賴包、封裝、源碼等等。這裏咱們將使用由nathanmarz構建的依賴包和Maven庫,這些依賴包能夠在https://github.com/nathanmarz/storm/wiki/Maven找到。

提示:Storm的Maven依賴包引用了在本地模式下運行Storm所需的全部函數庫。

使用這些依賴包,咱們能夠寫一個包含運行topology基本的必要組件的pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm.book</groupId>
    <artifactId>Getting-Started</artifactId>
    <version>0.0.1-SNAPSHOT</version>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
 
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
 
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.6.0</version>
        </dependency>
    </dependencies>
 
</project>

前幾行指定了項目名稱、版本;而後咱們添加了一個編譯器插件,該插件告訴Maven咱們的代碼應該用Java1.6編譯;接着咱們定義庫(repository)(Maven支持同一個項目的多個庫),clojars是Storm依賴包所在的庫,Maven會自動下載本地模式運行Storm須要的全部子依賴包。

本項目的目錄結構以下,它是一個典型的Maven Java項目。

application structure

java目錄下的文件夾包含了咱們的源代碼,而且咱們會將咱們的單詞文件放到resources文件夾中來處理。

建立第一個topology

爲創建咱們第一個topology,咱們要建立運行本例(統計單詞個數)的全部的類。本階段例子中的有些部分不清楚很正常,咱們將在接下來的幾個章節中進一步解釋它們。

Spout(WordReader類)

WordReader類實現了IRichSpout接口,該類負責讀取文件並將每一行發送到一個bolt中去。

提示:spout發送一個定義字段(field)的列表,這種架構容許你有多種bolt讀取相同的spout流,而後這些bolt能夠定義字段(field)供其餘bolt消費。

例2-1包含WordReader類的完整代碼(後面會對代碼中的每一個部分進行分析)

例2-1.src/main/java/spouts/WordReader.java

package spouts;
 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
 
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class WordReader implements IRichSpout{
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed=false;
    private TopologyContext context;
 
    public boolean isDistributed(){return false;}
 
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
 
    public void close(){}
 
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
 
    /**
    * 該方法用於讀取文件併發送文件中的每一行
    */
 
    public void nextTuple() {
    /**
    * The nextuple it is called forever, so if we have beenreaded the file
    * we will wait and then return
    */
        if(completed){
            try {
            Thread.sleep(1000);
            } catch(InterruptedException e) {
            //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader =new BufferedReader(fileReader);
        try{
        //Read all lines
            while((str=reader.readLine())!=null){
        /**
        * By each line emmit a new value with the line as a their
        */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Errorreading tuple",e);
        }finally{
            completed = true;
        }
    }
 
    /**
    * We will create the file and get the collector object
    */
 
    public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
        try {
            this.context=context;
            this.fileReader=new FileReader(conf.get("wordsFile").toString());
        } catch(FileNotFoundException e) {
            throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
        }
        this.collector=collector;
    }
 
    /**
    * 聲明輸出字段「line」
    */
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
 
}

在任何spout中調用的第一個方法都是open()方法,該方法接收3個參數:

  • TopologyContext:它包含了全部的topology數據

  • conf對象:在topology定義的時候被建立

  • SpoutOutputCollector:該類的實例可讓咱們發送將被bolts處理的數據。

下面的代碼塊是open()方法的實現:

public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
    try {
        this.context=context;
        this.fileReader=new FileReader(conf.get("wordsFile").toString());
    } catch(FileNotFoundException e) {
        throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
    }
    this.collector=collector;
}

在open()方法中,咱們也建立了reader,它負責讀文件。接着,咱們須要實現nextTuple()方法,在該方法中發送要被bolt處理的值(values)。在咱們的例子中,這個方法讀文件而且每行發送一個值。

public void nextTuple() {
    if(completed){
        try {
        Thread.sleep(1000);
        } catch(InterruptedException e) {
        //Do nothing
        }
        return;
    }
    String str;
    //Open the reader
    BufferedReader reader =new BufferedReader(fileReader);
    try{
    //Read all lines
        while((str=reader.readLine())!=null){
    /**
    * By each line emmit a new value with the line as a their
    */
            this.collector.emit(new Values(str),str);
        }
    }catch(Exception e){
        throw new RuntimeException("Errorreading tuple",e);
    }finally{
        completed = true;
    }
}

 

提示:Values類是ArrayList的一個實現,將列表中的元素傳遞到構造方法中。

nextTuple()方法被週期性地調用(和ack()、fail()方法相同的循環),當沒有工做要作時,nextTuple()方法必須釋放對線程的控制,以便其餘的方法有機會被調用。所以必須在nextTuple()第一行檢查處理是否完成,若是已經完成,在返回前至少應該休眠1秒來下降處理器的負載,若是還有工做要作,則將文件中的每一行讀取爲一個值併發送出去。

提示:元組(tuple)是一個值的命名列表,它能夠是任何類型的Java對象(只要這個對象是能夠序列化的)。默認狀況下,Storm能夠序列化的經常使用類型有strings、byte arrays、ArrayList、HashMap和HashSet。

 

Bolt(WordNormalizer&WordCounter類)

上面咱們設計了一個spout來讀取文件,而且每讀取一行發送一個元組(tuple)。如今,咱們須要建立兩個bolt處理這些元組(見圖2-1)。這些bolt實現了IRichBolt接口。

在bolt中,最重要的方法是execute()方法,每當bolt收到一個元組,該方法就會被調用一次,對於每一個收到的元組,該bolt處理完以後又會發送幾個bolt。

提示:一個spout或bolt能夠發送多個tuple,當nextTuple()或execute()方法被調用時,它們能夠發送0、1或者多個元組。在第五章中你將會了解到更多。

第一個bolt,WordNormalizer,負責接收每一行,而且將行標準化——它將行分解爲一個個的單詞後轉化成小寫,而且消除單詞先後的空格。

首先,咱們須要聲明bolt的輸出參數:

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
         declarer.declare(new Fields("word")); 
}

這兒,咱們聲明bolt發送一個命名爲「word」的字段。

接着,咱們實現execute方法,輸入的tuple將會在這個方法中被處理:

public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[]words= sentence.split(" ");
    for(String word:words){
        word =word.trim();
        if(!word.isEmpty()){
            word =word.toLowerCase();
            //Emit the word
            List a =new ArrayList();
            a.add(input);
            collector.emit(a,new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}

 

第一行讀取元組中的值,能夠按照位置或者字段命名讀取。值被處理後使用collector對象發送出去。當每一個元組被處理完以後,就會調用collector的ack()方法,代表該tuple成功地被處理。若是tuple不能被處理,則應該調用collector的fail()方法。

例2-2包含這個類的完整代碼。

例2-2.src/main/java/bolts/WordNormalizer.java

下一頁

相關文章
相關標籤/搜索