Storm DRPC Example

This article walks through a working Storm DRPC example.

Configuration

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • The supervisor is configured with drpc.servers, drpc.port, and drpc.invocations.port so that workers can reach the DRPC node through drpc.invocations.port.
  • The drpc service publishes drpc.port (for external DRPCClient calls) and drpc.invocations.port (for worker access).
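
Before submitting a topology, it can help to confirm that the published DRPC ports are reachable from the client machine. The following is a minimal sketch using only the JDK. The class name DrpcPortCheck and the 2-second timeout are illustrative assumptions and are not part of Storm; the host and ports are the ones from the compose file above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: checks whether a TCP port accepts connections.
class DrpcPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "192.168.99.100";   // docker host address used in the compose file
        int[] ports = {3772, 3773, 3774}; // drpc.port, drpc.invocations.port, drpc.http.port
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable=" + isReachable(host, port, 2000));
        }
    }
}
```

A successful TCP connect only shows that the port mapping works; it does not prove that the DRPC server behind it is healthy.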

TridentTopology

@Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        // Remote submission: build the jar first with mvn clean package -Dmaven.test.skip=true
        // StormSubmitter locates the jar via System.getProperty("storm.jar"); submission fails if it is not set
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); // nimbus host address(es), e.g. 192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627); // nimbus thrift port, default 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); // zookeeper host address(es); the list may hold several
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); // zookeeper port, default 2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
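  • Here newStream, followed by persistentAggregate, produces a TridentState; newDRPCStream then creates a DRPC stream whose stateQuery targets that TridentState.
  • Because the word counts are stored in a MemoryMapState, each DRPC request reads them back through stateQuery.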
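
To make the data flow concrete, the sketch below mirrors the two streams in plain Java without any Storm classes. It is an illustration only: the class DrpcQuerySketch is hypothetical, the HashMap stands in for MemoryMapState, and splitting on a single space is an assumption about what the Split function used above does.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; no Storm classes are used here.
class DrpcQuerySketch {

    // Stands in for the word -> count state kept by persistentAggregate.
    private final Map<String, Long> wordCounts = new HashMap<>();

    // Mirrors the first stream: split each sentence and count every word.
    void ingest(String sentence) {
        for (String word : sentence.split(" ")) {
            wordCounts.merge(word, 1L, Long::sum);
        }
    }

    // Mirrors the DRPC stream: split args, look up each word, skip misses, sum the counts.
    long query(String args) {
        long sum = 0;
        for (String word : args.split(" ")) {
            Long count = wordCounts.get(word); // plays the role of MapGet: null if never seen
            if (count != null) {               // plays the role of FilterNull
                sum += count;                  // plays the role of Sum
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        DrpcQuerySketch sketch = new DrpcQuerySketch();
        sketch.ingest("the cow jumped over the moon");
        sketch.ingest("the man went to the store and bought some candy");
        // "the" was seen 4 times and "man" once; "cat" and "dog" were never seen.
        System.out.println(sketch.query("cat dog the man")); // prints 5
    }
}
```

In the real topology the spout cycles forever, so the counts keep growing and the value returned for the same query increases over time.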

DRPCClient

@Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE Config.DRPC_THRIFT_TRANSPORT_PLUGIN must be set, otherwise the client fails with a NullPointerException
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
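  • None of these settings can be omitted; leaving any of them out causes a NullPointerException.
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN is set to SimpleTransportPlugin.class.getName(). The class is deprecated, but it still works.
  • Because SimpleTransportPlugin is used, Config.DRPC_MAX_BUFFER_SIZE must be configured as well.
  • DRPCClient is constructed with the DRPC server's address and port.
  • client.execute must be passed the function name given to newDRPCStream.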
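
client.execute returns a String. For a Trident DRPC stream that ends in a single aggregate, the result is typically a JSON-encoded list of tuples such as [[5078]]. The sketch below extracts the number under that assumption; DrpcResultParser is a hypothetical helper, not a Storm class, and it deliberately rejects any other shape instead of guessing.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: assumes the DRPC result looks like [[<integer>]].
class DrpcResultParser {

    // Matches one tuple holding one integer, allowing whitespace between tokens.
    private static final Pattern SINGLE_LONG =
            Pattern.compile("^\\s*\\[\\s*\\[\\s*(-?\\d+)\\s*\\]\\s*\\]\\s*$");

    // Returns the single number in the result, or throws if the shape is different.
    static long parseSum(String result) {
        Matcher matcher = SINGLE_LONG.matcher(result);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected DRPC result: " + result);
        }
        return Long.parseLong(matcher.group(1));
    }

    public static void main(String[] args) {
        System.out.println(parseSum("[[5078]]")); // prints 5078
    }
}
```

For anything beyond a quick check, a JSON library is a better choice than a regular expression.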

Summary

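  • To use DRPC, start a DRPC server node with storm drpc and expose two ports: drpc.port for external DRPCClient calls and drpc.invocations.port for worker access. drpc.http.port is exposed for calls over HTTP, whereas DRPCClient uses the Thrift protocol.
  • The supervisor must be configured with drpc.servers and drpc.invocations.port so that workers can reach the DRPC server.
  • DRPCClient connects on the port given by drpc.port, and client.execute must be passed the function name given to newDRPCStream.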
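
The HTTP port can be exercised without a Thrift client. The sketch below uses the JDK's java.net.http client (Java 11 or later) and assumes the DRPC HTTP endpoint accepts GET requests of the form /drpc/<function>/<args>. That path layout should be verified against the Storm version in use, and DrpcHttpSketch is a hypothetical class name.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch: calls a DRPC function over drpc.http.port.
class DrpcHttpSketch {

    // Builds the request URI; the multi-argument URI constructor percent-encodes spaces in the path.
    static URI drpcUri(String host, int httpPort, String function, String args) {
        try {
            // Assumed path layout: /drpc/<function>/<args>
            return new URI("http", null, host, httpPort, "/drpc/" + function + "/" + args, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build DRPC URI", e);
        }
    }

    public static void main(String[] args) throws Exception {
        URI uri = drpcUri("192.168.99.100", 3774, "words", "cat dog the man");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

As with client.execute, the function name in the path has to match the one given to newDRPCStream.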
