【Storm】storm安裝、配置、使用以及Storm單詞計數程序的實例分析

前言:閱讀筆記

storm和hadoop集羣很是像。hadoop執行mr。storm執行topologies。
mr和topologies最關鍵的不一樣點是:mr執行終於會結束,而topologies永遠執行直到你kill。

storm集羣有兩種節點:master和worker。
master執行一個後臺進程Nimbus,和hadoop的jobtracker類似。
Nimbus負責在集羣中分發代碼。爲工做節點分配任務,並監控故障。

worker執行一個後臺進程Supervisor。

supervisor監聽分配來的任務,啓動和中止worker進程去處理nimbus分配來的任務。
每個worker進程執行拓撲的一個子集;一個執行的拓撲結構由很是多分佈在不一樣機器的worker進程構成。

所有nimbus和supervisor之間的協調工做是有zk集羣來作的。
此外。nimbus和supervisor是fail-fast和stateless;所有狀態保存在zk或者本地磁盤。
守護進程可以是無狀態的而且失效或從新啓動時不會影響整個系統的健康。

執行storm
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
storm jar負責鏈接nimbus並且上傳jar。
原始主要的storm提供了spouts和bolts作流轉換。

spouts和bolts是執行應用邏輯要實現的接口。html

spout是流的源。讀進數據並以流的形式發送出去;
bolt消費輸入的流,處理或者以新的流發送出去。

Storm會本身主動又一次分配失敗的任務,並且storm保證不會有數據丟失。即便機器宕機

下載安裝

http://storm.apache.org/downloads.html
java

一、依賴安裝

yum install uuid -y
yum install e2fsprogs -y
yum install libuuid*
yum install libtool -y
yum install *c++* -y
yum install git -y

二、zk集羣

http://blog.csdn.net/simonchi/article/details/43019401
python

三、zeromq&jzmq

tar -xzvf zeromq-4.0.5.tar.gz
./autogen.sh
./configure && make && make install
jzmq
git clone git://github.com/nathanmarz/jzmq.git
./autogen.sh
./configure && make && make install

四、python

./configure
make
make install
rm -f  /usr/bin/python
ln /usr/local/bin/python3.4 /usr/bin/python
python -V
vi /usr/bin/yum
#!/usr/bin/python 改成 #!/usr/bin/python2.4

配置執行

storm.yamlc++

storm.zookeeper.servers:
     - 192.168.11.176
     - 192.168.11.177
     - 192.168.11.178
storm.zookeeper.port: 2181
nimbus.host: "192.168.11.176"
storm.local.dir: "/home/storm/workdir"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

一、Nimbus: 在Storm主控節點上執行"bin/storm nimbus >/dev/null 2>&1 &"啓動Nimbus後臺程序,並放到後臺執行。
二、Supervisor: 在Storm各個工做節點上執行"bin/storm supervisor >/dev/null 2>&1 &"啓動Supervisor後臺程序,並放到後臺執行;
三、UI: 在Storm主控節點上執行"bin/storm ui >/dev/null 2>&1 &"啓動UI後臺程序。並放到後臺執行,啓動後可以經過http://{nimbus host}:8080觀察集羣的worker資源使用狀況、Topologies的執行狀態等信息。

單詞計數程序

Spout

package com.cmcc.chiwei.storm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;
	public boolean isDistributed() {
		return false;
	}
	public void ack(Object msgId) {
		System.out.println("OK:"+msgId);
	}
	public void close() {}
	public void fail(Object msgId) {
		System.out.println("FAIL:"+msgId);
	}

	/**
	 * The only thing that the methods will do It is emit each 
	 * file line
	 */
	public void nextTuple() {
		if(completed){
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				//Do nothing
			}
			return;
		}
		String str;
		//Open the reader
		BufferedReader reader = new BufferedReader(fileReader);
		try{
			//Read all lines
			while((str = reader.readLine()) != null){
				/**
				 * By each line emmit a new value with the line as a their
				 */
				this.collector.emit(new Values(str),str);
			}
		}catch(Exception e){
			throw new RuntimeException("Error reading tuple",e);
		}finally{
			completed = true;
		}
	}

	/**
	 * We will create the file and get the collector object
	 */
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
		}
		this.collector = collector;
	}

	/**
	 * Declare the output field "word"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
}

Bolt1

package com.cmcc.chiwei.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	public void cleanup() {}

	/**
	 * The bolt will receive the line from the
	 * words file and process it to Normalize this line
	 * 
	 * The normalize will be put the words in lower case
	 * and split the line to get all words in this 
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
	}
	

	/**
	 * The bolt will only emit the field "word" 
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

Bolt2

package com.cmcc.chiwei.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;
	Integer id;
	String name;
	Map<String, Integer> counters;

	/**
	 * At the end of the spout (when the cluster is shutdown
	 * We will show the word counters
	 */
	@Override
	public void cleanup() {
		System.out.println("-- Word Counter ["+name+"-"+id+"] --");
		for(Map.Entry<String, Integer> entry : counters.entrySet()){
			System.out.println(entry.getKey()+": "+entry.getValue());
		}
	}

	/**
	 * On create 
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new HashMap<String, Integer>();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {}


	public void execute(Tuple input, BasicOutputCollector collector) {
		String str = input.getString(0);
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			Integer c = counters.get(str) + 1;
			counters.put(str, c);
		}
	}
}

Topology

package com.cmcc.chiwei.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;


public class TopologyMain {
	public static void main(String[] args) throws InterruptedException {
         
        //Topology建立拓撲,安排storm各個節點以及它們交換數據的方式
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader",new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer())
			.shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(),1)
			.fieldsGrouping("word-normalizer", new Fields("word"));
		
        //Configuration
		Config conf = new Config();
		conf.put("wordsFile", args[0]);
		conf.setDebug(false);
        //Topology run
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
		Thread.sleep(2000);
		cluster.shutdown();
	}
}

words.txt

hello
world
storm flume hadoop hdfs
what's wrong flume ?
what's up hdfs ?
Hi,storm,what are you doing ?

執行結果

OK:hello
OK:world
OK:storm flume hadoop hdfs
OK:what's wrong flume ?

OK:what's up hdfs ? OK:Hi,storm,what are you doing ?git


-- Word Counter [word-counter-2] --
what's: 2
flume: 2
hdfs: 2
you: 1
storm: 1
up: 1
hello: 1
hadoop: 1
hi,storm,what: 1
are: 1
doing: 1
wrong: 1
?

: 3 world: 1github

分析內容:

spout
讀取原始數據,爲bolt提供數據。

bolt
從spout或其餘bolt接收數據並處理。處理結果可做爲其餘bolt的數據源或終於結果。

nimbus
主節點的守護進程。負責爲工做節點分發任務。


topology
拓撲結構。storm的一個任務單元。

define fields定義域,由spout和bolt提供。被bolt接收。
一個storm集羣就是在一連串的bolt之間轉換spout傳過來的數據。
如:
spout讀到一行文本,文本行傳給一個bolt。按單詞分割後傳給還有一個bolt,第二個bolt作計數累加。

Spout

open --> nextTuple

Bolt1apache

declareOutputFields --> execute

Bolt2less

prepare --> execute --> cleanupide


更具體的內容,將在興許慢慢解說,我也在研究中。oop

。。

。。


望各位不吝不吝賜教!。

相關文章
相關標籤/搜索