A Look at the Coordinator in Storm Trident

This article takes a look at the coordinator in Storm Trident.

Example

Sample code

    @Test
    public void testDebugTopologyBuild(){
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4),
                new Values("nickt2", 7),
                new Values("nickt3", 8),
                new Values("nickt4", 9),
                new Values("nickt5", 7),
                new Values("nickt6", 11),
                new Values("nickt7", 5)
        );
        spout.setCycle(false);
        TridentTopology topology = new TridentTopology();
        Stream stream1 = topology.newStream("spout1",spout)
                .each(new Fields("user", "score"), new BaseFunction() {
                    @Override
                    public void execute(TridentTuple tuple, TridentCollector collector) {
                        System.out.println("tuple:"+tuple);
                    }
                },new Fields());

        topology.build();
    }
  • The spout used here is FixedBatchSpout, which is an IBatchSpout.

Topology diagram (figure not available)

MasterBatchCoordinator

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

public class MasterBatchCoordinator extends BaseRichSpout { 
    public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
    
    public static final long INIT_TXID = 1L;
    
    
    public static final String BATCH_STREAM_ID = "$batch";
    public static final String COMMIT_STREAM_ID = "$commit";
    public static final String SUCCESS_STREAM_ID = "$success";

    private static final String CURRENT_TX = "currtx";
    private static final String CURRENT_ATTEMPTS = "currattempts";
    
    private List<TransactionalState> _states = new ArrayList();
    
    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
    TreeMap<Long, Integer> _attemptIds;
    
    private SpoutOutputCollector _collector;
    Long _currTransaction;
    int _maxTransactionActive;
    
    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
    
    
    List<String> _managedSpoutIds;
    List<ITridentSpout> _spouts;
    WindowedTimeThrottler _throttler;
    
    boolean _active = true;
    
    public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
        if(spoutIds.isEmpty()) {
            throw new IllegalArgumentException("Must manage at least one spout");
        }
        _managedSpoutIds = spoutIds;
        _spouts = spouts;
        LOG.debug("Created {}", this);
    }

    public List<String> getManagedSpoutIds(){
        return _managedSpoutIds;
    }

    @Override
    public void activate() {
        _active = true;
    }

    @Override
    public void deactivate() {
        _active = false;
    }
        
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        
        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    @Override
    public void close() {
        for(TransactionalState state: _states) {
            state.close();
        }
        LOG.debug("Closed {}", this);
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
        // when it sees the earlier txid it should know to emit nothing
        declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
        declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
        declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        ret.registerSerialization(TransactionAttempt.class);
        return ret;
    }

    //......
}
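  • The open method first reads the batch-triggering interval from Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS (topology.trident.batch.emit.interval.millis, 500 by default in defaults.yaml) and creates a WindowedTimeThrottler with a maxAmt of 1, so at most one new batch is started per interval.
  • TransactionalState is used to maintain the transactional state in ZooKeeper.
  • It then reads Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending, null by default in defaults.yaml) to set _maxTransactionActive; if the value is null, _maxTransactionActive is set to 1.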
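
To make the throttling concrete, here is a minimal Java sketch of a windowed throttler in the spirit of WindowedTimeThrottler, together with the max.spout.pending fallback described above. WindowThrottlerSketch, its injectable clock, and the helper maxTransactionActive are hypothetical names made up for illustration; this is not Storm's implementation, only a model that assumes an event is allowed while fewer than maxAmt events have been marked in the current window.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for Storm's WindowedTimeThrottler (not the real class):
// at most maxAmt events are allowed per window of windowMillis milliseconds.
class WindowThrottlerSketch {
    private final long windowMillis;
    private final int maxAmt;
    private final LongSupplier clock; // injectable clock (an assumption) so the behaviour is testable
    private long windowStart;
    private int windowEvents = 0;

    WindowThrottlerSketch(long windowMillis, int maxAmt, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.maxAmt = maxAmt;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    // true if the current window has already used up its maxAmt events
    boolean isThrottled() {
        resetIfWindowElapsed();
        return windowEvents >= maxAmt;
    }

    // record one event, e.g. one tuple emitted on $batch
    void markEvent() {
        resetIfWindowElapsed();
        windowEvents++;
    }

    // start a fresh window once more than windowMillis has passed
    private void resetIfWindowElapsed() {
        long now = clock.getAsLong();
        if (now - windowStart > windowMillis) {
            windowStart = now;
            windowEvents = 0;
        }
    }

    // mirrors the null check in open(): no topology.max.spout.pending means one active transaction
    static int maxTransactionActive(Number maxSpoutPending) {
        return maxSpoutPending == null ? 1 : maxSpoutPending.intValue();
    }
}
```

With windowMillis = 500 and maxAmt = 1 (the defaults discussed above), the master can start at most one new batch per 500 ms window, regardless of how large max.spout.pending is.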

MasterBatchCoordinator.nextTuple

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
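  • nextTuple simply calls sync, which is also called from ack and fail. sync first checks the status of the current transaction: if it has been processed and is ready to commit, it emits a tuple on MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). Then, subject to the _maxTransactionActive limit and the WindowedTimeThrottler (the throttler is checked inside isReady, which is not shown in the excerpt), it starts new TransactionAttempts by emitting tuples on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), and calls markEvent on the WindowedTimeThrottler to count each emission in the current window.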
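
The interplay between committing and starting batches can be modeled with plain collections. The sketch below is a hypothetical, simplified model of sync (SyncSketch and TxStatus are made-up names): tuples are recorded as strings instead of being emitted, and isReady (the throttler plus the spout coordinators) is assumed to always allow a new batch. It reproduces the scenario from the comment in sync: with max_spout_pending = 3, tx 2 being acked first neither commits nor frees a slot for tx 4.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

enum TxStatus { PROCESSING, PROCESSED, COMMITTING }

// Hypothetical, in-memory model of MasterBatchCoordinator.sync(); emitted tuples are
// recorded as "stream txid:attempt" strings and isReady() is assumed to always be true.
class SyncSketch {
    final TreeMap<Long, TxStatus> activeTx = new TreeMap<>();
    final TreeMap<Long, Integer> attemptIds = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();
    final int maxActive;   // plays the role of _maxTransactionActive
    long currTx = 1L;      // plays the role of _currTransaction (INIT_TXID = 1)

    SyncSketch(int maxActive) {
        this.maxActive = maxActive;
    }

    void sync() {
        // only the oldest uncommitted transaction (currTx) may move to COMMITTING
        if (activeTx.get(currTx) == TxStatus.PROCESSED) {
            activeTx.put(currTx, TxStatus.COMMITTING);
            emitted.add("$commit " + currTx + ":" + attemptIds.get(currTx));
        }
        // start new batches for free txids in [currTx, currTx + maxActive)
        if (activeTx.size() < maxActive) {
            long curr = currTx;
            for (int i = 0; i < maxActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    Integer attemptId = attemptIds.get(curr);
                    // monotonically increasing attempt id per txid
                    attemptId = (attemptId == null) ? 0 : attemptId + 1;
                    attemptIds.put(curr, attemptId);
                    activeTx.put(curr, TxStatus.PROCESSING);
                    emitted.add("$batch " + curr + ":" + attemptId);
                }
                curr++; // nextTransactionId
            }
        }
    }
}
```

Because only currTx may commit, transactions commit strictly in txid order even when later batches finish processing first.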

MasterBatchCoordinator.ack

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus status = _activeTx.get(tx.getTransactionId());
        LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
        if(status!=null && tx.equals(status.attempt)) {
            if(status.status==AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
                LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
            } else if(status.status==AttemptStatus.COMMITTING) {
                _activeTx.remove(tx.getTransactionId());
                _attemptIds.remove(tx.getTransactionId());
                _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                _currTransaction = nextTransactionId(tx.getTransactionId());
                for(TransactionalState state: _states) {
                    state.setData(CURRENT_TX, _currTransaction);                    
                }
                LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
            }
            sync();
        }
    }
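  • ack acts according to the current transaction status (acks for an attempt that no longer matches the tracked one are ignored). If the status was AttemptStatus.PROCESSING, it is updated to AttemptStatus.PROCESSED; if it was AttemptStatus.COMMITTING, the transaction is removed, a tuple is emitted on MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), and _currTransaction is advanced to nextTransactionId. Finally, sync is called again to trigger new TransactionAttempts.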
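
The ack transitions form a small state machine. The following hypothetical sketch (AckSketch and AckStatus are illustrative names, and the attempt-id check is left out) shows PROCESSING becoming PROCESSED on the first ack and, once sync has moved the transaction to COMMITTING, removal plus a $success emission on the second ack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

enum AckStatus { PROCESSING, PROCESSED, COMMITTING }

// Hypothetical model of the status handling in MasterBatchCoordinator.ack().
class AckSketch {
    final TreeMap<Long, AckStatus> activeTx = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();
    long currTx = 1L;

    // called when the tuple tree of the $batch or $commit tuple for txid is fully acked
    void ack(long txid) {
        AckStatus status = activeTx.get(txid);
        if (status == null) {
            return; // stale ack for a transaction that is no longer tracked
        }
        if (status == AckStatus.PROCESSING) {
            // batch fully processed; it now waits for its turn to commit
            activeTx.put(txid, AckStatus.PROCESSED);
        } else if (status == AckStatus.COMMITTING) {
            // commit finished: the transaction is done
            activeTx.remove(txid);
            emitted.add("$success " + txid);
            currTx = txid + 1; // the only place where currTx advances
        }
        // the real method now calls sync() to commit or start the next transactions
    }
}
```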

MasterBatchCoordinator.fail

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void fail(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
        LOG.debug("Fail. [tx_attempt = {}], [tx_status = {}], [{}]", tx, stored, this);
        if(stored!=null && tx.equals(stored.attempt)) {
            _activeTx.tailMap(tx.getTransactionId()).clear();
            sync();
        }
    }
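  • fail removes the transaction from _activeTx and, if the failed attempt is the one being tracked, also clears every entry in _activeTx whose txId is greater than the failed txId. It then calls sync to decide whether to start a new TransactionAttempt. Note that _currTransaction is not changed here, so the new TransactionAttempt started by sync still uses the failed txId (the current _currTransaction), only with a higher attempt id.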
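
The effect of tailMap(...).clear() and of leaving _currTransaction untouched can be checked with a small model. FailSketch below is a hypothetical sketch (its activeTx maps txid to the attempt id in flight, dropping the status that the real TransactionStatus carries); startAttempt mimics what sync does for a free txid.

```java
import java.util.TreeMap;

// Hypothetical model of MasterBatchCoordinator.fail(): activeTx maps txid -> attempt id in flight.
class FailSketch {
    final TreeMap<Long, Integer> activeTx = new TreeMap<>();
    final TreeMap<Long, Integer> attemptIds = new TreeMap<>(); // last attempt id handed out per txid

    // what sync() does for a free txid: next attempt id, then mark it active
    int startAttempt(long txid) {
        Integer attemptId = attemptIds.get(txid);
        attemptId = (attemptId == null) ? 0 : attemptId + 1;
        attemptIds.put(txid, attemptId);
        activeTx.put(txid, attemptId);
        return attemptId;
    }

    void fail(long txid, int attemptId) {
        // removed even if the attempt is stale, as in the original code
        Integer stored = activeTx.remove(txid);
        if (stored != null && stored == attemptId) {
            // tailMap(txid) is inclusive, so this drops every later transaction as well
            activeTx.tailMap(txid).clear();
            // attemptIds is kept and currTx is not advanced, so the next sync()
            // restarts these txids with higher attempt ids
        }
    }
}
```

Dropping the later transactions is what keeps the attempts strictly ordered: tx 3 cannot commit before the retried tx 2 anyway, so its in-flight attempt is discarded and restarted as well.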

TridentSpoutCoordinator

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

public class TridentSpoutCoordinator implements IBasicBolt {
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
    private static final String META_DIR = "meta";

    ITridentSpout<Object> _spout;
    ITridentSpout.BatchCoordinator<Object> _coord;
    RotatingTransactionalState _state;
    TransactionalState _underlyingState;
    String _id;

    
    public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
        _spout = spout;
        _id = id;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
                
    }

    @Override
    public void cleanup() {
        _coord.close();
        _underlyingState.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(MasterBatchCoordinator.BATCH_STREAM_ID, new Fields("tx", "metadata"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }   
}
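  • TridentSpoutCoordinator is a bolt; its execute method handles a tuple differently depending on its streamId.
  • If the tuple comes from MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), the master has received the ack and the transaction has succeeded. The coordinator then cleans up the state stored before this txId and calls back the success method of ITridentSpout.BatchCoordinator.
  • If the tuple comes from MasterBatchCoordinator.BATCH_STREAM_ID ($batch), a new TransactionAttempt is being started: the coordinator computes the batch metadata with initializeTransaction (passing the previous transaction's metadata and any metadata already stored for this txId), stores it, and emits a tuple on MasterBatchCoordinator.BATCH_STREAM_ID ($batch). That tuple is received by the downstream bolt (in this example, the TridentBoltExecutor that wraps the user spout via TridentSpoutExecutor).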
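
The metadata handling in the $batch branch can be illustrated with an in-memory stand-in. MetaStateSketch below is a hypothetical replacement for RotatingTransactionalState (which Storm keeps in ZooKeeper), assuming getPreviousState returns the metadata of the closest earlier txid; OffsetCoordinatorSketch is a made-up BatchCoordinator whose metadata is a batch start offset. The point it shows: because the stored state for the txid is passed back in, a replayed txid receives exactly the same metadata.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the RotatingTransactionalState used by TridentSpoutCoordinator.
class MetaStateSketch {
    private final TreeMap<Long, Object> states = new TreeMap<>();

    // metadata of the closest earlier txid, or null if there is none
    Object getPreviousState(long txid) {
        SortedMap<Long, Object> prev = states.headMap(txid);
        return prev.isEmpty() ? null : prev.get(prev.lastKey());
    }

    Object getState(long txid) {
        return states.get(txid);
    }

    void overrideState(long txid, Object meta) {
        states.put(txid, meta);
    }

    // called on $success: metadata older than txid is no longer needed
    void cleanupBefore(long txid) {
        states.headMap(txid).clear();
    }

    int size() {
        return states.size();
    }
}

// Hypothetical BatchCoordinator whose metadata is the start offset of each batch.
class OffsetCoordinatorSketch {
    static final long BATCH_SIZE = 3;

    static Object initializeTransaction(long txid, Object prevMeta, Object currMeta) {
        if (currMeta != null) {
            return currMeta;                 // replayed txid: reuse the recorded metadata
        }
        if (prevMeta == null) {
            return 0L;                       // first transaction starts at offset 0
        }
        return (Long) prevMeta + BATCH_SIZE; // continue right after the previous batch
    }

    // the $batch branch of TridentSpoutCoordinator.execute, minus the emit
    static Object onBatch(MetaStateSketch state, long txid) {
        Object meta = initializeTransaction(txid, state.getPreviousState(txid), state.getState(txid));
        state.overrideState(txid, meta);
        return meta;
    }
}
```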

TridentBoltExecutor

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

public class TridentBoltExecutor implements IRichBolt {
    public static final String COORD_STREAM_PREFIX = "$coord-";
    
    public static String COORD_STREAM(String batch) {
        return COORD_STREAM_PREFIX + batch;
    }

    RotatingMap<Object, TrackedBatch> _batches;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {        
        _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
        _lastRotate = System.currentTimeMillis();
        _batches = new RotatingMap<>(2);
        _context = context;
        _collector = collector;
        _coordCollector = new CoordinatedOutputCollector(collector);
        _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));
                
        _coordConditions = (Map) context.getExecutorData("__coordConditions");
        if(_coordConditions==null) {
            _coordConditions = new HashMap<>();
            for(String batchGroup: _coordSpecs.keySet()) {
                CoordSpec spec = _coordSpecs.get(batchGroup);
                CoordCondition cond = new CoordCondition();
                cond.commitStream = spec.commitStream;
                cond.expectedTaskReports = 0;
                for(String comp: spec.coords.keySet()) {
                    CoordType ct = spec.coords.get(comp);
                    if(ct.equals(CoordType.single())) {
                        cond.expectedTaskReports+=1;
                    } else {
                        cond.expectedTaskReports+=context.getComponentTasks(comp).size();
                    }
                }
                cond.targetTasks = new HashSet<>();
                for(String component: Utils.get(context.getThisTargets(),
                                        COORD_STREAM(batchGroup),
                                        new HashMap<String, Grouping>()).keySet()) {
                    cond.targetTasks.addAll(context.getComponentTasks(component));
                }
                _coordConditions.put(batchGroup, cond);
            }
            context.setExecutorData("_coordConditions", _coordConditions);
        }
        _bolt.prepare(conf, context, _coordOutputCollector);
    }

    //......

    @Override
    public void cleanup() {
        _bolt.cleanup();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);
        for(String batchGroup: _coordSpecs.keySet()) {
            declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count"));
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> ret = _bolt.getComponentConfiguration();
        if(ret==null) ret = new HashMap<>();
        ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
        return ret;
    }
}
  • In prepare, a CoordinatedOutputCollector is created first, then wrapped in an OutputCollector, and finally in a BatchOutputCollectorImpl, which is passed to ITridentBatchBolt.prepare. The ITridentBatchBolt implementation used here is TridentSpoutExecutor.
  • prepare initializes RotatingMap<Object, TrackedBatch> _batches = new RotatingMap<>(2);
  • The main work in prepare is building a CoordCondition for each batch group, chiefly computing expectedTaskReports and targetTasks.
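
The expectedTaskReports computation in prepare is a simple sum. The sketch below is a hypothetical restatement of that loop; the Upstream holder is a made-up shape standing in for the CoordType of each component in CoordSpec.coords plus that component's task count.

```java
import java.util.Map;

// Hypothetical model of how TridentBoltExecutor.prepare() sums expectedTaskReports for one batch group.
class CoordConditionSketch {
    // one upstream component listed in CoordSpec.coords (shape assumed for illustration)
    static final class Upstream {
        final boolean single;  // CoordType.single(): counts as one report
        final int taskCount;   // otherwise: context.getComponentTasks(comp).size() reports

        Upstream(boolean single, int taskCount) {
            this.single = single;
            this.taskCount = taskCount;
        }
    }

    static int expectedTaskReports(Map<String, Upstream> coords) {
        int expected = 0;
        for (Upstream u : coords.values()) {
            expected += u.single ? 1 : u.taskCount;
        }
        return expected;
    }
}
```

A result of 0 is the case handled in execute where finishBatch is called right after each regular tuple, since no coord reports will ever arrive.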

TridentBoltExecutor.execute

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

    @Override
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
        CoordCondition cond = batch.condition;
        if(cond.commitStream!=null
                && tuple.getSourceGlobalStreamId().equals(cond.commitStream)) {
            return TupleType.COMMIT;
        } else if(cond.expectedTaskReports > 0
                && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) {
            return TupleType.COORD;
        } else {
            return TupleType.REGULAR;
        }
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
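  • TridentBoltExecutor.execute first checks whether the tuple is a tick tuple. If so, it checks whether the time since _lastRotate (initialized to the current time in prepare) exceeds _messageTimeoutMs, and if it does, it calls _batches.rotate(). Tick tuples are emitted every Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS (topology.tick.tuple.freq.secs), which TridentBoltExecutor sets to 5 seconds. _messageTimeoutMs is context.maxTopologyMessageTimeout() * 1000L, i.e. the largest Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, 30 by default in defaults.yaml) among the topology's components, multiplied by 1000.
  • _batches stores TrackedBatch information keyed by the txId of the TransactionAttempt. Only one attempt per txId is tracked: a tuple with a higher attempt id replaces the existing TrackedBatch, and a tuple from an older attempt is ignored. If there is no entry, a new TrackedBatch is created, which calls back the _bolt's initBatchState method.
  • Next, the tuple type is determined: TupleType.COMMIT, TupleType.COORD, or TupleType.REGULAR. For TupleType.COMMIT, tracked.receivedCommit is set to true and checkFinish is called. For TupleType.COORD, the reportedTasks and expectedTupleCount counters are updated and checkFinish is called. For TupleType.REGULAR (the batch information sent by the coordinator), the receivedTuples counter is incremented and _bolt.execute is called (here _bolt is TridentSpoutExecutor); if tracked.condition.expectedTaskReports==0, finishBatch is called immediately, which removes the batch from _batches. If a FailedException is thrown, failBatch is called directly to report the error. The tuple is then acked or failed.
  • When the downstream operation is an each, failures reach the master with some lag: if only some tuples in a batch throw FailedException, the master is notified only after all tuples in the batch have been executed and a TupleType.COORD tuple triggers checkFinish. For example, if a batch has 3 tuples and the second one throws FailedException, the third tuple is still executed, and only after all of the batch's tuples have been processed does a TupleType.COORD tuple arrive and trigger checkFinish.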
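
The tick-driven rotation of _batches can be sketched with a two-bucket map. RotatingMapSketch and TickRotationSketch below are hypothetical, simplified models of the RotatingMap idea and of the tick check in execute (time is passed in explicitly instead of calling System.currentTimeMillis()); they are not Storm's classes.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical simplified rotating map: new entries go into the newest bucket,
// and rotate() drops the oldest bucket.
class RotatingMapSketch<K, V> {
    private final LinkedList<HashMap<K, V>> buckets = new LinkedList<>();

    RotatingMapSketch(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
    }

    void put(K key, V value) {
        for (HashMap<K, V> bucket : buckets) {
            bucket.remove(key); // keep at most one copy of each key
        }
        buckets.getFirst().put(key, value);
    }

    V get(K key) {
        for (HashMap<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    // discard the oldest bucket and open a new, empty newest bucket
    Map<K, V> rotate() {
        HashMap<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }
}

// The tick-tuple check from TridentBoltExecutor.execute, with time passed in explicitly.
class TickRotationSketch {
    final RotatingMapSketch<Long, String> batches = new RotatingMapSketch<>(2);
    final long messageTimeoutMs;
    long lastRotate;

    TickRotationSketch(long messageTimeoutMs, long startMillis) {
        this.messageTimeoutMs = messageTimeoutMs;
        this.lastRotate = startMillis;
    }

    // returns true if this tick caused a rotation
    boolean onTick(long now) {
        if (now - lastRotate > messageTimeoutMs) {
            batches.rotate();
            lastRotate = now;
            return true;
        }
        return false;
    }
}
```

With two buckets, a TrackedBatch survives the first rotation and is dropped on the second, so it lives for roughly one to two timeout periods (plus up to one tick interval) after it was last put.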

TridentBoltExecutor.checkFinish

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }

    private void failBatch(TrackedBatch tracked) {
        failBatch(tracked, null);
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
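  • TridentBoltExecutor.execute calls checkFinish whenever the tuple is TupleType.COMMIT or TupleType.COORD.
  • As soon as _bolt.execute(tracked.info, tuple) throws a FailedException, failBatch is called, which sets tracked.failed to true.
  • When checkFinish sees that tracked.failed is true, it calls _collector.fail(tuple), which eventually leads to MasterBatchCoordinator's fail method being called. checkFinish also fails the batch when the commit and all expected task reports have arrived but receivedTuples does not match expectedTupleCount.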
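
Stripped of the delayed-ack bookkeeping, checkFinish boils down to a three-way decision. The sketch below is a hypothetical pure-function restatement (CheckFinishSketch and Outcome are made-up names). One assumption worth stating: as far as I can tell from the Storm 1.2.2 source, TrackedBatch initializes receivedCommit to condition.commitStream == null, so for batch groups without a commit stream only the coord reports matter. In the 3-tuple example above, the second tuple sets failed, and the decision becomes FAIL as soon as the COORD tuple arrives.

```java
// Hypothetical pure-function view of the decision in TridentBoltExecutor.checkFinish;
// the delayedAck bookkeeping and the actual ack/fail calls are left out.
class CheckFinishSketch {
    enum Outcome { WAIT, FINISH, FAIL }

    static Outcome decide(boolean failed, boolean receivedCommit,
                          int reportedTasks, int expectedTaskReports,
                          int receivedTuples, int expectedTupleCount) {
        if (failed) {
            return Outcome.FAIL; // a FailedException was seen earlier in this batch
        }
        if (receivedCommit && reportedTasks == expectedTaskReports) {
            // every upstream task has reported how many tuples it sent to this task
            return receivedTuples == expectedTupleCount ? Outcome.FINISH : Outcome.FAIL;
        }
        return Outcome.WAIT; // still waiting for the commit or for coord reports
    }
}
```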

TridentSpoutExecutor

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java

public class TridentSpoutExecutor implements ITridentBatchBolt {
    public static final String ID_FIELD = "$tx";
    
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);

    AddIdCollector _collector;
    ITridentSpout<Object> _spout;
    ITridentSpout.Emitter<Object> _emitter;
    String _streamName;
    String _txStateId;
    
    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();

    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
        _txStateId = txStateId;
        _spout = spout;
        _streamName = streamName;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    @Override
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void cleanup() {
        _emitter.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
        fields.add(0, ID_FIELD);
        declarer.declareStream(_streamName, new Fields(fields));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
}
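  • The BatchOutputCollector used by TridentSpoutExecutor is the one built in TridentBoltExecutor.prepare through several layers of wrapping: first CoordinatedOutputCollector, then OutputCollector, and finally BatchOutputCollectorImpl. The most important layer is CoordinatedOutputCollector, which counts the tuples emitted to each taskId. In this executor's own prepare method, the collector is wrapped once more in an AddIdCollector, which mainly adds the batchId (i.e. the TransactionAttempt) to each tuple.
  • The ITridentSpout held by TridentSpoutExecutor is a BatchSpoutExecutor that wraps the user's original spout (here the original spout is an IBatchSpout, so BatchSpoutExecutor adapts it into an ITridentSpout). Its execute method handles each stream differently. For a MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) tuple from the master, it calls the emitter's commit method to commit the current TransactionAttempt (the example in this article involves no commit) and removes that tx from _activeBatches. For a MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) tuple from the master, it first removes from _activeBatches every TransactionAttempt whose txId is smaller than this txId, then calls the emitter's success method to mark the TransactionAttempt as successful, which in turn calls the ack method of the original spout (the IBatchSpout).
  • Any tuple that is on neither MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) nor MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) is a message to start a batch: the executor sets the batchId, calls the emitter's emitBatch to emit the data (the batchId passed down is the txId of the TransactionAttempt), and puts the TransactionAttempt into _activeBatches (a batch here is effectively a TransactionAttempt).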
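
The _activeBatches bookkeeping can be reproduced with a TreeMap. ActiveBatchesSketch below is a hypothetical model in which a TransactionAttempt is reduced to (txid, attemptId), the emitter calls are replaced by log entries, and IllegalStateException stands in for Storm's FailedException so the snippet compiles without Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of TridentSpoutExecutor's _activeBatches handling.
class ActiveBatchesSketch {
    // txid -> attempt id of the batch this task emitted most recently
    final TreeMap<Long, Integer> activeBatches = new TreeMap<>();
    final List<String> log = new ArrayList<>();

    // $batch: emit the data, remember the attempt (a replay overwrites the older attempt)
    void onBatch(long txid, int attemptId) {
        log.add("emitBatch " + txid + ":" + attemptId);
        activeBatches.put(txid, attemptId);
    }

    // $commit: only the attempt this task actually emitted may be committed
    void onCommit(long txid, int attemptId) {
        Integer active = activeBatches.get(txid);
        if (active == null || active != attemptId) {
            // the real executor throws FailedException here, failing the batch
            throw new IllegalStateException("Received commit for different transaction attempt");
        }
        log.add("commit " + txid + ":" + attemptId);
        activeBatches.remove(txid);
    }

    // $success: anything older than txid can never be replayed again
    void onSuccess(long txid, int attemptId) {
        activeBatches.headMap(txid).clear(); // headMap is exclusive, so txid itself stays
        log.add("success " + txid + ":" + attemptId); // real code: emitter.success -> IBatchSpout.ack
    }
}
```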

FixedBatchSpout

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/testing/FixedBatchSpout.java

public class FixedBatchSpout implements IBatchSpout {

    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;
    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    
    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields;
        this.outputs = outputs;
        this.maxBatchSize = maxBatchSize;
    }
    
    int index = 0;
    boolean cycle = false;
    
    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }
    
    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batch = this.batches.get(batchId);
        if(batch == null){
            batch = new ArrayList<List<Object>>();
            if(index>=outputs.length && cycle) {
                index = 0;
            }
            for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
                batch.add(outputs[index]);
            }
            this.batches.put(batchId, batch);
        }
        for(List<Object> list : batch){
            collector.emit(list);
        }
    }

    @Override
    public void ack(long batchId) {
        this.batches.remove(batchId);
    }

    @Override
    public void close() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return fields;
    }
    
}
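  • The user's spout is an IBatchSpout. It caches the tuples for each batchId, so a replayed batchId emits exactly the same tuples; this is what gives it transactional spout semantics.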
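
The replay guarantee of FixedBatchSpout can be seen in a Storm-free copy of its core logic. ReplayableBatchSource below is a hypothetical, trimmed version (no cycle option, String tuples, and the batch returned as a list instead of being emitted to a TridentCollector), fed with the user names from the example at the top.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed copy of FixedBatchSpout's batching logic (no cycle, String tuples).
class ReplayableBatchSource {
    private final List<String> outputs;
    private final int maxBatchSize;
    private final Map<Long, List<String>> batches = new HashMap<>(); // batchId -> cached tuples
    private int index = 0;

    ReplayableBatchSource(List<String> outputs, int maxBatchSize) {
        this.outputs = outputs;
        this.maxBatchSize = maxBatchSize;
    }

    // first call for a batchId takes the next tuples; later calls replay the cached ones
    List<String> emitBatch(long batchId) {
        List<String> batch = batches.get(batchId);
        if (batch == null) {
            batch = new ArrayList<>();
            for (int i = 0; index < outputs.size() && i < maxBatchSize; index++, i++) {
                batch.add(outputs.get(index));
            }
            batches.put(batchId, batch);
        }
        return batch;
    }

    // the batch succeeded, so it will never be replayed: drop it from the cache
    void ack(long batchId) {
        batches.remove(batchId);
    }

    int cached() {
        return batches.size();
    }
}
```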

TridentTopology.newStream

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IRichSpout spout) {
        return newStream(txId, new RichSpoutBatchExecutor(spout));
    }
    
    public Stream newStream(String txId, IBatchSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, ITridentSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
    }
    
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }

    public Stream newStream(String txId, ITridentDataSource dataSource) {
        if (dataSource instanceof IBatchSpout) {
            return newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            return newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            return newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
    }
  • TridentTopology.newStream accepts an IBatchSpout-style spout directly. The benefit is that TridentTopology wraps it into an ITridentSpout with BatchSpoutExecutor at build time, so users don't have to implement the ITridentSpout interfaces themselves. This hides the Trident spout machinery and lets users who have been writing ordinary topologies get started with Trident topologies quickly.
  • BatchSpoutExecutor implements the ITridentSpout interface and adapts an IBatchSpout to ITridentSpout; its coordinator is EmptyCoordinator and its emitter is BatchSpoutEmitter.
  • If the spout passed to TridentTopology.newStream is an IPartitionedTridentSpout, newStream wraps it into an ITridentSpout with PartitionedTridentSpoutExecutor; an IOpaquePartitionedTridentSpout is wrapped with OpaquePartitionedTridentSpoutExecutor.
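
The adaptation performed by BatchSpoutEmitter amounts to forwarding two calls. The sketch below uses deliberately cut-down, hypothetical interfaces (MiniBatchSpout, MiniTridentEmitter, and MiniBatchSpoutEmitter are not Storm's real signatures; a List stands in for TridentCollector) to show the idea described above: the attempt id and the coordinator metadata are dropped, emitBatch forwards only the txid as the batchId, and success becomes ack.

```java
import java.util.List;

// Hypothetical, cut-down interfaces; NOT Storm's real IBatchSpout / ITridentSpout signatures.
interface MiniBatchSpout {
    void emitBatch(long batchId, List<Object> collector);
    void ack(long batchId);
}

interface MiniTridentEmitter {
    void emitBatch(long txid, int attemptId, Object coordinatorMeta, List<Object> collector);
    void success(long txid, int attemptId);
}

// Adapter in the spirit of BatchSpoutExecutor's emitter: only the txid reaches the batch spout.
class MiniBatchSpoutEmitter implements MiniTridentEmitter {
    private final MiniBatchSpout spout;

    MiniBatchSpoutEmitter(MiniBatchSpout spout) {
        this.spout = spout;
    }

    @Override
    public void emitBatch(long txid, int attemptId, Object coordinatorMeta, List<Object> collector) {
        spout.emitBatch(txid, collector); // a retried attempt reuses the same batchId
    }

    @Override
    public void success(long txid, int attemptId) {
        spout.ack(txid); // batch-level ack; IBatchSpout has no fail counterpart
    }
}
```

Since every attempt of a transaction reaches the spout with the same batchId, the per-batchId cache in FixedBatchSpout is all it needs to replay a failed batch faithfully.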

Summary

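  • In newStream or build, TridentTopology adapts the ITridentDataSource types that are not ITridentSpout into ITridentSpout: IBatchSpout (in build), IPartitionedTridentSpout (in newStream), and IOpaquePartitionedTridentSpout (in newStream), using BatchSpoutExecutor, PartitionedTridentSpoutExecutor, and OpaquePartitionedTridentSpoutExecutor respectively. When TridentTopologyBuilder runs buildTopology, each ITridentSpout is first wrapped in a TridentSpoutExecutor, then in a TridentBoltExecutor, and finally turned into a bolt; the only real spout in the whole TridentTopology is MasterBatchCoordinator. So an IBatchSpout is first wrapped into an ITridentSpout by BatchSpoutExecutor, and then into a bolt by TridentSpoutExecutor and TridentBoltExecutor.
  • IBatchSpout's ack works at the batch level, i.e. per TransactionAttempt. Note that IBatchSpout has no fail method. If emitBatch throws a FailedException, TridentBoltExecutor calls failBatch (for a batch's tuples, checkFinish is only triggered after all of them have been executed), which reports the error and marks the TrackedBatch as failed. Later, when TridentBoltExecutor runs checkFinish and finds that tracked.failed is true, it calls _collector.fail(tuple), which in turn leads to MasterBatchCoordinator's fail method being called.
  • MasterBatchCoordinator's fail method removes the current TransactionAttempt from _activeTx, along with every entry whose txId is greater than the failed txId, and then calls sync to continue issuing TransactionAttempts. Note that _currTransaction is not changed here, so retries start again from the failed txId; only the ack method advances _currTransaction to nextTransactionId.
  • TridentBoltExecutor.execute uses tick tuples to check whether more than _messageTimeoutMs has elapsed since the last rotate (_messageTimeoutMs is the largest Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS among the components multiplied by 1000, converting seconds to milliseconds). If so, it rotates, and the last bucket of _batches is dropped. With a tick every 5 seconds and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS at 30 seconds, _messageTimeoutMs is 30*1000: every 5 seconds the executor checks whether more than 30 seconds have passed since the last rotate, and if so it rotates and discards the data (TrackedBatch) in the last bucket, which effectively resets the TrackedBatch information that has timed out.
  • MasterBatchCoordinator's fail is triggered in several situations. One is a downstream component explicitly throwing FailedException, which triggers the master's fail and a retry of the TransactionAttempt. Another is a downstream component taking longer than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, 30 by default in defaults.yaml) to process a tuple: the tuple tree then times out, which triggers the master's fail, so the TransactionAttempt fails and is retried. There is currently no limit on the number of attempts, which deserves attention in production: if any single tuple of a batchId fails, all the tuples of that batchId are re-emitted. If the downstream does not handle this properly, the first tuples of a batchId may succeed while later ones fail, so the successful tuples are processed again and again. Avoiding this partial-success, partial-failure problem within a failed batch requires using Trident State.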
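
The reason Trident State avoids double processing can be shown with a toy store. TxAwareCounterStore below is a hypothetical in-memory sketch of the idea behind transactional state: keep the txid that last updated each value next to the value, and skip an update whose txid matches. It assumes, as Trident arranges, that a batch's delta for a key is applied in a single call, that batches are applied in txid order, and that a replayed txid carries the same tuples (the FixedBatchSpout cache above provides the last property). It is not Storm's State API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store illustrating the idea behind Trident's transactional State:
// the txid that last updated each value is stored next to the value itself.
class TxAwareCounterStore {
    private static final class Entry {
        final long txid;
        final long value;

        Entry(long txid, long value) {
            this.txid = txid;
            this.value = value;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Add delta to key on behalf of batch txid; a replay of the same txid is a no-op.
    void apply(String key, long txid, long delta) {
        Entry current = store.get(key);
        if (current != null && current.txid == txid) {
            return; // this batch already updated the key before the failure
        }
        long base = (current == null) ? 0L : current.value;
        store.put(key, new Entry(txid, base + delta));
    }

    long get(String key) {
        Entry current = store.get(key);
        return current == null ? 0L : current.value;
    }
}
```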
