storm的DRPC模式的做用是實現從遠程調用storm集羣的計算資源,而不須要鏈接到集羣的某一個節點。OK。那麼storm實現DRPC主要是使用LinearDRPCTopologyBuilder這個類。下面就先來看看一個簡單的例子,它的源碼的github上。java
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class BasicDRPCTopology { public static class ExclaimBolt extends BaseBasicBolt { //主要須要覆寫execute方法和declareoutputfields方法 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");//實現DRPC模式 builder.addBolt(new ExclaimBolt(), 3); Config conf = new Config(); if (args == null || args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); for (String word : new String[]{ "hello", "goodbye" }) { System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } } }
這段代碼主要實現的功能是給接收到的每個輸入後面添加一個感嘆號。ok,這樣就能夠編譯提交了。git
不過在這以前須要先配置storm集羣的drpc server的ip。如圖。主要是下面的server的ip須要配置好。而且集羣的每個節點的配置文件都須要配置這項參數!github
而後便可使用storm drpc &命令啓動drpc模式。(這裏的分工是172.17.150.6爲客戶端,其他的172.17.150.7(.8,.11)爲集羣的三個節點,.11是nimbus節點。)ide
OK,那接下來就使用客戶端向集羣提交Topology。如圖。使用客戶端向集羣提交名爲exclaim的Topology。裏面設置的worker數爲3。ui
從下圖能夠看到兩個supervisor分別有一個是運行兩個worker,有一個是運行一個worker。spa
ok,下面是客戶端調用遠程資源進行計算的程序。主要是聲明DRPCClient的ip以及端口,以及指定執行的方法名和傳入的參數(client.execute("exclamation",word))。orm
運行結果以下。server
OK,整個DRPC的過程就是這樣。blog
謝謝你們!本人水平有限,請不吝指正!ip