Storm Getting Started Example

A word-count example

1. Component Topology Structure

The word-count topology that runs on Storm consists of three components:

WordReader (spout): reads each line of data from a file.

WordNormalizer (bolt): splits each line of data into individual words.

WordCounter (bolt): counts the words.

The flow is as follows: WordReader (spout) → WordNormalizer (bolt) → WordCounter (bolt).

2. API Overview

A Storm topology contains only two kinds of components: spouts, which connect to a data source, read data, and pass it to bolts; and bolts, which process the data and either output results or pass them on to the next bolt.
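
The code in this example implements IRichSpout and IRichBolt directly, so every lifecycle method has to be written out. As an aside, Storm also ships convenience base classes (backtype.storm.topology.base.BaseRichSpout and BaseRichBolt) that stub out the optional methods; a minimal bolt built on them might look like the following sketch (the PrinterBolt class is illustrative, not part of this example):

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseRichBolt {

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		// Print the first value of the tuple and acknowledge it
		System.out.println(input.getString(0));
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// This bolt emits nothing downstream
	}
}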

3. Building the Topology

Create a Maven project with a POM file like the following:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>storm.book</groupId>
	<artifactId>Getting-Started</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<properties>
		<slf4j.version>1.7.12</slf4j.version>
		<logback.version>1.1.3</logback.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
		<dependency>
		    <groupId>org.apache.storm</groupId>
		    <artifactId>storm-core</artifactId>
		    <version>0.9.7</version>
		    <!-- Do not package Storm into the jar when building -->
		    <scope>provided</scope>
		</dependency>
		<!-- logback -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>jcl-over-slf4j</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>${logback.version}</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>${logback.version}</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<!-- This plugin packages the dependencies into the jar -->
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>sfpay.TopologyMain</mainClass>
						</manifest>
					</archive>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Create WordReader (Spout). Here the spout reads the data from a file.

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

	private TopologyContext context;

	private FileReader fileReader;

	private SpoutOutputCollector collector;
	
	private boolean completed;

	/**
	 * The conf object holds the configuration and is created when the topology is defined; the TopologyContext object contains all of the topology's data; and the SpoutOutputCollector object lets us emit the data that the bolts will process.
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		// This method is called after the spout is created
		System.out.println("WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)");
		this.context = context;
		try {
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		}

		this.collector = collector;

	}

	/**
	 * Emits the data that the bolts will process. <br>
	 * It is called periodically, in the same loop as ack() and fail(). When there is no work to do it must release control of the thread so the other methods get a chance to run.
	 */
	@Override
	public void nextTuple() {
		// Read the file and emit the data line by line
		System.out.println("WordReader.nextTuple()");
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				//e.printStackTrace();
			}

			return;
		}
		String str;

		BufferedReader reader = new BufferedReader(fileReader);
		try {
			while ((str = reader.readLine()) != null) {
				this.collector.emit(new Values(str), str); // Values is an ArrayList implementation; its elements are the arguments passed to the constructor
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			completed = true;
		}
	}

	@Override
	public void ack(Object msgId) {
		// Called when a tuple has been processed successfully
		System.out.println("wordReader.ack(Object msgId):"+msgId);
	}

	@Override
	public void fail(Object msgId) {
		// Called when tuple processing fails
		System.out.println("fail(Object msgId):"+msgId);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		System. out.println("WordReader.declareOutputFields(OutputFieldsDeclarer declarer)");
		declarer.declare(new Fields("line"));
	}
	@Override
	public void close() {
		// Called when the topology is stopped
		System.out.println("WordReader.close()");
	}

	@Override
	public void activate() {
		System. out.println("WordReader.activate()");
	}

	@Override
	public void deactivate() {
		System. out.println("WordReader.deactivate()");
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		System. out.println("WordReader.getComponentConfiguration()");
		return null;
	}
}
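
Note that the spout emits each line with the line itself as the message ID (emit(new Values(str), str)); that is what makes Storm call ack() or fail() for the tuple. This example only logs those callbacks. A spout that wants to retry failed tuples could keep a pending map, roughly like the following sketch (the pending field is an illustrative addition, not part of the original code):

	// Illustrative addition: remember emitted lines until they are acked
	private java.util.Map<Object, Values> pending = new java.util.HashMap<Object, Values>();

	// In nextTuple(), record each tuple before emitting it:
	//   pending.put(str, new Values(str));
	//   this.collector.emit(new Values(str), str);

	@Override
	public void ack(Object msgId) {
		pending.remove(msgId); // the tuple was fully processed
	}

	@Override
	public void fail(Object msgId) {
		Values failed = pending.get(msgId);
		if (failed != null) {
			collector.emit(failed, msgId); // re-emit the failed line
		}
	}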

Create WordNormalizer (Bolt), which splits the lines of data sent by the spout into words.

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt{

	private OutputCollector collector;
	
	private TopologyContext context;
	
	private Map stormConf;
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// Called on initialization
		System.out.println("WordNormalizer.prepare(Map stormConf, TopologyContext context, OutputCollector collector)");
		this.stormConf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	/**
	 * Called once for each tuple received; it may in turn emit several tuples
	 */
	@Override
	public void execute(Tuple input) {
		// Receives a tuple passed in from the spout or another bolt
		System.out.println("WordNormalizer.execute(Tuple input)");
		String sentence = input.getString(0); // the incoming tuple's value list
		String[] words = sentence.split(" ");
		for (String word : words) {
			// Trim the word, lowercase it, and then emit it
			word = word.trim();
			if(!word.isEmpty()){
				word = word.toLowerCase();
				collector.emit(new Values(word));
			}
		}
		// Acknowledge the tuple
		collector.ack(input);
		
	}

	@Override
	public void cleanup() {
		System.out.println("WordNormalizer。cleanup()");
	}

	/**
	 * Declares the bolt's output fields
	 * 
	 * This *bolt* only emits the "word" field
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		System.out.println("WordNormalizer。declareOutputFields(OutputFieldsDeclarer declarer)");
		declarer.declare(new Fields("word")); //聲明bolt將發佈一個名爲「word」的域
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		System.out.println("WordNormalizer。getComponentConfiguration()");
		return null;
	}

}
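
One detail worth pointing out: collector.emit(new Values(word)) emits an unanchored tuple, so a failure further downstream is not reported back to the spout. To take part in Storm's reliability tracking, the emit would be anchored to the input tuple:

		collector.emit(input, new Values(word)); // anchor the new tuple to the input tuple

For this simple example the unanchored form is enough, since the bolt acks every input tuple itself.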

Create WordCounter (Bolt), which counts the words.

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{

	private Map<String,Integer> counters;
	
	private OutputCollector collector;
	
	private String name;
	
	private Integer id;
	/**
	 * Initialization
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		System.out.println("WordCounter.prepare(Map stormConf, TopologyContext context, OutputCollector collector)");
		this.collector = collector;
		this.counters = new HashMap<String, Integer>();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}
	/**
	 * Counts each word
	 */
	@Override
	public void execute(Tuple input) {
		System.out.println("WordCounter.execute(Tuple input)");
		String str = input.getString(0);
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			counters.put(str, counters.get(str)+1);
		}
		
		collector.ack(input);
	}

	/**
	 * When this bolt is finished (when the cluster shuts down), print the word counts
	 */
	@Override
	public void cleanup() {
		// cleanup() is called when the bolt is destroyed
		System.out.println("WordCounter.cleanup()");
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
	}

	
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		System.out.println("WordCounter.declareOutputFields(OutputFieldsDeclarer declarer)");
	}
	@Override
	public Map<String, Object> getComponentConfiguration() {
		System.out.println("WordCounter.getComponentConfiguration()");
		return null;
	}

}

Create the topology class TopologyMain, which defines the data flow. A topology can run in two modes: local mode and remote (cluster) mode.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
import spouts.WordReader;

public class TopologyMain {

	public static void main(String[] args) {

		// Define the topology
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader", new WordReader(),3);
		builder.setBolt("word-normalizer", new WordNormalizer(),3).shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-normalizer", new Fields("word"));
		StormTopology topology = builder.createTopology();

		// Configuration
		Config conf = new Config();
		conf.put("wordsFile", "/opt/storm/words.txt");
		conf.setDebug(false);

		if (args != null && args.length > 0) {
			// With arguments, submit to a production cluster
			try {
				StormSubmitter.submitTopology(args[0], conf, topology);
			} catch (Exception e) {
				e.printStackTrace();
			}
		} else {
			// Without arguments, run in local mode
			// Run the topology
			LocalCluster cluster = new LocalCluster();
			// conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
			cluster.submitTopology("Getting-Started-Topologie", conf, topology);
			try {
				Thread.sleep(5000);
				cluster.shutdown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}
	}

}
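
To try it locally, put a few lines of text in the file configured as wordsFile. For example, /opt/storm/words.txt might contain (sample content, not prescribed by the article):

storm test are great
is an storm simple application
but very powerful really StOrm is great

Note that the spout parallelism hint is 3, so three WordReader tasks each read the same file and every count comes out tripled; set the hint to 1 (or drop it) if you want each word counted once per occurrence in the file.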

Configure the logback log file. Storm uses logback as its logging implementation; here we simply set the log output level.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <root level="ERROR" />
</configuration>
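
As written, this configuration sets the root level to ERROR without attaching any appender, which keeps Storm's own logging quiet so the System.out.println output stands out. If you also want the ERROR logs on the console, a standard logback console appender can be added (an optional variation, not part of the original config):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{20} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="ERROR">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>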

The project structure is as follows:

src/main/java/spouts/WordReader.java
src/main/java/bolts/WordNormalizer.java
src/main/java/bolts/WordCounter.java
src/main/java/sfpay/TopologyMain.java
src/main/resources/logback.xml

Package, Submit, and Run

The topology we developed uses the Storm API, but since the Storm cluster already provides these classes, we should not package the Storm jars into our own jar when building with Maven. That is why the dependency is marked with the provided scope.

However, dependencies that the Storm cluster itself does not provide do need to be bundled. For example, as mentioned earlier, the best data sources for Storm are message queues, so if a later example uses RocketMQ we will have to package it in. By default Maven packages only our own code, not any of the dependency jars, so we use the Maven Assembly plugin to add them.

Packaging command:

mvn assembly:assembly

Two jars are produced in the target directory: one with dependencies and one without.
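
To submit the packaged topology to a real cluster, pass the jar with dependencies and the main class to the storm command; the topology name argument becomes args[0] in TopologyMain. The jar name below follows from the artifactId and version in the POM, and the topology name is just an example:

storm jar target/Getting-Started-0.0.1-SNAPSHOT-jar-with-dependencies.jar sfpay.TopologyMain word-count-topology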
