ObservableReplay與PublishSubject在一塊兒所產生的化學效應

本文做爲《Java編程方法論:響應式Rxjava與代碼設計實戰》一書第二章 Rxjava中的Subject一節的補充解讀。java

首先來看一個Demo:react

@Test
    void replay_PublishSubject_test() {
        PublishSubject<Object> publishSubject = PublishSubject.create();
        ConnectableObservable<Object> replay = publishSubject.replay();
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        List<Integer> integers = new ArrayList<>();
        for (int i=1;i<10;i++){
            integers.add(i);
        }
        Disposable subscribe1 = replay.subscribe(x -> {
            log("一郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

        Disposable subscribe2 = replay.subscribe(x -> {
            log("二郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        Disposable subscribe3 = replay.subscribe(x -> {
            log("三郎神: " + x);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        AtomicInteger atomicInteger = new AtomicInteger(integers.size());
        try {
            forkJoinPool.submit(() -> {
                integers.forEach(id -> {
                    sleep(1,TimeUnit.SECONDS);
                    publishSubject.onNext(id);
                    if (atomicInteger.decrementAndGet() == 0) {
                        publishSubject.onComplete();
                    }
                });
            });
          
            replay.connect();
            sleep(2,TimeUnit.SECONDS);
            subscribe1.dispose();
            sleep(1,TimeUnit.SECONDS);
            //replay.connect(consumer -> consumer.dispose());
            publishSubject.onComplete();
            System.out.println("test");

        } finally  {
            try {
                forkJoinPool.shutdown();
                int shutdownDelaySec = 2;
                System.out.println("………………等待 " + shutdownDelaySec + " 秒後結束服務……………… ");
                forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
            } catch (Exception ex) {
                System.out.println("捕獲到 forkJoinPool.awaitTermination()方法的異常: " + ex.getClass().getName());
            } finally {
                System.out.println("調用 forkJoinPool.shutdownNow()結束服務...");
                List<Runnable> l = forkJoinPool.shutdownNow();
                System.out.println("還剩 " + l.size() + " 個任務等待被執行,服務已關閉 ");
            }
        }
    }
複製代碼

獲得的結果以下所示:編程

ForkJoinPool.commonPool-worker-3: 一郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 1
ForkJoinPool.commonPool-worker-3: 三郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 2
ForkJoinPool.commonPool-worker-3: 三郎神: 2
Emission completed
Emission completed
test
………………等待 2 秒後結束服務……………… 
調用 forkJoinPool.shutdownNow()結束服務...
還剩 0 個任務等待被執行,服務已關閉 
複製代碼

在調用subscribe1.dispose()的時候,完成了訂閱者自行解除訂閱關係的約定,而假如後面調用的是replay.connect(consumer -> consumer.dispose()),依然會在發送元素的過程當中強行中斷,不帶任何通知。而在使用publishSubject.onComplete()後,則能夠很優雅地通知後續訂閱者優雅地結束。 如圖2-3所示,咱們按照圖中文字操做,並在System.out.println("test")這行打斷點查看狀態,發現其餘2個訂閱者並無被移除,爲何會出現這種狀況?學習

經過publishSubject.replay(),咱們獲得了一個ConnectableObservable對象,具體以下:this

//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
    return ObservableReplay.createFrom(this);
}
複製代碼

結合前面ConnectableObservable相關知識的學習,在調用replay.subscribe(...)時,會將下游的訂閱者與DEFAULT_UNBOUNDED_FACTORY所獲得的UnboundedReplayBuffer對象經過一個ReplayObserver對象創建起聯繫:atom

//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
    return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source, final BufferSupplier<T> bufferFactory) {
    // the current connection to source needs to be shared between the operator and its onSubscribe call
    final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
    //注意此處
    ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
    //此處這個curr會做爲ObservableReplay下current字段的值,記住,它是個引用類型對象
    return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
        for (;;) {
            ReplayObserver<T> r = curr.get();
            if (r == null) {
                ReplayBuffer<T> buf = bufferFactory.call();
                ReplayObserver<T> u = new ReplayObserver<T>(buf);
                //此時ObservableReplay中current字段的值所指對象也會發生改變
                if (!curr.compareAndSet(null, u)) {
                    continue;
                }
                r = u;
            }
            InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
            child.onSubscribe(inner);
            //經過ReplayObserver的observers字段將下游訂閱者管理起來
            r.add(inner);
            if (inner.isDisposed()) {
                r.remove(inner);
                return;
            }
            //此處UnboundedReplayBuffer對象與下游訂閱者創建聯繫
            r.buffer.replay(inner);
            break; 
        }
    }
}
複製代碼

當調用replay.connect(consumer -> consumer.dispose())時,經過current獲取上面獲得的ReplayObserver對象,並調用該對象的dispose()方法(由replay.connect(...)中傳入的Consumer實現可得),此時會將ObservableReplay中的observers字段設定爲TERMINATED,同時將ObservableReplay自身身爲AtomicReference角色所存儲值設定爲DISPOSED,即將ObservableReplaycurrent的值設定爲了DISPOSEDspa

//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
    boolean doConnect;
    ReplayObserver<T> ps;
    for (;;) {
        ps = current.get();
        if (ps == null || ps.isDisposed()) {
            ReplayBuffer<T> buf = bufferFactory.call();
            ReplayObserver<T> u = new ReplayObserver<T>(buf);
            if (!current.compareAndSet(ps, u)) {
                continue;
            }
            ps = u;
        }
        doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
        break; 
    }
    
    try {
        connection.accept(ps);
    } catch (Throwable ex) {
        if (doConnect) {
            ps.shouldConnect.compareAndSet(true, false);
        }
        Exceptions.throwIfFatal(ex);
        throw ExceptionHelper.wrapOrThrow(ex);
    }
    if (doConnect) {
        source.subscribe(ps);
    }
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
    observers.set(TERMINATED);
    DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
複製代碼

能夠看到,ReplayObserver只是解除了與下游訂閱者的關係,但並無進一步對下游訂閱者進行結束的操做,這樣與UnboundedReplayBuffer對象創建聯繫的訂閱者,若是buffer中的元素還未消費完畢,會持續消費直至所存元素下發完畢,但要注意的是,該buffer中並未存放結束事件(即經過調用UnboundedReplayBuffer#complete往該隊列中存放NotificationLite.complete()元素)。同時下游訂閱者也並未調用dispose()方法,因此下面所示源碼中的output.isDisposed()結果爲false。請注意下面所示源碼中<1>處的代碼:設計

public void replay(InnerDisposable<T> output) {
        if (output.getAndIncrement() != 0) {
            return;
        }

        final Observer<? super T> child = output.child;

        int missed = 1;

        for (;;) {
            if (output.isDisposed()) {
                return;
            }
            int sourceIndex = size;

            Integer destinationIndexObject = output.index();
            int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;

            while (destinationIndex < sourceIndex) {
                Object o = get(destinationIndex);
                //此處很關鍵
                if (NotificationLite.accept(o, child)) {//<1>
                    return;
                }
                if (output.isDisposed()) {
                    return;
                }
                destinationIndex++;
            }

            output.index = destinationIndex;
            missed = output.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
    if (o == COMPLETE) {
        s.onComplete();
        return true;
    } else
    if (o instanceof ErrorNotification) {
        s.onError(((ErrorNotification)o).e);
        return true;
    }
    s.onNext((T)o);
    return false;
}
複製代碼

若是調用了UnboundedReplayBuffer#complete,那麼在元素下發到最後時,就會出現o == COMPLETEtrue,此時會調用下游訂閱者的onComplete()方法。3d

//ObservableReplay.UnboundedReplayBuffer#complete
public void onComplete() {
    if (!done) {
        done = true;
        buffer.complete();
        replayFinal();
    }
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
    add(NotificationLite.complete());
    size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
    return COMPLETE;
}
複製代碼

至此,關於replay_PublishSubject_test()示例中所展示的疑點已經解讀完畢。code

相關文章
相關標籤/搜索