本文做爲《Java編程方法論:響應式Rxjava與代碼設計實戰》一書第二章 Rxjava中的Subject一節的補充解讀。java
首先來看一個Demo:react
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
ConnectableObservable<Object> replay = publishSubject.replay();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
replay.connect();
sleep(2,TimeUnit.SECONDS);
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
publishSubject.onComplete();
System.out.println("test");
} finally {
try {
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒後結束服務……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("捕獲到 forkJoinPool.awaitTermination()方法的異常: " + ex.getClass().getName());
} finally {
System.out.println("調用 forkJoinPool.shutdownNow()結束服務...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("還剩 " + l.size() + " 個任務等待被執行,服務已關閉 ");
}
}
}
複製代碼
獲得的結果以下所示:編程
ForkJoinPool.commonPool-worker-3: 一郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 1
ForkJoinPool.commonPool-worker-3: 三郎神: 1
ForkJoinPool.commonPool-worker-3: 二郎神: 2
ForkJoinPool.commonPool-worker-3: 三郎神: 2
Emission completed
Emission completed
test
………………等待 2 秒後結束服務………………
調用 forkJoinPool.shutdownNow()結束服務...
還剩 0 個任務等待被執行,服務已關閉
複製代碼
在調用subscribe1.dispose()
的時候,完成了訂閱者自行解除訂閱關係的約定,而假如後面調用的是replay.connect(consumer -> consumer.dispose())
,依然會在發送元素的過程當中強行中斷,不帶任何通知。而在使用publishSubject.onComplete()
後,則能夠很優雅地通知後續訂閱者優雅地結束。 如圖2-3
所示,咱們按照圖中文字操做,並在System.out.println("test")
這行打斷點查看狀態,發現其餘2個訂閱者並無被移除,爲何會出現這種狀況?學習
經過publishSubject.replay()
,咱們獲得了一個ConnectableObservable
對象,具體以下:this
//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
return ObservableReplay.createFrom(this);
}
複製代碼
結合前面ConnectableObservable
相關知識的學習,在調用replay.subscribe(...)
時,會將下游的訂閱者與DEFAULT_UNBOUNDED_FACTORY
所獲得的UnboundedReplayBuffer
對象經過一個ReplayObserver
對象創建起聯繫:atom
//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source, final BufferSupplier<T> bufferFactory) {
// the current connection to source needs to be shared between the operator and its onSubscribe call
final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
//注意此處
ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
//此處這個curr會做爲ObservableReplay下current字段的值,記住,它是個引用類型對象
return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
for (;;) {
ReplayObserver<T> r = curr.get();
if (r == null) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
//此時ObservableReplay中current字段的值所指對象也會發生改變
if (!curr.compareAndSet(null, u)) {
continue;
}
r = u;
}
InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
child.onSubscribe(inner);
//經過ReplayObserver的observers字段將下游訂閱者管理起來
r.add(inner);
if (inner.isDisposed()) {
r.remove(inner);
return;
}
//此處UnboundedReplayBuffer對象與下游訂閱者創建聯繫
r.buffer.replay(inner);
break;
}
}
}
複製代碼
當調用replay.connect(consumer -> consumer.dispose())
時,經過current
獲取上面獲得的ReplayObserver
對象,並調用該對象的dispose()
方法(由replay.connect(...)
中傳入的Consumer
實現可得),此時會將ObservableReplay
中的observers
字段設定爲TERMINATED
,同時將ObservableReplay
自身身爲AtomicReference
角色所存儲值設定爲DISPOSED
,即將ObservableReplay
中current
的值設定爲了DISPOSED
。spa
//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
boolean doConnect;
ReplayObserver<T> ps;
for (;;) {
ps = current.get();
if (ps == null || ps.isDisposed()) {
ReplayBuffer<T> buf = bufferFactory.call();
ReplayObserver<T> u = new ReplayObserver<T>(buf);
if (!current.compareAndSet(ps, u)) {
continue;
}
ps = u;
}
doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
break;
}
try {
connection.accept(ps);
} catch (Throwable ex) {
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
source.subscribe(ps);
}
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
observers.set(TERMINATED);
DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
複製代碼
能夠看到,ReplayObserver
只是解除了與下游訂閱者的關係,但並無進一步對下游訂閱者進行結束的操做,這樣與UnboundedReplayBuffer
對象創建聯繫的訂閱者,若是buffer中的元素還未消費完畢,會持續消費直至所存元素下發完畢,但要注意的是,該buffer中並未存放結束事件(即經過調用UnboundedReplayBuffer#complete
往該隊列中存放NotificationLite.complete()
元素)。同時下游訂閱者也並未調用dispose()
方法,因此下面所示源碼中的output.isDisposed()
結果爲false
。請注意下面所示源碼中<1>
處的代碼:設計
public void replay(InnerDisposable<T> output) {
if (output.getAndIncrement() != 0) {
return;
}
final Observer<? super T> child = output.child;
int missed = 1;
for (;;) {
if (output.isDisposed()) {
return;
}
int sourceIndex = size;
Integer destinationIndexObject = output.index();
int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;
while (destinationIndex < sourceIndex) {
Object o = get(destinationIndex);
//此處很關鍵
if (NotificationLite.accept(o, child)) {//<1>
return;
}
if (output.isDisposed()) {
return;
}
destinationIndex++;
}
output.index = destinationIndex;
missed = output.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
if (o == COMPLETE) {
s.onComplete();
return true;
} else
if (o instanceof ErrorNotification) {
s.onError(((ErrorNotification)o).e);
return true;
}
s.onNext((T)o);
return false;
}
複製代碼
若是調用了UnboundedReplayBuffer#complete
,那麼在元素下發到最後時,就會出現o == COMPLETE
爲true
,此時會調用下游訂閱者的onComplete()
方法。3d
//ObservableReplay.UnboundedReplayBuffer#complete
public void onComplete() {
if (!done) {
done = true;
buffer.complete();
replayFinal();
}
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
add(NotificationLite.complete());
size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
return COMPLETE;
}
複製代碼
至此,關於replay_PublishSubject_test()
示例中所展示的疑點已經解讀完畢。code