RxSwift (二)序列核心邏輯分析

Rxswift(一)函數響應式編程思想編程

RxSwift (二)序列核心邏輯分析swift

RxSwift (三)Observable的建立,訂閱,銷燬設計模式

RxSwift(四)高階函數api

RxSwift(五)(Rxswift對比swift,oc用法)markdown

Rxswift (六)銷燬者Dispose源碼分析閉包

RxSwift(七)Rxswift對比swift用法ide

RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬函數

RxSwift學習之十二 (基礎使用篇 3- UI控件擴展) @TOCoop

RxSwift序列核心邏輯

上一篇博客:Rxswift學習之(一)函數響應式編程思想只是簡單的分析了序列的核心邏輯。本篇博客主要針對上一篇作一下更加深刻的探討,若是有那些地方分析有誤,還請留言:QQ:282889543,讓咱們彼此提升,彼此成就。源碼分析

總的來講分析Rxswift的核心邏輯仍是按照三部曲:建立序列,訂閱序列,銷燬序列。核心思想是萬物皆序列。

1. 序列的建立

Observable可觀察者序列

咱們先來看下建立Observable所涉及的類的繼承關係: 以下圖:

針對上面的類圖,簡單分析下類的關係和設計思想: 首先分層實施的很完全,每一層都只解決一件事情,一層層疊起來結構很是清晰: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

其次咱們簡單分解一下每一個類都作了些什麼:

  • ObservableConvertibleType:顧名思義便可轉換爲Observable 類型協議,方法只有一個asObservable,這有什麼好處呢?
  1. 用戶不須要關注其具體是哪一個類型對象
  2. 讓用戶更多的關注其核心功能
  • ObservableType:也是個協議,繼承了ObservableConvertibleType協議的asObservable,它提供抽象方法subscribe,即咱們常說的訂閱,只有外部訂閱了該對象,才能真正實現對該對象進行觀察。
  • Observable:真正的類,能夠稱之爲元類,對於用戶來講Observable 的功能是完整的,由於它已經具有了全部的用戶所須要的功能,儘管有些方法並無獲得實現還是抽象方法。 Producer: 它繼承了Observable的全部方法,並實現subscribe 方法
  • AnonymousObservable:它繼承了Producer的全部方法,而且增長了屬性let _subscribeHandler: SubscribeHandler用來保存建立序列時傳入的閉包,也就相對於擁有了調用這個序列的能力,此外它還實現run方法,這也是建立序列最核心關鍵的方法。在run()方法中它建立一個AnonymousObservableSink final private類的對象,而這個對象sink能夠稱之爲管子,它相似於manager的角色,擁有序列和訂閱,銷燬能力。這裏有兩個疑惑:

問題1. AnonymousObservableSink爲何要定義成final private類,不能被繼承,也不能被外部訪問? 問題2. 建立的Observable是如何關聯到訂閱的?

這兩個問題咱們後面再分析。

最後,咱們總結一下設計思想:

事實上用戶所使用的 Observable ,都是 Producer 的子類和AnonymousObservable平行的子類,只不過用戶不須要關心其具體實現罷了 每個相似AnonymousObservable的類,還有一個與之相關的類AnonymousObservableSink,Sink即管道,全部這些組合起來才能讓其真正運行起來,AnonymousObservableSink同時擁有序列,訂閱功能,相似於咱們項目中常常用的manager角色。 整個設計向上經過組合協議的方式描述其特性,向下經過子類化的方式屏蔽其實現細節,相似於工廠模式,這樣的類也能夠叫類簇。

序列建立的流程

經過上面類繼承關係,其實咱們不難理解序列的建立流程,它確實也是隻有比較簡單的幾部,寥寥幾行代碼就搞定了,難點是上面拋出的幾個問題:

下面咱們將經過一個簡單的Rxswift的實例來分析一下序列的建立,訂閱,銷燬直接的流程和關係。

實例1

//1. 建立序列
   let ob = Observable<Any>.create { (obserber) -> Disposable in
            // 3:發送信號
            obserber.onNext("kyl發送了信號")
            obserber.onCompleted()
            return Disposables.create()
        }
    
        // 2:訂閱信號
        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷燬")
        }
複製代碼

上面實例1 的這段代碼能夠用酷C老師的一個圖來清晰的表達:

從上面的代碼和關係圖,咱們可能會產生這樣一個疑惑:

問題3: 建立的ob序列,僅僅只是經過ob.subscribe()訂閱了一下,爲何咱們在ob建立時的尾隨閉包(咱們這裏給個名字叫閉包A)裏面調用了obserber.onNext("kyl發送了信號")這個代碼,咱們就能夠訂閱到let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") } 這裏會打印:」訂閱到:kyl發送了信號「。咱們沒有看見他們之間有任何關聯,怎麼ob發送消息,subcribe()的onNext閉包就能夠觸發呢,這是爲何呢?

咱們能夠這裏能夠簡單推理下:ob.subscribe()這個訂閱方法確定作了一些事情,在某個地方調用了閉包A,才能實現這個功能。具體是怎麼樣實現的呢?下面咱們將經過分析源碼來解答這個疑惑。

從上面的代碼咱們能夠知道,建立序列就一行代碼:let ob = Observable<Any>.create { (obserber) -> 而這一行代碼實際上是作了好多事情的。

首先咱們經過一個流程圖來初略的瞭解一下序列建立流程:

建立序列的Rxswift原碼很簡單,從上圖能夠看出,直接一行代碼return AnonymousObservable(subscribe)就結束了,這裏咱們們並無找到咱們須要的答案,甚至咱們有點愈來愈暈感受。

  • AnonymousObservable類源碼
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
複製代碼

咱們先作個深呼吸,放輕鬆,此路不通那咱們來嘗試分析其餘方向,不能在一棵樹上吊死。下面咱們來分析一下訂閱的流程。

2.訂閱

回顧上面實例1中的訂閱代碼:let _ = ob.subscribe(onNext: { (text) in這行代碼又作了些什麼事情呢?下面咱們經過源碼來深刻分析一下:

  • Rxswift訂閱subscribe()的源碼以下:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代碼不是咱們要分析的重點,...表示忽略了這次的一段源碼
            /*注意,這次定義了一個AnonymousObserver()對象,以參數的形式, 構造方法裏面傳入了一個尾隨閉包eventHandler, 在這個閉包裏面,當收到event的不一樣事件, 會觸發並調用,咱們 `let _ = ob.subscribe(onNext: { (text) in` 這個方法傳入閉包 */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //調用訂閱時傳入的ob.subscribe(onNext:閉包
                case .error(let error):
                    if let onError = onError {
                        onError(error)//調用訂閱時傳入的ob.subscribe(onError:閉包
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()//調用訂閱時傳入的ob.subscribe(onCompleted:閉包
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )/*這裏直接返回了Disposables對象,用來釋放資源, 在它的構造函數裏面直接調用了self.asObservable().subscribe(observer), 而asObservable()就是咱們建立的序列ob,也就是ob.subscribe(), 並傳入了,在這段代碼裏面建立的局部變量let observer = AnonymousObserver<E>,*/
    }
複製代碼

經過上面源碼咱們能夠知道:subscribe()這個方法,以參數的形式傳入了onNext()閉包,onError()閉包,onComplete()閉包,在函數裏面建立了一個AnonymousObserver對象observer,這個對象建立的時候傳入了一個閉包,當收到不一樣event事件時,會分別調用咱們subscribe()傳入的onNext,onError,onComplete這三個閉包。最重要一點是return Disposables.create( self.asObservable().subscribe(observer), disposable )這句代碼調用了咱們真正的subscribe()函數,並以參數的形式傳入了AnonymousObserver對象,self.asObservable()就是咱們create()函數建立的序列ob, 而到此處咱們能夠清晰的看到,咱們訂閱時傳入參數閉包和咱們建立的ob創建了一個鏈條。

這裏咱們又有一個疑問:self.asObservable()爲何就是咱們create()函數返回的ob呢?

要解答這個問題,我須要回顧一下上面分析的Observable類的繼承關係:Observable -> ObservableType -> ObservableConvertibleType 即Observable繼承ObservableType協議,ObservableType又繼承ObservableConvertibleType協議,而咱們的ObservableConvertibleType提供了抽象方法asObservable(),咱們Observable類中實現了asObservable()這個方法,它直接返回self就它本身。

下面經過源碼來證明:

///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {

    ...
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    ...
}
複製代碼

分析了Rxswift訂閱subscribe()的源碼感受很是nice, 咱們找到了咱們ob 建立時傳入的閉包和咱們訂閱時的閉包存在了一條鏈條關係,也就是隻要ob發送了消息,那咱們的訂閱者必定能夠按照這個鏈條收到消息。可是咱們仍是不知道究竟是怎麼調用的,怎麼觸發的。

並且咱們注意到self.asObservable().subscribe(observer)也就是AnonymousObservable調用了subscribe()方法,可是在AnonymousObservable類中咱們並無找到subscribe()的定義,因此咱們須要來看他的父類Producer

  • Producer的源碼以下:
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            /*重點在這裏了,這裏調用了run()方法,一切疑惑都清晰了,咱們知道了run()調用時傳入了observer,而且建立了sink管子,而這個管子具有了序列的功能,能夠調用on()方法。 */
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

複製代碼

果真不出咱們所料,在Producer中咱們找到了subscribe()的方法定義,到此咱們能夠總結出很清晰的幾條線索了

  • (1)經過前面的類繼承關係能夠知道是Producer實現了ObservableType協議的subscribe()方法。在這個方法裏面調用了self.run(observer, cancel: disposer)
  • (2) self.run()實際上就是AnonymousObservable.run(), 這這個方法裏面作了三件事情:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//1.建立了一個sink管道對象,並將observer也就create()建立
//序列時傳入的閉包傳給了sink
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        //2. sink調用本身的run()方法,並把AnonymousObservable做爲參數傳入。
        let subscription = sink.run(self)
        //返回一個元組,包含sink管道信息。
        return (sink: sink, subscription: subscription)
    }
複製代碼
  • (3)AnonymousObservableSink類中run()方法中調用parent._subscribeHandler(AnyObserver(self)) 其中parent就是咱們(2)中sink.run(self)傳入的self,也就是AnonymousObservable對象;而且咱們前面已經知道_subscribeHandler就是建立序列時保存的那個經過create()函數參數傳入的 閉包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發送信號 obserber.onNext("kyl發送了信號") obserber.onCompleted() return Disposables.create() }。 如今已經很清晰了parent._subscribeHandler(AnyObserver(self)) 執行閉包,這行代碼就會調用obserber.onNext("kyl發送了信號")這個行代碼。

  • 如今咱們能夠經過一個流程圖來總結咱們代碼執行的流程:

上面的訂閱序列流程分析:咱們弄明白了從訂閱序列到調用create()函數時傳入的參數閉包調用的邏輯,可是這個閉包發送onNext()信號後,怎麼到訂閱消息的onNext()閉包咱們還不是很清晰。所以咱們須要分析AnonymousObserver

咱們先來看下AnonymousObserver

  • AnonymousObserver源碼定義以下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    /*構造函數,保存了EventHandler尾隨閉包*/
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    //覆寫了onCore方法,調用了EventHandler閉包
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

複製代碼

AnonymousObserver源碼中咱們並無找到onNext()方法,那咱們只能沿着它的繼承鏈往上查找,這裏須要瞭解一下類的繼承關係:

  • AnonymousObserver的繼承關係:

經過分析類的繼承關係,咱們得知:這樣一個關係鏈:

AnonymousObserver對象的on()方法會調用onCore()方法,ObserverType裏面有onNext,onError,onComplete方法。可是on()是如何調用的,什麼時候調用的呢?

要解決這個疑問,咱們須要再次回到咱們建立序列的代碼:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
複製代碼

建立序列的create()方法傳入了一個subscribe閉包,並返回了AnonymousObservable對象。其中subscribe閉包就是咱們序列建立時參數形式傳入 閉包。而且AnonymousObservable初始化時將這個閉包保存起來了self._subscribeHandler = subscribeHandler AnonymousObservable 有一個run()方法,run方法裏面建立了一個AnonymousObservableSink對象sink,具體源碼以下:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
複製代碼

分析了這麼久,繞了一圈,終於發現關鍵就在AnonymousObservableSink管子這個對象裏面了。sink這是個神奇的管子。它就保存了序列,也保存了訂閱,還保存了用於銷燬的disposed 也就是同時擁有了建立序列,訂閱序列,銷燬序列功能。

咱們來分析下AnonymousObservableSink的源碼:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    //這裏的Parent就是咱們上面分析的AnonymousObservable,很是重要
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

// 構造方法,傳入了observer序列,和Cancelable
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

//這裏實現 了ObserverType協議的on()方法
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            //調用了父類的發佈,self.forwardOn()會調用本身的on()方法
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
    /*調用了_subscribeHandler閉包,這個閉包就是咱們以前建立序列時傳入閉包。 parent就是傳入進來的序列,這裏序列的閉包裏傳入了self而且強轉爲AnyObserver 這裏將self傳給了閉包_subscribeHandler,這樣_subscribeHandler也就具有了subcribe的能力。 */
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼

其中Sink類的源碼以下:

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        //這裏調用了傳入observer.on()方法,
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}
複製代碼

從源碼分析咱們得知:

  • 咱們的sink保存了咱們的序列,當咱們調用ob.onNext()發送信號時,因爲咱們的sink已經持有了ob, 這樣sink會調用on()方法,在on()方法裏面會調用self.forwardOn(event),而在fowardOn()裏面會調用self._observer.on(event)。這樣個人疑問就解決了,答案就是sink調用了on()方法。

  • 這裏咱們再來總結一下總的流程:

  1. 建立序列時create()返回了一個ob, 這個ob就是序列,建立的時候傳遞了一個閉包A。在閉包A中調用了ob.onNext()發送了信號。
  2. 訂閱序列時調用ob.subscribe()方法,這個方法會建立一個AnonymousObserver對象,並調用了self.asObservable().subscribe(observer)
  3. self.asObservable()實際就是咱們的ob, 也就是ob調用了subscribe().而AnonymousObserver中沒有找到subscribe()。
  4. 咱們在AnonymousObserver的父類中找到了subscribe(),發現subscribe()調用了AnonymousObserver的run()方法。
  5. 在AnonymousObserver的run()方法中,建立了一個管子sink,並調用了sink.run(self),sink是AnonymousObservableSink的對象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))調用了建立序列時保存的閉包A (parent就是AnonymousObserver),這樣就解釋了訂閱時,回調了A閉包的緣由。
  6. 至於怎麼調用onNext()方法也是經過sink來實現的。
  7. sink已經持有了ob ,當咱們在A閉包裏面調用ob.onNext()發送信號時,實際會經過sink.on()來調用。首先sink.on()會調用forwardOn().
  8. 在forwardOn()中會調用self._observer.on(event)。
  9. _observer.on()會調用_observer.onCore()
  10. _observer.onCore(event)會根據event的類型判斷是調用onNext(),onError(),onComplete()中間一個,因爲咱們傳遞的是onNext事件,因此會調用onNext() ,而這個_observer.onNext()會調用咱們訂閱時傳入閉包subscribe(onNext:).
  11. 爲何回調的緣由是:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代碼不是咱們要分析的重點,...表示忽略了這次的一段源碼
            /*注意,這次定義了一個AnonymousObserver()對象,以參數的形式, 構造方法裏面傳入了一個尾隨閉包eventHandler, 在這個閉包裏面,當收到event的不一樣事件, 會觸發並調用,咱們 `let _ = ob.subscribe(onNext: { (text) in` 這個方法傳入閉包 */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //調用訂閱時傳入的
複製代碼

這裏調用ob.subscribe()的時候,咱們建立了AnonymousObserver和咱們subscribe()傳入的onNext()閉包作了一個綁定,當AnonymousObserver.onNext()調用的時候一定會回調subscribe()傳入的onNext()閉包。而10中的_observer對象指的就是let observer = AnonymousObserver

  • 仍是經過這張圖來解釋最簡潔:

3. 銷燬

RxSwift給咱們的展現的設計思惟

iOS 經常使用設計模式

相關文章
相關標籤/搜索