RxSwift之管道——AnonymousObservableSink

在上一篇RxSwift核心邏輯簡介中已經簡單分析了RxSwift的核心邏輯。swift

可是其中有一個點,解析得不是很清楚。那就是可觀察序列和訂閱者之間究竟是如何聯繫在一塊兒的?那麼這一篇就詳細分析一下連接二者的管道——AnonymousObservableSink。api

以前已經知道了,第二步訂閱信號之後,代碼會來到這裏。 閉包

run函數中,會初始化 AnonymousObservableSink,同時傳遞參數 observercancelcancel是垃圾處理器,處理內存回收,不是本篇的重點,先忽略。咱們重點關注 observer

一、AnonymousObservableSink <—> observer

先看看observer的來源。其是執行subscribe函數內部初始化的匿名觀察者AnonymousObserver的實例對象。 ide

其內部保存了訂閱信號時傳入的Event閉包。

這個實例對象在AnonymousObservableSink初始化時做爲初始化參數傳入。咱們再來看AnonymousObservableSink類的源碼。函數

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼

AnonymousObservableSink在初始化時調用的是父類的初始化方法,咱們再找到其父類的源碼。post

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}
複製代碼

經過父類的初始化方法能夠看出,observer對象最終被保存在Sink類的_observer變量中。fetch

所以子類AnonymousObservableSink就擁有了觀察者observerthis

二、AnonymousObservableSink <—> 可觀察序列

AnonymousObservableSink被初始化後,會執行sink.run(self)函數,這裏的self即爲第一步建立可觀察序列中,建立的可觀察序列。spa

經過run函數,AnonymousObservableSink與可觀察序列之間就創建起了聯繫。3d

三、observer.onNext()中的observer究竟是什麼?

咱們知道,在第三步==發送信號==會執行observer.onNext()函數。這個observer會是咱們建立的可觀察序列嗎?咱們來分析一下源碼。

AnonymousObservableSink對象執行run函數時

func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}
複製代碼

會調用parent_subscribeHandler。在上一篇中已經分析了_subscribeHandler,就是第一步建立可觀察序列時傳入的閉包。而給閉包返回的參數是一個AnyObserver(self)。因此,observer.onNext()中的observer不是咱們建立的可觀察序列,而是AnyObserver

查看源碼。

public struct AnyObserver<Element> : ObserverType {
    /// The type of elements in sequence that observer can observe.
    public typealias E = Element
    
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}
複製代碼

AnyObserver在初始化時,執行的是self.observer = observer.on這是什麼意思呢?

前面已經分析了。AnyObserverinit時傳入AnyObserver的是AnonymousObservableSink的實例對象。那麼observer.on實際上是AnonymousObservableSink.on,而經過上面的AnonymousObservableSink源碼能夠知道,on實際上是一個函數。因此,AnyObserver中的observer是一個函數。

既然observer.onNext()中的observer不是可觀察序列,而是AnyObserver對象,那麼第三步發送信號究竟是如何執行的呢?

四、observer.onNext()分析

點擊onNext()查看源碼。

其實調用的是 ObserverTypeself.on(.next(element))函數。可是 ObserverType是一個協議,只有函數聲明,沒有實現,因此,最後調用的是 AnyObserver.on(.next(element))函數。

從上面AnyObserver的源碼能夠看出。on函數,最終調用的實際上是self.observer(event),而在上一節的分析中,咱們已經知道。self.observer其實保存的是AnonymousObservableSink中的on函數。

因此最後會回到AnonymousObservableSink中調用self.forwardOn(event)。而AnonymousObservableSink中並無forwardOn函數。那麼老規矩,當前類中沒有的方法,那麼就去找它的父類Sink

在父類Sink中找到forwardOn()函數,函數中執行了self._observer.on(event)。咱們以前已經分析過了,self._observer其實就是訂閱信號時建立的匿名觀察者AnonymousObserver

因此,最後會調用AnonymousObserver.on。可是AnonymousObserver沒有on()函數。因此查找其父類ObserverBase

class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType

    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<E>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}
複製代碼

分析發現,最後仍然會回到子類AnonymousObserver而且調用其onCore()函數。

onCore()函數中執行的是self._eventHandler(event)。在上一篇中已經分析了self._eventHandler就是訂閱信號時傳入的閉包。

因此,通過一系列的數據傳遞,observer.onNext(),最終會回調到訂閱信號時傳入的閉包中。以此完成數據的傳遞和流轉。

相關文章
相關標籤/搜索