基於Storm的WordCount

Storm WordCount 工做過程

Storm 版本:
一、Spout 從外部數據源中讀取數據,隨機發送一個元組對象出去;
二、SplitBolt 接收 Spout 中輸出的元組對象,將元組中的數據切分紅單詞,並將切分後的單詞發射出去;
三、WordCountBolt 接收 SplitBolt 中輸出的單詞數組,對裏面單詞的頻率進行累加,將累加後的結果輸出。html

Java 版本:
一、讀取文件中的數據,一行一行的讀取;
二、將讀到的數據進行切割;
三、對切割後的數組中的單詞進行計算。java

Hadoop 版本:
一、按行讀取文件中的數據;
二、在 Mapper()函數中對每一行的數據進行切割,並輸出切割後的數據數組;
三、接收 Mapper()中輸出的數據數組,在 Reducer()函數中對數組中的單詞進行計算,將計算後的統計結果輸出。apache

源代碼

storm的配置、eclipse裏maven的配置以及建立項目部分省略。數組

Mainclassapp

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
        //建立一個 TopologyBuilder         
        TopologyBuilder tb = new TopologyBuilder();         
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
        //建立配置         
        Config conf = new Config();         
        //設置 worker 數量         
        conf.setNumWorkers(2);         
        //提交任務         
        //集羣提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
        //本地提交         
        LocalCluster localCluster = new LocalCluster();         
        localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
}

SplitBolt 部分eclipse

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 執行方法      */     
    public void execute(Tuple input) {         
        String line = input.getString(0);         
        String[] split = line.split(" ");         
        for (String word : split) {             
            collector.emit(new Values(word));         
            }     
        } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("word"));     
        } 
}

CountBolt 部分maven

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>(); 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 


    /**      * 執行方法      */     
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
    Integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //測試輸出         
    System.out.println("結果:"+map);     
    } 

    /**      * 輸出      */     
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    
} 
}

SpoutBolt 部分函數

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
    /**      * 初始化方法      */     
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 重複調用方法      */     
    public void nextTuple() {         
        collector.emit(new Values("hello world this is a test"));     
        } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("test"));     
        } 
}

POM.XML 文件內容oop

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.stormwordcount.MainClass</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的問題

基於Storm的WordCount須要eclipse安裝了maven插件,以前的大數據實踐安裝的eclipse版本爲Eclipse IDE for Eclipse Committers4.5.2,這個版本不自帶maven插件,後續安裝失敗了幾回(網上不少的教程都已經失效),這裏分享一下我成功安裝的方法:
使用連接下載,Help->Install New SoftWare

點擊Add,name輸入隨意,在location輸入下載eclipse的maven插件,下載地址能夠這樣獲取
點擊鏈接:http://www.eclipse.org/m2e/index.html 進入網站後點擊download,拉到最下面能夠看到不少eclipse maven插件的版本和發佈時間,選在適合eclipse的版本複製連接便可。建議取消選中Contack all update sites during install to find required software(耗時過久)。測試

可是安裝成功後仍是沒法配置(這裏緣由不太清楚,沒找到解決辦法),就直接上官網換成本身maven插件的JavaEE IDE了...

後續的maven的配置這些都比較順利,第一次建立maven-archetype-quickstat項目報錯,試了網上不少辦法都還沒成功,而後打開 Windows->Preferencs->Maven->Installation發現以前配置了的maven的安裝路徑沒了...從新配置了下就能夠建立項目了。

最後運行成功的結果:

相關文章
相關標籤/搜索