在大數據領域中存在三大計算中心和三大計算引擎
Apache Storm是Twitter開源的一個相似於Hadoop的實時數據處理框架,它原來是由BackType開發,後BackType被Twitter收購,將Storm做爲Twitter的實時數據分析系統。
數據來源
hadoop
storm
處理過程
hadoop
storm
處理速度
hadoop
storm
適用場景
Spout
Bolt
apache-storm-1.0.2.tar.gz
# Extract the Storm tarball into /opt and rename the install dir to /opt/storm.
# [root@uplooking01 /soft]
tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
# Fix: the archive was extracted into /opt, so the extracted directory must be
# referenced by its full path — the original "mv apache-storm-1.0.2/ storm"
# runs from /soft, where that directory does not exist.
mv /opt/apache-storm-1.0.2 /opt/storm
storm-env.sh
# storm-env.sh — environment read by every Storm daemon on startup.
# [root@uplooking01 /soft]
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/opt/storm/conf"
storm.yaml
# storm.yaml — cluster configuration (edited on uplooking01 in /opt/storm/conf)

# ZooKeeper ensemble used by the Storm cluster
storm.zookeeper.servers:
    - "uplooking03"
    - "uplooking04"
    - "uplooking05"

# Two nimbus seeds give the master high availability
# (avoids a single point of failure on the master node)
nimbus.seeds: ["uplooking01", "uplooking02"]

# Local working directory for Storm state
storm.local.dir: "/opt/storm/storm-local"

# Worker slots offered by each supervisor node (one port per slot)
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
# Copy the configured Storm installation to every other cluster node.
# [root@uplooking01 /]
for host in uplooking02 uplooking03 uplooking04 uplooking05; do
    scp -r /opt/storm "${host}:/opt"
done
# Start the master (nimbus), web UI, and logviewer daemons on uplooking01.
# [root@uplooking01 /]
nohup /opt/storm/bin/storm nimbus    >/dev/null 2>&1 &
nohup /opt/storm/bin/storm ui        >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
# Start the standby master (nimbus) and logviewer on uplooking02.
# [root@uplooking02 /]
# Fix: the subcommand is "nimbus" — the original "storm numbus" is not a
# valid storm subcommand, so the standby master would never start.
nohup /opt/storm/bin/storm nimbus    >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
# Start the worker-node daemon (supervisor) plus logviewer on each slave node.
# [root@uplooking03 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer  >/dev/null 2>&1 &
# [root@uplooking04 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer  >/dev/null 2>&1 &
# [root@uplooking05 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer  >/dev/null 2>&1 &
#!/bin/bash
# Start every Storm daemon across the cluster over ssh.
# The hosts for each role are listed one per line in files under /opt/shell/.
#
# ssh -T disables pseudo-terminal allocation; automation scripts generally
# do not need (and should not allocate) a pseudo-terminal.

# start_role HOSTFILE DAEMON — launch DAEMON on every host listed in HOSTFILE.
start_role() {
    local hostfile=$1 daemon=$2
    for host in $(cat "$hostfile"); do
        ssh -T "root@${host}" << eeooff
nohup /opt/storm/bin/storm ${daemon} >/dev/null 2>&1 &
eeooff
    done
}

start_role /opt/shell/nimbus.host     nimbus
start_role /opt/shell/supervisor.host supervisor
start_role /opt/shell/logviewer.host  logviewer
start_role /opt/shell/ui.host         ui
public class MySpout extends BaseRichSpout { private SpoutOutputCollector collector; //初始化累加的數字 int num = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { collector.emit(new Values(num)); num++; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("mynum")); } }
/**
 * Terminal demo bolt: prints the integer received on field "mynum".
 */
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Keep the collector so processed tuples can be acked.
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
        // A BaseRichBolt must ack manually; without the ack the tuple is
        // eventually timed out and replayed when acking is enabled.
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt — emits nothing downstream.
    }
}
public class MyTopology { public static void main(String[] args) { //建立自定義的spout MySpout mySpout = new MySpout(); //建立自定義的bolt MyBolt myBolt = new MyBolt(); //建立topology名稱 String topologyName = "MyNumTopology"; //建立topology的配置對象 Map conf = new Config(); //建立topology的構造器 TopologyBuilder topologyBuilder = new TopologyBuilder(); //爲topology設置spout和bolt topologyBuilder.setSpout("myspout", mySpout); topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout"); //建立本地的topology提交器 StormTopology stormTopology = topologyBuilder.createTopology(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topologyName, conf, stormTopology); } }
/**
 * Terminal demo bolt: prints the integer received on field "mynum02".
 */
public class MyBolt02 extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Keep the collector so processed tuples can be acked.
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println(tuple.getIntegerByField("mynum02") + ".....");
        // A BaseRichBolt must ack manually; without the ack the tuple is
        // eventually timed out and replayed when acking is enabled.
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt — emits nothing downstream.
    }
}
/**
 * Pass-through demo bolt: prints the integer from field "mynum" and
 * re-emits it downstream on field "mynum02".
 */
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
        // Anchor the emitted tuple to the input so downstream failures are
        // tracked back to the spout, then ack the input — a BaseRichBolt
        // must ack manually or the tuple times out and is replayed when
        // acking is enabled.
        collector.emit(tuple, new Values(num));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum02"));
    }
}
public class MyTopology { public static void main(String[] args) { //建立自定義的spout MySpout mySpout = new MySpout(); //建立自定義的bolt MyBolt myBolt = new MyBolt(); MyBolt02 myBolt02 = new MyBolt02(); //建立topology名稱 String topologyName = "MyNumTopology"; //建立topology的配置對象 Map conf = new Config(); //建立topology的構造器 TopologyBuilder topologyBuilder = new TopologyBuilder(); //爲topology設置spout和bolt topologyBuilder.setSpout("myspout", mySpout); topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout"); topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt"); //建立本地的topology提交器 StormTopology stormTopology = topologyBuilder.createTopology(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topologyName, conf, stormTopology); } }
// Submit the topology to a real (remote) cluster through nimbus, instead of
// the in-process LocalCluster used during local development.
StormSubmitter.submitTopology(topologyName, conf, stormTopology);
在storm中的並行度說的就是一個進程的運行須要多少個線程來參與,若是storm運行的線程個數+1,則並行度+1
Worker :