本文簡單介紹如何使用 Docker 運行 Storm,以及如何在 Spring Boot 中使用 Storm。
# Single-host Storm cluster for local experiments:
# ZooKeeper + Nimbus + one Supervisor + the Storm UI.
version: '2'

services:
  zookeeper:
    container_name: zookeeper
    image: zookeeper   # 3.4.10 when this was written
    restart: always
    ports:
      - "2181:2181"

  nimbus:
    container_name: nimbus
    image: storm       # 1.1.1 when this was written
    command: storm nimbus
    restart: always
    depends_on: [zookeeper]
    links: [zookeeper]
    ports:
      - "6627:6627"

  supervisor:
    container_name: supervisor
    image: storm
    command: storm supervisor
    restart: always
    depends_on: [nimbus, zookeeper]
    links: [nimbus, zookeeper]

  ui:
    container_name: stormui
    image: storm
    command: storm ui
    restart: always
    depends_on: [nimbus, zookeeper]
    links: [nimbus, zookeeper]
    ports:
      - "8080:8080"
啟動之後,訪問 192.168.99.100:8080 就可以看到 storm-ui 的界面。
public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public TestWordSpout() { this(true); } public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void ack(Object msgId) { } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
/**
 * Bolt that keeps a running count per word in an instance-level map and,
 * for every incoming tuple, emits the word together with its updated count
 * on the fields {@code ("word", "count")}.
 */
public class WordCountBolt extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /** Declares the two output fields {@code "word"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /**
     * Reads the word from position 0 of the tuple, increments its count
     * (starting from zero when unseen) and emits the pair.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String key = tuple.getString(0);
        final Integer seen = counts.get(key);
        final Integer updated = (seen == null) ? 1 : seen + 1;
        counts.put(key, updated);
        collector.emit(new Values(key, updated));
    }
}
/**
 * Terminal bolt: prints each incoming tuple to standard output as
 * {@code <string at 0>,<integer at 1>} and emits nothing.
 */
public class PrintBolt extends BaseBasicBolt {

    /** Formats the first two tuple values and writes them to {@code System.out}. */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String label = tuple.getString(0);
        final int amount = tuple.getInteger(1);
        final String line = label + "," + amount;
        System.out.println(line);
    }

    /** No output fields: this bolt is a sink. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // intentionally empty
    }
}
@SpringBootApplication public class StormDemoApplication implements CommandLineRunner{ public static void main(String[] args) { SpringApplication app = new SpringApplication((StormDemoApplication.class)); app.setWebEnvironment(false); app.run(args); } @Override public void run(String... args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //併發度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; Config conf = new Config(); conf.setDebug(true); //遠程提交 mvn clean package -Dmaven.test.skip=true // StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); try { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf,builder.createTopology()); Thread.sleep(60 * 1000); cluster.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }
修改提交方式,然後打 jar 包:
//遠程提交 mvn clean package -Dmaven.test.skip=true StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
遠程提交代碼:
@Test public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { Config conf = new Config(); conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus鏈接主機地址,好比:192.168.10.1 conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus鏈接端口,默認 6627 conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper鏈接主機地址,能夠使用集合存放多個 conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper鏈接端口,默認2181 conf.setDebug(true); conf.setNumWorkers(1); TopologyBuilder builder = new TopologyBuilder(); //併發度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; //很是關鍵的一步,使用StormSubmitter提交拓撲時,無論怎麼樣,都是須要將所需的jar提交到nimbus上去,若是不指定jar文件路徑, //storm默認會使用System.getProperty("storm.jar")去取,若是不設定,就不能提交 System.setProperty("storm.jar","/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar"); StormSubmitter.submitTopology(topologyName, conf, builder.createTopology()); }
修改 ~/.m2/settings.xml:
<mirrors> <mirror> <id>nexus-aliyun</id> <mirrorOf>*,!Clojars</mirrorOf> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> </mirror> <repository> <id>Clojars</id> <name>Clojars Repository</name> <url>http://clojars.org/repo/</url> <releases><enabled>true</enabled></releases> <snapshots><enabled>true</enabled></snapshots> </repository>