A look at pendingTriggers in Storm's TridentWindowManager

This post looks at how pendingTriggers works in Storm's TridentWindowManager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • This calls finishBatch on _bolt. _bolt has two implementations: TridentSpoutExecutor, used for spouts, and SubtopologyBolt, used for ordinary bolts.
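
Besides delegating to _bolt, the method above reports to every downstream task how many tuples this task emitted to it for the batch, defaulting to 0 when nothing was emitted. The following is a minimal sketch of that bookkeeping only. CoordSketch, coordMessages and the message format are hypothetical names made up for illustration; they are not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the coordination step of finishBatch; not Storm's API.
final class CoordSketch {
    // Builds one message per downstream task, reporting how many tuples were
    // emitted to it for the batch. Tasks that received nothing get a count of 0,
    // which mirrors the default passed to Utils.get(..., task, 0) in the excerpt.
    static List<String> coordMessages(long batchId, List<Integer> targetTasks,
                                      Map<Integer, Integer> taskEmittedTuples) {
        List<String> messages = new ArrayList<>();
        for (Integer task : targetTasks) {
            int count = taskEmittedTuples.getOrDefault(task, 0);
            messages.add("task=" + task + " batch=" + batchId + " count=" + count);
        }
        return messages;
    }
}
```

The explicit zero count matters because a downstream task can only decide that a batch is complete once it has heard from every upstream task, including those that sent it nothing.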

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
  • SubtopologyBolt.finishBatch calls finishBatch on each TridentProcessor of the batch group in turn, in topological order.

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

   public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
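  • Once the bolt hosting WindowTridentProcessor has acked every tuple of a batch, it runs finishBatch.
  • WindowTridentProcessor.execute receives a tuple and accumulates it in processorContext.state.
  • On finishBatch, it takes this batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples).
  • It then calls tridentWindowManager.getPendingTriggers() and drains the queue, collecting the pendingTriggerIds (saved to the store under inprocessTriggerKey) along with the triggerValues to emit. On a retried attempt it first tries to read the previously stored pendingTriggerIds and their trigger values back from the store, and only drains the queue if none are found.
  • Finally, each of the triggerValues is emitted together with a TriggerInfo built from windowTaskId and the matching trigger id.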
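
The retry handling in the steps above can be sketched as follows. This is a simplified model under stated assumptions, not Storm's code: a HashMap stands in for the WindowsStore, each trigger carries a single String result, and the class name, method names and key formats ("trigger/", "inprocess/") are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified model of the finishBatch retry logic; all names are hypothetical.
final class PendingTriggerSketch {
    static final class TriggerResult {
        final int id;
        final String result;
        TriggerResult(int id, String result) { this.id = id; this.result = result; }
    }

    final Map<String, Object> store = new HashMap<>();              // stand-in for WindowsStore
    final Queue<TriggerResult> pendingTriggers = new ArrayDeque<>(); // stand-in for the pending queue

    // Trigger side: persist the result first, then queue it for the next batch.
    void onTrigger(int id, String result) {
        store.put("trigger/" + id, result);
        pendingTriggers.add(new TriggerResult(id, result));
    }

    // Batch side: returns the "id=result" pairs emitted for this batch attempt.
    @SuppressWarnings("unchecked")
    List<String> finishBatch(long txnId, boolean retriedAttempt) {
        String inprocessKey = "inprocess/" + txnId;
        List<String> emitted = new ArrayList<>();
        List<Integer> ids = retriedAttempt ? (List<Integer>) store.get(inprocessKey) : null;
        if (ids != null) {
            // Retry: replay exactly the triggers recorded by the earlier attempt.
            for (Integer id : ids) {
                emitted.add(id + "=" + store.get("trigger/" + id));
            }
            return emitted;
        }
        // New batch (or nothing recorded earlier): drain whatever is pending.
        ids = new ArrayList<>();
        TriggerResult t;
        while ((t = pendingTriggers.poll()) != null) {
            ids.add(t.id);
            emitted.add(t.id + "=" + t.result);
        }
        if (!ids.isEmpty()) {
            store.put(inprocessKey, ids); // remember which triggers belong to this batch
        }
        return emitted;
    }
}
```

Recording the trigger ids per transaction is what lets a replayed batch emit the same window results as its first attempt, even if new triggers have fired in the meantime; those newer triggers stay in the queue for a later batch.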

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
        List<WindowsStore.Entry> entries = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
        }

        // tuples should be available in store before they are added to window manager
        windowStore.putAll(entries);

        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            addToWindowManager(i, key, tridentTuple);
        }

    }

    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
        TridentTuple actualTuple = null;
        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
            actualTuple = tridentTuple;
        }
        currentCachedTuplesSize.incrementAndGet();
        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
    }
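  • StoreBasedTridentWindowManager.addTuplesBatch first puts the batch of tuples into windowStore, then adds them one by one to windowManager through addToWindowManager. Once maxCachedTuplesSize is reached, the window entry keeps only the batch key and tuple index, not the tuple itself.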
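
A minimal sketch of the store-first write and the bounded in-memory cache described above. BoundedCacheSketch and its fields are hypothetical: plain collections stand in for the WindowsStore and the window manager, tuples are Strings, and a null payload marks an entry whose tuple would have to be read back from the store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of addTuplesBatch / addToWindowManager; all names are hypothetical.
final class BoundedCacheSketch {
    final Map<String, String> store = new LinkedHashMap<>(); // stand-in for WindowsStore
    final List<String> windowEntries = new ArrayList<>();     // stand-in for the window manager
    final AtomicInteger cachedCount = new AtomicInteger();
    final Integer maxCached;                                   // null means "no limit"

    BoundedCacheSketch(Integer maxCached) { this.maxCached = maxCached; }

    void addBatch(String batchKey, List<String> tuples) {
        // 1. Every tuple goes to the store first, keyed by batch key + index.
        for (int i = 0; i < tuples.size(); i++) {
            store.put(batchKey + i, tuples.get(i));
        }
        // 2. Only then are entries handed to the window; the payload is kept
        //    in memory only while the cache is below its limit.
        for (int i = 0; i < tuples.size(); i++) {
            String payload = null;
            if (maxCached == null || cachedCount.get() < maxCached) {
                payload = tuples.get(i);
            }
            cachedCount.incrementAndGet();
            windowEntries.add(payload);
        }
    }
}
```

Writing to the store before touching the window means that any entry a trigger later sees can always be resolved, whether or not its tuple was cached.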

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

private final ConcurrentLinkedQueue<Event<T>> queue;

    /**
     * Add an event into the window, with {@link System#currentTimeMillis()} as
     * the tracking ts.
     *
     * @param event the event to add
     */
    public void add(T event) {
        add(event, System.currentTimeMillis());
    }

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }
  • Non-watermark events are appended to the ConcurrentLinkedQueue; watermark events are tracked but not queued.

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • onTrigger first calls scanEvents to get windowEvents, splits them into events and newEvents (those that were not in the previous window), and then calls back windowLifecycleListener.onActivation(events, newEvents, expired).
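
The split into events and newEvents boils down to a set difference against the previous window. A minimal sketch, assuming events can be modelled as Strings; NewEventsSketch and activate are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of how onTrigger derives newEvents; names are hypothetical.
final class NewEventsSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events that were not part of the previous window,
    // then remembers the current window for the next activation.
    List<String> activate(List<String> windowEvents) {
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return newEvents;
    }
}
```

With a sliding window of [a, b] followed by [b, c], the second activation reports only c as new, since b was already part of the previous window.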

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
     * Scan events in the queue, using the expiration policy to check
     * if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop
     *                 as soon as an event not satisfying the expiration policy is found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }
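  • scanEvents iterates over the events in the ConcurrentLinkedQueue and asks the eviction policy whether each one has expired, sorting them into expiredEvents and eventsToProcess; it returns eventsToProcess. Expired events are removed from the queue and passed to windowLifecycleListener.onExpiry.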
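
The scan loop can be illustrated with a toy time-based policy. This is a sketch under assumptions: events are reduced to their timestamps, and the evict rule below (expire when older than the window length, stop at timestamps in the future) is a made-up policy for illustration, not one of Storm's eviction policy classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Simplified model of scanEvents with a hypothetical time-based eviction rule.
final class ScanSketch {
    enum Action { EXPIRE, PROCESS, STOP }

    final LinkedList<Long> queue = new LinkedList<>(); // event timestamps, oldest first
    final List<Long> expired = new ArrayList<>();

    // Hypothetical policy: too old -> EXPIRE, in the future -> STOP, else PROCESS.
    static Action evict(long ts, long now, long windowLengthMs) {
        if (now - ts >= windowLengthMs) return Action.EXPIRE;
        if (ts > now) return Action.STOP;
        return Action.PROCESS;
    }

    List<Long> scan(long now, long windowLengthMs, boolean fullScan) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evict(ts, now, windowLengthMs);
            if (action == Action.EXPIRE) {
                expired.add(ts);
                it.remove();          // expired events leave the queue
            } else if (!fullScan || action == Action.STOP) {
                break;                // partial scans stop at the first live event
            } else {
                toProcess.add(ts);    // action == PROCESS
            }
        }
        return toProcess;
    }
}
```

A partial scan (fullScan set to false) only trims expired events from the head of the queue, while the full scan used by onTrigger also collects the events that belong to the current window.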

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

/**
     * Listener to reeive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation calls execAggregatorAndStoreResult, which runs the window's aggregator, saves the result to windowStore, and adds resultantAggregatedValue to pendingTriggers as a TriggerResult.
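
The producer side of pendingTriggers can be sketched as below. It is a simplified model, not Storm's code: the aggregator is replaced by a plain integer sum, a HashMap stands in for the WindowsStore, and the class name and key strings are hypothetical. Using a ConcurrentLinkedQueue for the pending queue is likewise an assumption of this sketch, chosen because the trigger callback and finishBatch may run on different threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of onActivation -> execAggregatorAndStoreResult; names are hypothetical.
final class ActivationSketch {
    static final class TriggerResult {
        final int id;
        final List<Integer> result;
        TriggerResult(int id, List<Integer> result) { this.id = id; this.result = result; }
    }

    final AtomicInteger triggerId = new AtomicInteger();
    final Map<String, Object> store = new HashMap<>(); // stand-in for WindowsStore
    final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();

    void onActivation(List<Integer> events) {
        int currentTriggerId = triggerId.incrementAndGet();

        int sum = 0;                      // init: fresh aggregation state
        for (int event : events) {
            sum += event;                 // aggregate: fold each window event in
        }
        List<Integer> result = new ArrayList<>();
        result.add(sum);                  // complete: produce the aggregated value

        // Persist the next trigger count and the result before queueing it,
        // so a retried batch can read the value back from the store.
        store.put("triggerCount", currentTriggerId + 1);
        store.put("trigger/" + currentTriggerId, result);
        pendingTriggers.add(new TriggerResult(currentTriggerId, result));
    }
}
```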

Summary

  • When the TridentBoltExecutor hosting WindowTridentProcessor receives a tuple from the spout, it calls the processor's execute method, which caches the tuple in the ProcessorContext; once the chain of processors has run execute, the tuple is acked.
  • When that TridentBoltExecutor has acked every tuple of a batch, checkFinish is triggered and finishBatch runs. finishBatch in turn calls finishBatch on a chain of TridentProcessors (for example WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor).
  • WindowTridentProcessor.finishBatch takes the batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples), which puts the tuples into windowStore and then adds them to windowManager's ConcurrentLinkedQueue. It then calls tridentWindowManager.getPendingTriggers(), saves the pendingTriggerIds to the store, collects the triggerValues to emit, and emits each of them together with its TriggerInfo.
  • WindowManager.onTrigger is invoked by the trigger policy when the window fires. It reads the windowEvents from windowManager's ConcurrentLinkedQueue and passes them to TridentWindowLifeCycleListener.onActivation.
  • TridentWindowLifeCycleListener.onActivation runs the window aggregator's init, aggregate and complete steps to obtain resultantAggregatedValue and puts it into pendingTriggers. This is the link between the window trigger and WindowTridentProcessor.
