FluxInterval: Examples and Analysis

This article looks at how FluxInterval works.

FluxInterval

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java

/**
 * Periodically emits an ever increasing long value either via a ScheduledExecutorService
 * or a custom async callback function
 * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
 */
final class FluxInterval extends Flux<Long> {

    final Scheduler timedScheduler;
    
    final long initialDelay;
    
    final long period;
    
    final TimeUnit unit;

    FluxInterval(
            long initialDelay, 
            long period, 
            TimeUnit unit, 
            Scheduler timedScheduler) {
        if (period < 0L) {
            throw new IllegalArgumentException("period >= 0 required but it was " + period);
        }
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = Objects.requireNonNull(unit, "unit");
        this.timedScheduler = Objects.requireNonNull(timedScheduler, "timedScheduler");
    }
    
    @Override
    public void subscribe(CoreSubscriber<? super Long> actual) {
        Worker w = timedScheduler.createWorker();

        IntervalRunnable r = new IntervalRunnable(actual, w);

        actual.onSubscribe(r);

        try {
            w.schedulePeriodically(r, initialDelay, period, unit);
        }
        catch (RejectedExecutionException ree) {
            if (!r.cancelled) {
                actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                        actual.currentContext()));
            }
        }
    }
}
As the code shows, subscribe asks the Scheduler for a Worker and schedules an IntervalRunnable on it as a periodic task.

IntervalRunnable

static final class IntervalRunnable implements Runnable, Subscription,
                                                   InnerProducer<Long> {
        final CoreSubscriber<? super Long> actual;
        
        final Worker worker;
        
        volatile long requested;
        static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
                AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");
        
        long count;
        
        volatile boolean cancelled;

        IntervalRunnable(CoreSubscriber<? super Long> actual, Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public CoreSubscriber<? super Long> actual() {
            return actual;
        }

        @Override
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key == Attr.CANCELLED) return cancelled;

            return InnerProducer.super.scanUnsafe(key);
        }

        @Override
        public void run() {
            if (!cancelled) {
                if (requested != 0L) {
                    actual.onNext(count++);
                    if (requested != Long.MAX_VALUE) {
                        REQUESTED.decrementAndGet(this);
                    }
                } else {
                    cancel();
                    
                    actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +
                            " (interval doesn't support small downstream requests that replenish slower than the ticks)"));
                }
            }
        }
        
        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);
            }
        }
        
        @Override
        public void cancel() {
            if (!cancelled) {
                cancelled = true;
                worker.dispose();
            }
        }
    }
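The field to watch here is requested. On every tick, run checks it: if requested is 0, the runnable cancels itself (which disposes the worker) and signals an OverflowException downstream; otherwise it emits one element and decrements requested by one, unless requested is Long.MAX_VALUE, which means unbounded demand and is never decremented.
Whenever the subscriber calls request again, requested is increased by that amount.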
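The bookkeeping described above can be tried without a timer. The following is a minimal single-threaded sketch using only the JDK; the class RequestedCounterSketch and its methods are hypothetical names, not Reactor types. It simplifies two things: invalid requests throw instead of going through Operators.validate, and signals are recorded as strings instead of being delivered to a subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of IntervalRunnable's demand accounting (not Reactor code).
class RequestedCounterSketch {

    final AtomicLong requested = new AtomicLong();
    final List<String> signals = new ArrayList<>();
    long count;
    boolean cancelled;

    /** Models request(n): add n to the outstanding demand, capped at Long.MAX_VALUE. */
    void request(long n) {
        if (n <= 0) {
            // Simplification: the real code reports invalid requests differently.
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        requested.updateAndGet(r -> {
            long sum = r + n;
            return sum < 0 ? Long.MAX_VALUE : sum; // overflow means "unbounded"
        });
    }

    /** Models run(): one timer tick. */
    void tick() {
        if (cancelled) {
            return;
        }
        if (requested.get() != 0L) {
            signals.add("onNext " + count++);
            if (requested.get() != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        } else {
            cancelled = true; // the real code also disposes the worker here
            signals.add("onError overflow at tick " + count);
        }
    }

    static List<String> demo() {
        RequestedCounterSketch m = new RequestedCounterSketch();
        m.request(2);
        m.tick();     // emits 0
        m.tick();     // emits 1, demand is now 0
        m.request(1); // subscriber replenishes in time
        m.tick();     // emits 2, demand is 0 again
        m.tick();     // no demand left: overflow and cancel
        m.tick();     // ignored, already cancelled
        return m.signals;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the model is that a tick never waits for demand: either demand is already there when the tick fires, or the sequence fails.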

Example 1

public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                }).doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        flux.subscribe(e -> {
            System.out.println(e);
            try {
                TimeUnit.MINUTES.sleep(30);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }
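In this example requested is Long.MAX_VALUE, so demand never runs out. However, the subscriber's callback runs on the same thread that executes interval, so the sleep inside the callback also blocks interval's scheduling and no further ticks are emitted while it sleeps.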
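The same effect can be reproduced with a plain JDK ScheduledExecutorService, which is used here as an assumed stand-in for the Reactor scheduler worker. In this sketch (BlockedTickSketch is a hypothetical name) the first tick blocks on a latch that plays the role of the sleep in the subscriber, and the method reports how many ticks started while it was blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only model of Example 1 (not Reactor code).
class BlockedTickSketch {

    /** Returns how many ticks had started while the first tick was still blocked. */
    static long ticksWhileBlocked() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong ticks = new AtomicLong();
        CountDownLatch firstTickStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            timer.scheduleAtFixedRate(() -> {
                long n = ticks.getAndIncrement();
                if (n == 0) {
                    firstTickStarted.countDown();
                    try {
                        release.await(); // stands in for the subscriber's sleep
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, 0, 1, TimeUnit.MILLISECONDS);

            firstTickStarted.await();
            // At a 1 ms period roughly 100 ticks are due during this wait,
            // but the periodic task cannot start again until the first run returns.
            TimeUnit.MILLISECONDS.sleep(100);
            return ticks.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ticks started while blocked: " + ticksWhileBlocked());
    }
}
```

A periodic task on a ScheduledExecutorService never runs concurrently with itself, so a blocking consumer called from inside the tick delays every later tick.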

Example 2

public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                })
                //NOTE publishOn requests prefetch=256 elements from upstream
                .publishOn(Schedulers.newElastic("publish-thread"))
                .doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        AtomicInteger count = new AtomicInteger(0);
        //NOTE nothing is requested until subscribe is called
        flux.subscribe(e -> {
            LOGGER.info("receive:{}",e);
            try {
                //NOTE publishOn separates the subscriber's thread from interval's thread
                if(count.get() == 0){
                    TimeUnit.MINUTES.sleep(2);
                }
                count.incrementAndGet();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }
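publishOn isolates the subscriber's thread from interval's thread, so the sleep no longer blocks interval.
publishOn also carries an implicit prefetch parameter. Its default is Queues.SMALL_BUFFER_SIZE, that is, Math.max(16,Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));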
public final Flux<T> publishOn(Scheduler scheduler) {
        return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE);
    }

    final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
        if (this instanceof Callable) {
            if (this instanceof Fuseable.ScalarCallable) {
                @SuppressWarnings("unchecked")
                Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
                try {
                    return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
                }
                catch (Exception e) {
                    //leave FluxSubscribeOnCallable defer exception call
                }
            }
            @SuppressWarnings("unchecked")
            Callable<T> c = (Callable<T>)this;
            return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
        }

        return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
    }
Here Queues.get(prefetch) supplies an intermediate queue that holds the elements between interval and the subscriber.
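The default prefetch expression quoted earlier can be evaluated on its own to see how the system property and the floor of 16 interact. This is a small JDK-only sketch; SmallBufferSizeSketch and smallBufferSize are hypothetical names, and the method takes the property value as a parameter so that it can be tried with different inputs.

```java
// Hypothetical helper that mirrors the quoted expression (not Reactor code).
class SmallBufferSizeSketch {

    /** Applies the quoted rule: default "256" when unset, never below 16. */
    static int smallBufferSize(String configured) {
        String value = configured != null ? configured : "256";
        return Math.max(16, Integer.parseInt(value));
    }

    public static void main(String[] args) {
        // Reads the same property name that appears in the quoted expression.
        String fromJvm = System.getProperty("reactor.bufferSize.small");
        System.out.println("current JVM: " + smallBufferSize(fromJvm));
        System.out.println("unset:       " + smallBufferSize(null));
        System.out.println("set to 8:    " + smallBufferSize("8"));
        System.out.println("set to 1024: " + smallBufferSize("1024"));
    }
}
```

With the property unset, the result is 256, which matches the 256 elements seen in the output of Example 2.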

This example ends with the following output:

//......
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:254
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:255
reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
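The initial request from publishOn is 256 by default. interval emits those 256 elements, but the subscriber is still sleeping and nothing more has been requested, so the next tick finds requested at 0 and interval cancels its worker. When the subscriber finally resumes, it consumes the 256 queued elements (0 to 255) and the very next signal is the OverflowException.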
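The sequence "256 elements, then the error" can be reproduced with JDK classes alone. The sketch below is a model under stated assumptions, not Reactor's implementation: a ScheduledExecutorService stands in for interval, a ConcurrentLinkedQueue stands in for the queue inside publishOn, and the consumer simply waits until the producer gives up. StalledConsumerSketch and Result are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only model of Example 2 (not Reactor code).
class StalledConsumerSketch {

    static final class Result {
        final List<Long> received;
        final String error;

        Result(List<Long> received, String error) {
            this.received = received;
            this.error = error;
        }
    }

    static Result run(int prefetch) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        Queue<Long> queue = new ConcurrentLinkedQueue<>();
        AtomicLong requested = new AtomicLong(prefetch); // the one-off initial request
        AtomicReference<String> error = new AtomicReference<>();
        CountDownLatch terminated = new CountDownLatch(1);
        long[] count = {0}; // only touched by the timer thread

        timer.scheduleAtFixedRate(() -> {
            if (requested.get() != 0L) {
                queue.offer(count[0]++);
                requested.decrementAndGet();
            } else {
                error.set("Could not emit tick " + count[0] + " due to lack of requests");
                timer.shutdown(); // models cancel(): no further ticks
                terminated.countDown();
            }
        }, 0, 1, TimeUnit.MILLISECONDS);

        try {
            // The consumer is stalled and never replenishes the demand.
            if (!terminated.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producer did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
        // The consumer wakes up: first the queued elements, then the error.
        return new Result(new ArrayList<>(queue), error.get());
    }

    public static void main(String[] args) {
        Result r = run(256);
        System.out.println("received " + r.received.size() + " elements, last = "
                + r.received.get(r.received.size() - 1));
        System.out.println("then: " + r.error);
    }
}
```

The queue keeps the elements that were already emitted, which is why the subscriber still sees 0 to 255 before the error arrives.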

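Summary

Reactor itself does not depend on threads; only methods such as interval and delayElements bring threads into play on their own, by running on a Scheduler. Reactor is an extension of the observer pattern that uses a push + backpressure model: calling subscribe triggers request(N), after which the publisher pushes data through onNext until complete or cancel. In Example 1, the blocked thread blocks interval's onNext. In Example 2, interval is cancelled because demand ran out, which terminates the flux with an error.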
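The request(N) then onNext cycle can be sketched with java.util.concurrent.Flow (Java 9 and later), the JDK's copy of the Reactive Streams interfaces. This is an illustration of the protocol only and does not use Reactor; RangePublisher and PushWithBackpressureSketch are hypothetical names, and the publisher is single-threaded for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded illustration of push + backpressure (not Reactor code).
class PushWithBackpressureSketch {

    /** Emits 0, 1, 2, ... below limit, but never more than has been requested. */
    static final class RangePublisher implements Flow.Publisher<Long> {
        final long limit;

        RangePublisher(long limit) {
            this.limit = limit;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                long next;
                long requested;
                boolean emitting; // guards against re-entrant request() calls
                boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n > 0 required"));
                        return;
                    }
                    requested += n;
                    if (emitting) {
                        return;
                    }
                    emitting = true;
                    while (!done && requested > 0 && next < limit) {
                        requested--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == limit) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        new RangePublisher(3).subscribe(new Flow.Subscriber<Long>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                held[0] = s;
                log.add("onSubscribe");
                s.request(2); // subscribing triggers the first request(N)
            }

            @Override
            public void onNext(Long item) {
                log.add("onNext " + item);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        log.add("-- demand exhausted --");
        held[0].request(5); // more demand: the publisher resumes pushing
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Unlike this range publisher, which can simply pause when demand runs out, interval is driven by a clock and cannot pause, which is why it fails with an overflow error instead.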
