In local mode, a Storm topology runs inside a single JVM process on the local machine. This mode is used for development, testing, and debugging, because it is the simplest way to watch all of the components working together. In this mode we can tune parameters and observe how the topology behaves under different Storm configurations. To run in local mode, we need the Storm development dependencies, which we also use to develop and test our topologies. Once we have created our first Storm project, it will quickly become clear how to use local mode.

NOTE: Running in local mode is very similar to running on a cluster. It is still worth confirming that all components are thread safe, because once they are deployed in remote mode they may run in different JVM processes, or even on different physical machines, with no direct communication or shared memory between them.
In remote mode, we submit the topology to a Storm cluster, which is usually made up of many processes running on different machines. Remote mode produces no debugging output, which is why it is also called production mode. That said, setting up a Storm cluster on a single development machine is a good idea: it lets you confirm that the topology has no problems in a cluster environment before deploying it to production.
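The two modes differ only at submission time. As a minimal sketch (the class name and topology name here are placeholders, and the topology wiring is elided), the same topology object built with TopologyBuilder can be handed either to a LocalCluster or, via StormSubmitter, to a real cluster:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitModeSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout / setBolt calls go here, as in the examples below ...

        Config cfg = new Config();

        if (args.length > 0) {
            // Remote (production) mode: submit to the cluster, using the first argument as the topology name
            StormSubmitter.submitTopology(args[0], cfg, builder.createTopology());
        } else {
            // Local mode: everything runs in a single JVM, convenient for development and debugging
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("local-topo", cfg, builder.createTopology());
        }
    }
}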
Storm provides the following basic interfaces and abstract base classes for writing spouts and bolts.

1) Basic interfaces
(1) IComponent
(2) ISpout
(3) IRichSpout
(4) IStateSpout
(5) IRichStateSpout
(6) IBolt
(7) IRichBolt
(8) IBasicBolt
2) Basic abstract classes (see the sketch after this list for how they differ from implementing the interfaces directly)
(1) BaseComponent
(2) BaseRichSpout
(3) BaseRichBolt
(4) BaseTransactionalBolt
(5) BaseBasicBolt
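The practical difference is that the IRich* interfaces force you to implement every lifecycle method, while the Base* abstract classes provide empty defaults so you only override what you need. A minimal sketch (the class name is illustrative, not part of the original project):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Extending BaseRichBolt: only prepare/execute/declareOutputFields have to be written,
// because the base class already supplies empty implementations of the remaining
// lifecycle methods (cleanup, getComponentConfiguration, ...).
public class MinimalBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Process the tuple; this bolt simply acknowledges it
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields: this bolt is a terminal node
    }
}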
Creating a data source (Spout)
package com.qxw.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Data source spout.
 *
 * @author qxw
 * @data 2018-09-17 11:21 AM
 *
 * Two ways to declare a spout: extend BaseRichSpout and override only the methods you need,
 * or implement IRichSpout and override all of its methods.
 */
public class DataSource extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }

    /**
     * Initialization method.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called in a loop to emit tuples.
     */
    @Override
    public void nextTuple() {
        // In a real spout you would query a database or read from a message queue;
        // for testing, the map above stands in for that
        final Random r = new Random();
        int num = r.nextInt(5);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Emit the word to the next node of the topology
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * Declare the field names of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The declared name is used by the downstream node to read the value
        declarer.declare(new Fields("data"));
    }

    /**
     * Called before the spout shuts down, although this is not guaranteed to happen.
     */
    @Override
    public void close() {
        System.out.println("Called before the spout shuts down");
    }

    /**
     * Called when the spout is activated after having been deactivated.
     * nextTuple() will be called again shortly afterwards.
     */
    @Override
    public void activate() {
        System.out.println("Spout activated after being deactivated");
    }

    /**
     * Called when the spout is deactivated. While deactivated, nextTuple() is not called.
     * The spout may or may not be reactivated later.
     */
    @Override
    public void deactivate() {
        System.out.println("Spout deactivated");
    }

    /**
     * Callback for a tuple that was processed successfully.
     */
    @Override
    public void ack(Object paramObject) {
        System.out.println("Tuple processed successfully");
    }

    /**
     * Callback for a tuple that failed to be processed.
     */
    @Override
    public void fail(Object paramObject) {
        System.out.println("Tuple processing failed");
    }
}
The stream processing components (Bolts)
package com.qxw.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Stream processing component (bolt) that prints what it receives.
 *
 * Implementation options: extend BaseBasicBolt or implement IBasicBolt.
 *
 * @author qxw
 * @data 2018-09-17 11:36 AM
 */
public class OutBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    /**
     * Receives a tuple for processing and may emit data to the next component.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the upstream component
        String value = input.getStringByField("data");
        System.out.println("data sent by the spout: " + value);
        // Emit to the next component
        collector.emit(new Values(value));
    }

    /**
     * Declare the field names of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Several fields can be declared at the same time
        declarer.declare(new Fields("outdata"));
    }
}
package com.qxw.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Stream processing component (bolt) that prints what it receives.
 *
 * Implementation options: extend BaseBasicBolt or implement IBasicBolt.
 *
 * @author qxw
 * @data 2018-09-17 11:36 AM
 */
public class OutBolt2 extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    /**
     * Receives a tuple for processing and may emit data to the next component.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the field declared by the upstream component
        String value = input.getStringByField("outdata");
        System.out.println("Value received from the OutBolt component: " + value);
    }

    /**
     * This bolt is the end of the stream, so it declares no output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
Building the topology
package com.qxw.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.qxw.bolt.OutBolt;
import com.qxw.bolt.OutBolt2;
import com.qxw.spout.DataSource;

public class TopologyTest {

    public static void main(String[] args) throws Exception {
        // Configuration
        Config cfg = new Config();
        cfg.setNumWorkers(2); // Number of worker processes (JVMs); only meaningful on a cluster, ignored in local mode
        cfg.setDebug(true);

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Set the data source
        builder.setSpout("dataSource", new DataSource());
        // Set the stream processing components
        builder.setBolt("out-bolt", new OutBolt()).shuffleGrouping("dataSource"); // shuffle grouping
        builder.setBolt("out-bol2", new OutBolt2()).shuffleGrouping("out-bolt");

        // 1. Local mode
        LocalCluster cluster = new LocalCluster();
        // Submit the topology; it keeps running and polling until stopped
        cluster.submitTopology("topo", cfg, builder.createTopology());

        // 2. Cluster mode
        // StormSubmitter.submitTopology("topo", cfg, builder.createTopology());
    }
}
Word count example: data source (Spout)
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Data source.
 *
 * @author qxw
 * @data 2018-09-18 11:58 AM
 */
public class WordSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;
    private int index = 0;

    private final String[] lines = {
            "long long ago I like playing with cat",
            "playing with cat make me happy",
            "I feel happy to be with you",
            "you give me courage",
            "I like to be together with you",
            "long long ago I like you"
    };

    // Initialization
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Emit data, cycling through the lines above
    @Override
    public void nextTuple() {
        this.collector.emit(new Values(lines[index]));
        index++;
        if (index >= lines.length) {
            index = 0;
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The splitter component (Bolt)
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called repeatedly with data coming from the upstream component.
     * Splits each line of text into words and emits them.
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The counting component (Bolt)
package com.qxw.wordCount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    /**
     * Initialization method.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Counts how often each word occurs; in practice the counts would usually be written to a database.
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = 1L;
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        System.out.println("Word: " + word + " count: " + count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The output component (Bolt)
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class WordReportBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        System.out.printf("Live word count: %s\t%d\n", word, count);
    }

    /**
     * This bolt is the end of the stream, so it declares no output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The Topology main class
package com.qxw.wordCount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordTopology {

    public static void main(String[] args) throws InterruptedException {
        // Assemble the topology and wire the components together with stream groupings
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordSpout", new WordSpout());
        builder.setBolt("WordSplitBolt", new WordSplitBolt(), 5).shuffleGrouping("WordSpout");
        builder.setBolt("WordCountBolt", new WordCountBolt(), 5).fieldsGrouping("WordSplitBolt", new Fields("word"));
        builder.setBolt("WordReportBolt", new WordReportBolt(), 10).globalGrouping("WordCountBolt");

        // Configuration
        Config cfg = new Config();
        cfg.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        // Submit the topology; it keeps running and polling until stopped
        cluster.submitTopology("wordcount-topo", cfg, builder.createTopology());
    }
}
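As written, both local topologies above keep running until the JVM is killed. If you want a local run to stop on its own, a minimal sketch (assuming a fixed run time; the 30-second sleep is an arbitrary choice) is to replace the tail of the main method with a kill and shutdown of the LocalCluster:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordcount-topo", cfg, builder.createTopology());

// Let the topology run for a while, then stop it and tear the local cluster down
Thread.sleep(30 * 1000);
cluster.killTopology("wordcount-topo");
cluster.shutdown();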