RxJava之併發處理(SerializedSubject)

在併發狀況下,不推薦使用一般的Subject對象,而是推薦使用SerializedSubject,併發時只容許一個線程調用onnext等方法!
官方說明:css

When you use an ordinary Subject as a Subscriber, you must take care not to call its Subscriber.onNext method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject. 

大體意思是當咱們使用普通的Subject,必需要注意不要在多線程狀況下調用onNext 方法,這樣是違反了Observable 協議而且會致使執行結果返回帶有有歧義的值(線程併發致使返回值混淆了)!java

To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following: 

mySafeSubject = new SerializedSubject( myUnsafeSubject );

官方文檔說的很明白了,只須要使用SerializedSubject封裝原來的
Subject便可!!markdown

測試demo:多線程

public class MultiThread {
    public static void main(String[] args) throws InterruptedException {

        final PublishSubject<Integer> subject = PublishSubject.create();

        subject.subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer t) {
                System.out.println("======onnext===>value:" + t + ",threadId:" + Thread.currentThread().getId());
            }

        });

        final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

        for (int i = 0; i < 20; i++) {
            final int value = i;
            new Thread() {
                public void run() {
                    ser.onNext((int) (value * 10000 + Thread.currentThread().getId()));
                };
            }.start();
        }

        Thread.sleep(2000);
        //
        // for (int i = 11; i < 20; i++) {
        // final int value = i;
        // new Thread() {
        // public void run() {
        // subject.onNext(value);
        // };
        // }.start();
        // }

    }
}

執行結果:併發

======onnext===>value:10,threadId:10
======onnext===>value:10011,threadId:10
======onnext===>value:50015,threadId:10
======onnext===>value:40014,threadId:10
======onnext===>value:30013,threadId:10
======onnext===>value:20012,threadId:10
======onnext===>value:70017,threadId:10
======onnext===>value:60016,threadId:10
======onnext===>value:80018,threadId:10
======onnext===>value:100020,threadId:10
======onnext===>value:90019,threadId:10
======onnext===>value:110021,threadId:21
======onnext===>value:130023,threadId:23
======onnext===>value:120022,threadId:23
======onnext===>value:160026,threadId:26
======onnext===>value:150025,threadId:25
======onnext===>value:140024,threadId:25
======onnext===>value:170027,threadId:25
======onnext===>value:180028,threadId:25
======onnext===>value:190029,threadId:25app

上面的結果有點暈了,爲何不是在一個線程上呢?和我以前覺得的SerializedSubject時將值放在一個線程上而後處理的想法有些出入了!ide

源碼面前了無祕密,SerializedSubject跟進去看看測試

public SerializedSubject(final Subject<T, R> actual) {
        super(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> child) {
                actual.unsafeSubscribe(child);
            }

        });
        this.actual = actual;
        this.observer = new SerializedObserver<T>(actual);
    }

其實SerializedSubject的處理是交給了SerializedObserver,繼續跟進到SerializedObserver,類註釋:ui

/**
 * Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
 * {@link #onError}.
 * <p>
 * When multiple threads are emitting and/or notifying they will be serialized by:
 * </p><ul>
 * <li>Allowing only one thread at a time to emit</li>
 * <li>Adding notifications to a queue if another thread is already emitting</li>
 * <li>Not holding any locks or blocking any threads while emitting</li>
 * </ul>
 * 
 * @param <T>
 *          the type of items expected to be observed by the {@code Observer}
 */

這裏一看就明白了,他是隻保證同時只有一個線程調用 {@link #onNext}, {@link #onCompleted}, and{@link #onError}.方法,並非將全部emit的值放到一個線程上而後處理,這就解釋了爲何執行結果不是所有在一個線程上的緣由 了!this

再看看源碼再onnext方法:

@Override
    public void onNext(T t) {
        FastList list;

//同步鎖
        synchronized (this) {
            if (terminated) {
                return;
            }
            if (emitting) {
                if (queue == null) {
                    queue = new FastList();
                }
                queue.add(t != null ? t : NULL_SENTINEL);
                // another thread is emitting so we add to the queue and return
                return;
            }
            // we can emit
            emitting = true;
            // reference to the list to drain before emitting our value
            list = queue;
            queue = null;
        }

        // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
        boolean skipFinal = false;
        try {
            int iter = MAX_DRAIN_ITERATION;
            do {
                drainQueue(list);
                if (iter == MAX_DRAIN_ITERATION) {
                    // after the first draining we emit our own value
                    actual.onNext(t);
                }
                --iter;
                if (iter > 0) {
                    synchronized (this) {
                        list = queue;
                        queue = null;
                        if (list == null) {
                            emitting = false;
                            skipFinal = true;
                            return;
                        }
                    }
                }
            } while (iter > 0);
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    if (terminated) {
                        list = queue;
                        queue = null;
                    } else {
                        emitting = false;
                        list = null;
                    }
                }
            }
        }

        // this will only drain if terminated (done here outside of synchronized block)
        drainQueue(list);
    }

// another thread is emitting so we add to the queue and return 若是有其餘線程正在處理,則將emit的值放到隊列上,線程執行完畢後,會順序emit隊列上的值!!這樣就保證了一次只會有一個線程調用!!!

相關文章
相關標籤/搜索