This article walks through a Storm DRPC example.
version: '2'
services:
  supervisor:
    image: storm
    container_name: supervisor
    command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
    depends_on:
      - nimbus
      - zookeeper
    links:
      - nimbus
      - zookeeper
    restart: always
    ports:
      - 6700:6700
      - 6701:6701
      - 6702:6702
      - 6703:6703
      - 8000:8000
  drpc:
    image: storm
    container_name: drpc
    command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
    depends_on:
      - nimbus
      - supervisor
      - zookeeper
    links:
      - nimbus
      - supervisor
      - zookeeper
    restart: always
    ports:
      - 3772:3772
      - 3773:3773
      - 3774:3774
The drpc service publishes drpc.port (so that an external DRPCClient can reach it) and drpc.invocations.port (so that the workers can reach it).

@Test
public void testDeployDRPCStateQuery() throws InterruptedException, TException {
    TridentTopology topology = new TridentTopology();
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
            new Values("the cow jumped over the moon"),
            new Values("the man went to the store and bought some candy"),
            new Values("four score and seven years ago"),
            new Values("how many apples can you eat"));
    spout.setCycle(true);

    TridentState wordCounts = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            // NOTE: transforms a Stream into a TridentState object
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
            .parallelismHint(6);

    topology.newDRPCStream("words")
            .each(new Fields("args"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
            .each(new Fields("count"), new FilterNull())
            .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

    StormTopology stormTopology = topology.build();

    // Remote submission: build the jar first with
    //   mvn clean package -Dmaven.test.skip=true
    // Storm reads the jar path from System.getProperty("storm.jar");
    // if it is not set, the topology cannot be submitted.
    System.setProperty("storm.jar", TOPOLOGY_JAR);

    Config conf = new Config();
    // Nimbus host to connect to, for example 192.168.10.1
    conf.put(Config.NIMBUS_SEEDS, Arrays.asList("192.168.99.100"));
    // Nimbus Thrift port, 6627 by default
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    // ZooKeeper hosts; the list can hold more than one
    conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100"));
    // ZooKeeper port, 2181 by default
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

    StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
}
@Test
public void testLaunchDrpcClient() throws TException {
    Config conf = new Config();
    // NOTE: Config.DRPC_THRIFT_TRANSPORT_PLUGIN must be set,
    // otherwise the client fails with a NullPointerException
    conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
    conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
    conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10000);
    conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 10000);
    conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100 MB

    // Connect to the DRPC server on drpc.port and call the "words" function
    DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
    System.out.println(client.execute("words", "cat dog the man"));
}
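To make the result of the "words" call easier to reason about, here is a minimal plain-Java sketch of the same steps: split the request, look up each word's count, drop the misses, and sum the rest. It uses no Storm classes. The class name DrpcQuerySketch and its two methods are hypothetical, splitting on a single space is an assumption about what Split does, and the counts come from one pass over the first two spout sentences, which are the only ones that contain "the" or "man".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java illustration only: this is not Storm or Trident code.
class DrpcQuerySketch {

    // Stands in for the persistentAggregate step: split every sentence on
    // spaces and keep a running count per word in an in-memory map.
    static Map<String, Long> countWords(List<String> sentences) {
        Map<String, Long> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Stands in for the DRPC stream: split the request, look up each word,
    // filter out words that have no count, then sum what is left.
    static long query(Map<String, Long> counts, String args) {
        return Arrays.stream(args.split(" "))
                .map(counts::get)
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .sum();
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "the cow jumped over the moon",
                "the man went to the store and bought some candy");
        Map<String, Long> counts = countWords(sentences);
        // "cat" and "dog" have no count; "the" is 4 and "man" is 1, so this prints 5
        System.out.println(query(counts, "cat dog the man"));
    }
}
```

In the deployed topology the spout is set to cycle, so the stored counts keep growing and the sum returned by the DRPC call increases over time.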
DRPCClient makes its calls over the Thrift protocol.