[case43] A Look at Storm's LinearDRPCTopologyBuilder

This article takes a look at Storm's LinearDRPCTopologyBuilder.

Examples

manual drpc

@Test
public void testManualDRPC() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    DRPCSpout spout = new DRPCSpout("exclamation"); //Fields("args", "return-info")
    //the spout is a DRPCSpout with component id "drpc"
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ManualExclaimBolt(), 3).shuffleGrouping("drpc"); //Fields("result", "return-info")
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
    SubmitHelper.submitRemote("manualDrpc", builder.createTopology());
}
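  • This shows the most basic way to build a DRPC topology by hand. It starts with a DRPCSpout and ends with ReturnResults.
  • DRPCSpout's output fields are Fields("args", "return-info"), and ReturnResults expects its input as Fields("result", "return-info").
  • The custom ManualExclaimBolt therefore has to declare output fields Fields("result", "return-info"). It copies return-info from the input tuple, and result carries the processed output.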
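The field contract above can be sketched in plain Java without a Storm dependency. This is a minimal illustration that models tuples as List<Object>. ManualDrpcSketch, spoutEmit and exclaim are hypothetical names, not Storm APIs. The sketch shows the one rule the manual bolt must follow: compute result from args and pass return-info through untouched.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the manual DRPC field contract (no Storm dependency).
// Tuples are modeled as List<Object>; all names here are hypothetical.
class ManualDrpcSketch {

    // Shape of what DRPCSpout emits: Fields("args", "return-info").
    static List<Object> spoutEmit(String args, String returnInfo) {
        return Arrays.<Object>asList(args, returnInfo);
    }

    // What a ManualExclaimBolt-style bolt must do: compute "result" from
    // "args" and copy "return-info" through unchanged, so its output matches
    // Fields("result", "return-info") as expected by ReturnResults.
    static List<Object> exclaim(List<Object> input) {
        String args = (String) input.get(0);
        Object returnInfo = input.get(1);
        return Arrays.<Object>asList(args + "!", returnInfo);
    }

    public static void main(String[] argv) {
        List<Object> out = exclaim(spoutEmit("hello", "{\"id\":\"1\"}"));
        System.out.println(out); // [hello!, {"id":"1"}]
    }
}
```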

Using LinearDRPCTopologyBuilder

@Test
public void testBasicDRPCTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    SubmitHelper.submitRemote("basicDrpc", builder.createRemoteTopology());
}
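  • LinearDRPCTopologyBuilder builds the DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults components for you, which makes it very concise to use.
  • The components around the user's bolt differ from the manual case, so the user-defined bolt has different requirements. It must take input fields Fields("request", "args") and declare output fields new Fields("id", "result"). The input's request is the requestId, the same long value as the output's id. args is the input argument and result is the output.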
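Here is a similar plain-Java sketch for the LinearDRPCTopologyBuilder contract. It again models tuples as List<Object> and uses hypothetical names (LinearBoltContractSketch, exclaim). It shows that the request id moves unchanged from position 0 of the input to position 0 of the output.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the field contract for a bolt added through
// LinearDRPCTopologyBuilder (no Storm dependency; names are hypothetical).
class LinearBoltContractSketch {

    // Input mirrors PrepareRequest's ARGS_STREAM: Fields("request", "args").
    // Output mirrors Fields("id", "result"): the request id is copied to
    // position 0 unchanged, because downstream components key on field 0.
    static List<Object> exclaim(List<Object> input) {
        long requestId = (Long) input.get(0);
        String args = (String) input.get(1);
        return Arrays.<Object>asList(requestId, args + "!");
    }

    public static void main(String[] argv) {
        List<Object> out = exclaim(Arrays.<Object>asList(42L, "hello"));
        System.out.println(out); // [42, hello!]
    }
}
```

Compare this with the manual version, where the bolt itself passes return-info through. Here PrepareRequest sends the return info straight to JoinResult on RETURN_STREAM, so the user bolt only has to preserve the id.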

LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java

public class LinearDRPCTopologyBuilder {
    String function;
    List<Component> components = new ArrayList<>();


    public LinearDRPCTopologyBuilder(String function) {
        this.function = function;
    }

    private static String boltId(int index) {
        return "bolt" + index;
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
        return addBolt(new BatchBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
        return addBolt(bolt, 1);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
        if (parallelism == null) {
            parallelism = 1;
        }
        Component component = new Component(bolt, parallelism.intValue());
        components.add(component);
        return new InputDeclarerImpl(component);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
        return addBolt(bolt, null);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
        return addBolt(new BasicBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
        return addBolt(bolt, null);
    }

    public StormTopology createLocalTopology(ILocalDRPC drpc) {
        return createTopology(new DRPCSpout(function, drpc));
    }

    public StormTopology createRemoteTopology() {
        return createTopology(new DRPCSpout(function));
    }

    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
               .noneGrouping(SPOUT_ID);
        int i = 0;
        for (; i < components.size(); i++) {
            Component component = components.get(i);

            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i == 1) {
                source.put(boltId(i - 1), SourceArgs.single());
            } else if (i >= 2) {
                source.put(boltId(i - 1), SourceArgs.all());
            }
            IdStreamSpec idSpec = null;
            if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                boltId(i),
                new CoordinatedBolt(component.bolt, source, idSpec),
                component.parallelism);

            for (SharedMemory request : component.sharedMemory) {
                declarer.addSharedMemory(request);
            }

            if (!component.componentConf.isEmpty()) {
                declarer.addConfigurations(component.componentConf);
            }

            if (idSpec != null) {
                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if (i == 0 && component.declarations.isEmpty()) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if (i == 0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i - 1);
                }
                for (InputDeclaration declaration : component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if (i > 0) {
                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
            }
        }

        IRichBolt lastBolt = components.get(components.size() - 1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
               .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
               .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
               .noneGrouping(boltId(i - 1));
        return builder.createTopology();
    }

    //......
}
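  • createTopology first adds the spout, a DRPCSpout (id spout), followed by PrepareRequest (id prepare-request).
  • It then wraps each bolt the user added in a CoordinatedBolt. When there are several bolts, every bolt from the second one onward also subscribes with directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID). On that stream the upstream CoordinatedBolt uses emitDirect to send Fields("id", "count").
  • After the user's bolts, it checks that the last bolt declares exactly one stream with exactly two fields. It then adds JoinResult, which groups that stream on its first field and PrepareRequest's RETURN_STREAM on "request". Finally it adds ReturnResults.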
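The following plain-Java sketch lists the components and subscriptions that createTopology would declare for n user bolts. It mirrors the id naming (boltId) and grouping decisions in the code above. It assumes the first bolt has no custom input declarations and leaves out the optional ID_STREAM subscription. LinearWiringSketch and wiring are hypothetical names, and the strings are descriptive labels rather than anything Storm prints.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the component layout that LinearDRPCTopologyBuilder's
// createTopology declares for n user bolts. It only mirrors the id naming and
// grouping decisions shown above; it is not Storm code, and the optional
// ID_STREAM subscription (last bolt implementing FinishedCallback) is omitted.
class LinearWiringSketch {

    static String boltId(int index) {
        return "bolt" + index;
    }

    static List<String> wiring(int n) {
        if (n < 1) {
            // createTopology reads components.get(components.size() - 1), so it needs at least one bolt
            throw new IllegalArgumentException("at least one user bolt is required");
        }
        List<String> lines = new ArrayList<>();
        lines.add("spout: DRPCSpout");
        lines.add("prepare-request: PrepareRequest <- spout (noneGrouping)");
        for (int i = 0; i < n; i++) {
            if (i == 0) {
                // assumes the first bolt declared no custom inputs
                lines.add(boltId(0) + ": CoordinatedBolt <- prepare-request ARGS_STREAM (noneGrouping)");
            } else {
                // later bolts need a user-declared grouping on the previous bolt,
                // plus the direct coordination stream from it
                lines.add(boltId(i) + ": CoordinatedBolt <- " + boltId(i - 1) + " (user grouping), "
                        + boltId(i - 1) + " COORDINATED_STREAM_ID (directGrouping)");
            }
        }
        lines.add(boltId(n) + ": JoinResult <- " + boltId(n - 1) + " (fieldsGrouping on field 0), "
                + "prepare-request RETURN_STREAM (fieldsGrouping on \"request\")");
        lines.add(boltId(n + 1) + ": ReturnResults <- " + boltId(n) + " (noneGrouping)");
        return lines;
    }

    public static void main(String[] args) {
        for (String line : wiring(2)) {
            System.out.println(line);
        }
    }
}
```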

DRPCSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java

public class DRPCSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = 2387848310969237877L;
    final String _function;
    final String _local_drpc_id;
    SpoutOutputCollector _collector;
    List<DRPCInvocationsClient> _clients = new ArrayList<>();
    transient LinkedList<Future<Void>> _futures = null;
    transient ExecutorService _backround = null;

    public DRPCSpout(String function) {
        _function = function;
        if (DRPCClient.isLocalOverride()) {
            _local_drpc_id = DRPCClient.getOverrideServiceId();
        } else {
            _local_drpc_id = null;
        }
    }

    //......

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        if (_local_drpc_id == null) {
            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                        60L, TimeUnit.SECONDS,
                                                        new SynchronousQueue<Runnable>());
            _futures = new LinkedList<>();

            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            int index = context.getThisTaskIndex();

            int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
            List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
            if (servers == null || servers.isEmpty()) {
                throw new RuntimeException("No DRPC servers configured for topology");
            }

            if (numTasks < servers.size()) {
                for (String s : servers) {
                    _futures.add(_backround.submit(new Adder(s, port, conf)));
                }
            } else {
                int i = index % servers.size();
                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
            }
        }

    }

    @Override
    public void close() {
        for (DRPCInvocationsClient client : _clients) {
            client.close();
        }
    }

    @Override
    public void nextTuple() {
        if (_local_drpc_id == null) {
            int size = 0;
            synchronized (_clients) {
                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
            }
            for (int i = 0; i < size; i++) {
                DRPCInvocationsClient client;
                synchronized (_clients) {
                    client = _clients.get(i);
                }
                if (!client.isConnected()) {
                    LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                    reconnectAsync(client);
                    continue;
                }
                try {
                    DRPCRequest req = client.fetchRequest(_function);
                    if (req.get_request_id().length() > 0) {
                        Map<String, Object> returnInfo = new HashMap<>();
                        returnInfo.put("id", req.get_request_id());
                        returnInfo.put("host", client.getHost());
                        returnInfo.put("port", client.getPort());
                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
                                        new DRPCMessageId(req.get_request_id(), i));
                        break;
                    }
                } catch (AuthorizationException aze) {
                    reconnectAsync(client);
                    LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
                } catch (TException e) {
                    reconnectAsync(client);
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                } catch (Exception e) {
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                }
            }
            checkFutures();
        } else {
            //......
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
        DRPCMessageId did = (DRPCMessageId) msgId;
        DistributedRPCInvocations.Iface client;

        if (_local_drpc_id == null) {
            client = _clients.get(did.index);
        } else {
            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
        }

        int retryCnt = 0;
        int maxRetries = 3;

        while (retryCnt < maxRetries) {
            retryCnt++;
            try {
                client.failRequest(did.id);
                break;
            } catch (AuthorizationException aze) {
                LOG.error("Not authorized to failRequest from DRPC server", aze);
                throw new RuntimeException(aze);
            } catch (TException tex) {
                if (retryCnt >= maxRetries) {
                    LOG.error("Failed to fail request", tex);
                    break;
                }
                reconnectSync((DRPCInvocationsClient) client);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("args", "return-info"));
    }
    //......
}
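  • open sets up the DRPCInvocationsClient connections. In remote mode (_local_drpc_id == null) they are created asynchronously: open submits Adder tasks to a background thread pool.
  • nextTuple obtains a DRPCRequest via DRPCInvocationsClient.fetchRequest(_function).
  • It then builds returnInfo (request id, host, port) and emits the tuple Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), with a DRPCMessageId as the msgId.
  • fail is overridden. For a failed request it notifies the DRPC server through client.failRequest(did.id), retrying that call up to 3 times on TException.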
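The server-assignment rule in open can be isolated into a small plain-Java function. This sketch mirrors the numTasks/index logic above. DrpcServerAssignmentSketch and serversFor are hypothetical names, and no real connections are made.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of how DRPCSpout.open picks the DRPC servers a spout task
// connects to (mirrors the numTasks/index logic above; names are hypothetical).
class DrpcServerAssignmentSketch {

    static List<String> serversFor(int numTasks, int taskIndex, List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new RuntimeException("No DRPC servers configured for topology");
        }
        if (numTasks < servers.size()) {
            // Fewer spout tasks than servers: every task connects to every server.
            return new ArrayList<>(servers);
        }
        // Otherwise each task connects to a single server, chosen by task index.
        return Arrays.asList(servers.get(taskIndex % servers.size()));
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("drpc1", "drpc2", "drpc3");
        System.out.println(serversFor(2, 0, servers)); // [drpc1, drpc2, drpc3]
        System.out.println(serversFor(4, 3, servers)); // [drpc1]
    }
}
```

The first branch is presumably there because assigning one server per task would leave some servers with no task fetching their requests when there are fewer spout tasks than servers.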

PrepareRequest

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java

public class PrepareRequest extends BaseBasicBolt {
    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
    public static final String RETURN_STREAM = "ret";
    public static final String ID_STREAM = "id";

    Random rand;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext context) {
        rand = new Random();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String args = tuple.getString(0);
        String returnInfo = tuple.getString(1);
        long requestId = rand.nextLong();
        collector.emit(ARGS_STREAM, new Values(requestId, args));
        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
        collector.emit(ID_STREAM, new Values(requestId));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
        declarer.declareStream(ID_STREAM, new Fields("request"));
    }
}
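  • PrepareRequest takes args and returnInfo from the input and generates a requestId (a random long). It then emits to three streams: ARGS_STREAM, RETURN_STREAM and ID_STREAM.
  • JoinResult subscribes to PrepareRequest's RETURN_STREAM, and the first CoordinatedBolt subscribes to ARGS_STREAM. Only the last bolt subscribes to ID_STREAM, and only when it implements FinishedCallback.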
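PrepareRequest's fan-out can be sketched in plain Java as follows. The sketch models the output as a map from stream id to a List<Object> tuple. The stream ids mirror the constants above (ARGS_STREAM is Utils.DEFAULT_STREAM_ID, i.e. "default"). PrepareRequestSketch and prepare are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java sketch of PrepareRequest.execute: one input tuple becomes three
// tuples on three streams, all carrying the same random long request id.
// Stream ids mirror the constants above; the class and method are hypothetical.
class PrepareRequestSketch {

    static Map<String, List<Object>> prepare(String args, String returnInfo, Random rand) {
        long requestId = rand.nextLong();
        Map<String, List<Object>> out = new LinkedHashMap<>();
        out.put("default", Arrays.<Object>asList(requestId, args));   // ARGS_STREAM: ("request", "args")
        out.put("ret", Arrays.<Object>asList(requestId, returnInfo)); // RETURN_STREAM: ("request", "return")
        out.put("id", Arrays.<Object>asList(requestId));              // ID_STREAM: ("request")
        return out;
    }

    public static void main(String[] argv) {
        Map<String, List<Object>> streams = prepare("hello", "{\"id\":\"1\"}", new Random());
        streams.forEach((stream, tuple) -> System.out.println(stream + " -> " + tuple));
    }
}
```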

CoordinatedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java

/**
 * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
 */
public class CoordinatedBolt implements IRichBolt {
    
    private TimeCacheMap<Object, TrackingInfo> _tracked;

    //......

    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized (_tracked) {
            track = _tracked.get(id);
            if (track == null) {
                track = new TrackingInfo();
                if (_idStreamSpec == null) {
                    track.receivedId = true;
                }
                _tracked.put(id, track);
            }
        }

        if (type == TupleType.ID) {
            synchronized (_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if (type == TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized (_tracked) {
                track.reportCount++;
                track.expectedTupleCount += count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized (_tracked) {
                _delegate.execute(tuple);
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _delegate.declareOutputFields(declarer);
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
    }

    //......

    public static class TrackingInfo {
        int reportCount = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        boolean failed = false;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean receivedId = false;
        boolean finished = false;
        List<Tuple> ackTuples = new ArrayList<>();

        @Override
        public String toString() {
            return "reportCount: " + reportCount + "\n" +
                   "expectedTupleCount: " + expectedTupleCount + "\n" +
                   "receivedTuples: " + receivedTuples + "\n" +
                   "failed: " + failed + "\n" +
                   taskEmittedTuples.toString();
        }
    }
}
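  • In declareOutputFields, CoordinatedBolt calls the delegate bolt's declareOutputFields. It also declares a direct stream, Constants.COORDINATED_STREAM_ID, with Fields("id", "count").
  • execute first makes sure every requestId has a TrackingInfo, which records expectedTupleCount, receivedTuples and taskEmittedTuples. The name taskEmittedTuples is somewhat misleading: it holds the number of tuples this bolt has emitted to each task of the downstream bolt. Once the request is finished (in checkFinishId), these counts are sent with emitDirect to tell each downstream task how many tuples it should receive. The downstream bolt then adds that count to its expectedTupleCount.
  • execute handles three kinds of tuples:
    • TupleType.ID, which arrives only when _idStreamSpec is not null.
    • TupleType.COORD, which carries Fields("id", "count"). After it arrives, checkFinishId decides whether the request is finished.
    • TupleType.REGULAR, which is passed to the delegate bolt's execute.
  • checkFinishId checks track.reportCount == _numSourceReports and track.expectedTupleCount == track.receivedTuples. If both hold, it sets track.finished = true. It then tells the downstream bolt, if there is one, how many tuples it should expect.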
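The bookkeeping in CoordinatedBolt can be modeled in a few lines of plain Java. This is a simplified sketch for a single request. It ignores the ID stream (receivedId), failures and acking. It also increments receivedTuples directly when a regular tuple arrives, which simplifies what the real bolt does. The field names follow TrackingInfo; CoordinationSketch and its methods are hypothetical.

```java
// Simplified plain-Java model of CoordinatedBolt's per-request bookkeeping
// (one request only). It ignores the ID stream (receivedId), failures and
// acking; field names follow TrackingInfo, everything else is hypothetical.
class CoordinationSketch {

    private final int numSourceReports; // how many COORD reports to wait for
    private int reportCount = 0;
    private int expectedTupleCount = 0;
    private int receivedTuples = 0;
    private boolean finished = false;

    CoordinationSketch(int numSourceReports) {
        this.numSourceReports = numSourceReports;
    }

    // TupleType.COORD: an upstream task reports how many tuples it sent to this task.
    boolean onCoord(int count) {
        reportCount++;
        expectedTupleCount += count;
        return checkFinished();
    }

    // TupleType.REGULAR: a data tuple for this request arrived (counted
    // directly here, a simplification of the real bolt).
    boolean onRegular() {
        receivedTuples++;
        return checkFinished();
    }

    // Mirrors the condition described for checkFinishId: all reports in and all expected tuples seen.
    private boolean checkFinished() {
        if (!finished && reportCount == numSourceReports && expectedTupleCount == receivedTuples) {
            finished = true;
        }
        return finished;
    }

    public static void main(String[] args) {
        CoordinationSketch c = new CoordinationSketch(2); // two upstream reports expected
        System.out.println(c.onRegular()); // false: no reports yet
        System.out.println(c.onCoord(2));  // false: 1 of 2 reports, 1 of 2 tuples
        System.out.println(c.onRegular()); // false: still waiting for the second report
        System.out.println(c.onCoord(0));  // true: 2 reports, 2 expected, 2 received
    }
}
```

Note that a tuple can arrive before the count that covers it. This is why both conditions are checked again on every COORD and REGULAR tuple.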

JoinResult

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java

public class JoinResult extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);

    String returnComponent;
    Map<Object, Tuple> returns = new HashMap<>();
    Map<Object, Tuple> results = new HashMap<>();
    OutputCollector _collector;

    public JoinResult(String returnComponent) {
        this.returnComponent = returnComponent;
    }

    public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        Object requestId = tuple.getValue(0);
        if (tuple.getSourceComponent().equals(returnComponent)) {
            returns.put(requestId, tuple);
        } else {
            results.put(requestId, tuple);
        }

        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            Tuple result = results.remove(requestId);
            Tuple returner = returns.remove(requestId);
            LOG.debug(result.getValue(1).toString());
            List<Tuple> anchors = new ArrayList<>();
            anchors.add(result);
            anchors.add(returner);
            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
            _collector.ack(result);
            _collector.ack(returner);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}
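  • If the tuple comes from PrepareRequest (returnComponent), it is stored in returns; otherwise it is stored in results.
  • JoinResult then checks whether both returns and results contain the requestId. If they do, the result has been matched with its return info. Both entries are removed, and a tuple anchored to both inputs is emitted downstream. The two inputs are then acked.
  • The first emitted field is result and the second is returnInfo.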
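JoinResult's pairing is a two-sided buffered join keyed by request id. The plain-Java sketch below uses Strings in place of tuples. JoinSketch, onTuple and pending are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of JoinResult's pairing logic: each side is buffered by
// request id until its counterpart arrives. Strings stand in for tuples;
// JoinSketch, onTuple and pending are hypothetical names.
class JoinSketch {
    private final Map<Object, String> returns = new HashMap<>();
    private final Map<Object, String> results = new HashMap<>();

    // Returns {result, returnInfo} once both sides are present, otherwise null.
    String[] onTuple(boolean fromPrepareRequest, Object requestId, String value) {
        if (fromPrepareRequest) {
            returns.put(requestId, value);
        } else {
            results.put(requestId, value);
        }
        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            // Same field order as JoinResult's output: ("result", "return-info").
            return new String[] {results.remove(requestId), returns.remove(requestId)};
        }
        return null;
    }

    // Number of buffered tuples still waiting for their counterpart.
    int pending() {
        return returns.size() + results.size();
    }
}
```

Arrival order does not matter, because whichever side arrives second triggers the emit. As in the source above, nothing evicts an entry whose counterpart never arrives.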

ReturnResults

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java

public class ReturnResults extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = -774882142710631591L;
    OutputCollector _collector;
    boolean local;
    Map<String, Object> _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _conf = topoConf;
        _collector = collector;
        local = topoConf.get(Config.STORM_CLUSTER_MODE).equals("local");
    }

    @Override
    public void execute(Tuple input) {
        String result = (String) input.getValue(0);
        String returnInfo = (String) input.getValue(1);
        if (returnInfo != null) {
            Map<String, Object> retMap;
            try {
                retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
            } catch (ParseException e) {
                LOG.error("Parseing returnInfo failed", e);
                _collector.fail(input);
                return;
            }
            final String host = (String) retMap.get("host");
            final int port = ObjectReader.getInt(retMap.get("port"));
            String id = (String) retMap.get("id");
            DistributedRPCInvocations.Iface client;
            if (local) {
                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
            } else {
                List server = new ArrayList() {{
                    add(host);
                    add(port);
                }};

                if (!_clients.containsKey(server)) {
                    try {
                        _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                    } catch (TTransportException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                client = _clients.get(server);
            }


            int retryCnt = 0;
            int maxRetries = 3;
            while (retryCnt < maxRetries) {
                retryCnt++;
                try {
                    client.result(id, result);
                    _collector.ack(input);
                    break;
                } catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server", aze);
                    _collector.fail(input);
                    throw new RuntimeException(aze);
                } catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to return results to DRPC server", tex);
                        _collector.fail(input);
                    }
                    reconnectClient((DRPCInvocationsClient) client);
                }
            }
        }
    }

    private void reconnectClient(DRPCInvocationsClient client) {
        if (client instanceof DRPCInvocationsClient) {
            try {
                LOG.info("reconnecting... ");
                client.reconnectClient(); //Blocking call
            } catch (TException e2) {
                LOG.error("Failed to connect to DRPC server", e2);
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : _clients.values()) {
            c.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
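  • ReturnResults sends the result, through a DRPCInvocationsClient, back to the DRPC server the request came from.
  • returnInfo contains the target host and port. In remote mode, ReturnResults creates a DRPCInvocationsClient for that host and port and caches it in _clients.
  • It then calls DRPCInvocationsClient.result(id, result) to return the result, retrying up to 3 times on TException. An AuthorizationException fails the tuple immediately. On success the tuple is acked.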
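Two details of ReturnResults can be sketched in plain Java: the client cache keyed by [host, port], and the bounded retry around client.result. FakeClient is a hypothetical stand-in for DRPCInvocationsClient that throws a configurable number of times. The sketch does not model the AuthorizationException fast-fail path or the reconnect.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of two ReturnResults details: clients cached per
// [host, port] key, and a bounded retry around the result call.
// FakeClient is a hypothetical stand-in for DRPCInvocationsClient.
class ReturnResultsSketch {

    static class FakeClient {
        private int failuresLeft;
        int calls = 0;

        FakeClient(int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        // Throws for the first `failuresLeft` calls, then succeeds.
        void result(String id, String result) throws Exception {
            calls++;
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new Exception("transient failure");
            }
        }
    }

    private final Map<List<Object>, FakeClient> clients = new HashMap<>();

    // List equality is element-wise, so [host, port] works as a map key.
    FakeClient clientFor(String host, int port) {
        return clients.computeIfAbsent(Arrays.<Object>asList(host, port), key -> new FakeClient(0));
    }

    // true means the tuple would be acked, false means it would be failed.
    static boolean sendWithRetries(FakeClient client, String id, String result, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                client.result(id, result);
                return true;
            } catch (Exception e) {
                // The real bolt reconnects the client here before the next attempt.
            }
        }
        return false;
    }
}
```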

Summary

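  • LinearDRPCTopologyBuilder was marked @Deprecated in v0.9.1-incubating (in 2012), when Trident's newDRPCStream was considered its replacement. That, however, meant DRPC could only be used through Trident, so the deprecation was later removed (April 2018). In 2.0.0, 1.1.3, 1.0.7 and 1.2.2 the class is no longer marked deprecated. As the source above shows, the addBolt(IRichBolt ...) overloads still are.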
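  • LinearDRPCTopologyBuilder wraps and combines DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults. It exposes a simple API, so users don't have to construct these components themselves.

    • DRPCSpout builds the args and returnInfo.
    • PrepareRequest splits the data into ARGS_STREAM, RETURN_STREAM and ID_STREAM.
    • CoordinatedBolt tracks per-request tuple counts so that each bolt knows when it has received every tuple for a request. This ensures tuples between the bolts are fully delivered and acked.
    • JoinResult matches results with requests by requestId and sends the pair downstream.
    • ReturnResults returns the result to the client according to returnInfo.
  • When using LinearDRPCTopologyBuilder, the bolts have these field requirements:
    • The first bolt receives Fields("request", "args").
    • The last bolt must declare output fields new Fields("id", "result").
    • Every bolt before the last must have id (the requestId) as its first output field. CoordinatedBolt reads the request id from field 0, and uses it to track counts and confirm that the bolt has received all tuples its upstream bolt sent.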
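The contract on the last bolt is enforced by the check in createTopology: exactly one stream, with exactly two fields. Below is a plain-Java sketch of that check, which models declared streams as a map from stream id to field names. LastBoltCheckSketch and check are hypothetical names. The exception messages are copied from the source above.

```java
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the validation createTopology applies to the last user
// bolt. Declared streams are modeled as streamId -> field names; the class and
// method names are hypothetical, the messages are copied from the source above.
class LastBoltCheckSketch {

    // Returns the name of the first field, which JoinResult groups on.
    static String check(Map<String, List<String>> streams) {
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        List<String> fields = streams.values().iterator().next();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }
        return fields.get(0);
    }
}
```

Only the number and order of fields are checked, not their names. JoinResult groups on whatever the first field is called.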
