透視RxSwift核心邏輯

透視RxSwift核心邏輯

篇幅稍微有點長,瞭解程度不一樣,能夠跳過某些部分。api

  1. 若是對源碼比較熟悉的,建議直接看圖就好了,時序圖更加清晰。第一次摸索有必要閱讀文字內容。
  2. 貼出來的代碼省略了沒必要要的部分,用省略號代替。

示例

RxSwift的基礎用法就是很簡單的幾步bash

  1. 建立可觀察序列
  2. 監聽序列(訂閱信號)
  3. 銷燬序列
//建立序列
let ob = Observable<Any>.create { (observer) -> Disposable in
    //發送信號
    observer.onNext("今日份麻醬涼皮")
    observer.onCompleted()

    return Disposables.create()
}
//訂閱信號
let _ = ob.subscribe(onNext: { (text) in
    print("訂閱到:\(text)")
}, onError: { (error) in
    print(error)
}, onCompleted: {
    print("完成")
}) {
    print("銷燬")
}

控制檯輸出:

訂閱到:今日份麻醬涼皮
完成
銷燬
複製代碼

探究

在看源碼以前,應該對接觸到的類和協議有些認識,方便以後的理解。下面的關係圖在須要的時候回頭熟悉一下就行:閉包

類關係圖.png

究竟是什麼在支撐如此便捷的調用?ide

第一句Observable<Any>.create建立了一個可觀察序列Observable對象,第二句就是這個Observable序列對象訂閱了消息。函數

從輸出能夠看出,都是訂閱到的消息。那麼訂閱時傳入subscribe的閉包是何時調用的呢?fetch

單從如今的幾句代碼,也能猜出是第一句代碼的閉包中的observer.onNext的調用引發的。可是,咱們也沒有看到這個**create函數中的閉包是在哪裏執行的?**ui

爲了可以清晰的描述,暫且稱第一句create中的閉包爲**create閉包,第二句subscribe中的幾個閉包爲subscribe閉包**。spa

外面看不出來,那咱們只能進去RxSwift裏面探索下createsubscribe到底作了什麼?線程

create函數的實現code

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}
複製代碼

原來這函數內部其實是建立了一個AnonymousObservable匿名可觀察序列對象。而以前的閉包也是給AnonymousObservable對象初始化用了。

AnonymousObservable

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
複製代碼

這裏在初始化的時候把create閉包賦值給了_subscribeHandler屬性。

到此爲止,Observable<Any>.create函數實際上建立了一個AnonymousObservable匿名可觀察序列對象,並保存了create閉包

一圖概千言1.png

。。。貌似這條不是主線啊!沒有找到任何一個問題的答案。

再來翻翻**subscribe函數**:

extension ObservableType {
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            ......
            
            let observer = AnonymousObserver<E> { event in
                ......
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}
複製代碼

這也是對定義在ObservableType協議中的函數的實現,返回了一個Disposable。這個Disposable就是用來管理訂閱的生命週期的,示例代碼中並無體現出來,實際是在訂閱信號的內部處理的。前面都沒什麼,後面建立了AnonymousObserver,而且在AnonymousObserver初始化時傳入了閉包,並賦值給_eventHandler屬性。

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}
複製代碼

以前,AnonymousObservable匿名序列對象,保存了create閉包。 此時,建立了AnonymousObserver匿名觀察者對象,保存了對subscribe閉包的回調執行的EventHandler閉包

又一條支線。。。一路走來,都是在建立對象,保存閉包。兩個主線疑問仍是無跡可尋。難道一開始就走上了歧路?非也!繼續看下去就明白了什麼叫「柳暗花明又一村」。

AnonymousObservablesubscribe函數中,在建立了AnonymousObserver對象後,還返回了一個新建的Disposable對象。重點就在這裏的第一個參數:self.asObservable().subscribe(observer)中。asObservable仍是返回了self,後面貼上的ObserverType中能夠看到。剩下的就是AnonymousObservable的父類Producer中的subscribe了:

class Producer<Element> : Observable<Element> {
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            ......
        } else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
}
複製代碼

因爲是在當前線程中執行的,只看else那部分。disposer相關的不用關心。關鍵是observer參數,這個參數中有對subscribe閉包的處理的EventHandler閉包observer傳入了self.run(observer, cancel)。因此,還要回頭再看看**AnonymousObservable類中的run函數**:

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
}
複製代碼

這裏又建立了一個AnonymousObservableSink對象,observercancel繼續往初始化函數中丟:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>
    // state
    private let _isStopped = AtomicInt(0)
    
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<E>) {
        ......
        
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼

前面sink.run(self)self傳了進來,又對AnonymousObservable起了別名Parentparent._subscribeHandler不就是AnonymousObservable在調用它最開始保存的那個create閉包麼?AnyObserver(self)則把AnonymousObservableSink做爲AnyObserver初始化的參數。

public struct AnyObserver<Element> : ObserverType {
    public typealias E = Element
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}
複製代碼

其中還把AnonymousObservableSinkon函數賦值給了AnyObserver的屬性observerobserver就是EventHandler。這個EventHandlercreate閉包中會用到。

這不就是第二個主線疑問(create函數中的閉包什麼時候調用)的答案麼!

整理一下:

  • subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable函數中建立Disposable開始
  • AnonymousObservable調用subscribe(observer)
  • AnonymousObservable調用run(observer, cancel)
  • 建立AnonymousObservableSink(observer: observer, cancel: cancel),而且sink.run(self)
  • parent._subscribeHandler(AnyObserver(self))

這是一條從subscribe閉包-->create閉包的線。

一圖概千言2.png

還沒完,還有個**create閉包中怎麼觸發subscribe閉包的?**

又臭又長的寫了這麼多,這裏就只看observer.onNext("今日份麻醬涼皮")吧。點進去看: observer.onNext("今日份麻醬涼皮")

extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    
    public func onCompleted() {
        self.on(.completed)
    }
    
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
複製代碼

onNext中調用了on,在前面AnyObserver結構體定義中能夠看出,on函數中返回了self.observer(event),以前是把AnonymousObservableSinkon賦值給了這個self.observer,因此,此時會走到AnonymousObservableSinkon函數中。這裏面又調用了self.forwardOn(event),看下AnonymousObservableSink的父類Sink中定義的forwardOn

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    ......

    init(observer: O, cancel: Cancelable) {
        ......
        
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        ......
        
        self._observer.on(event)
    }
}
複製代碼

forwardOn中走了一句self._observer.on(event)。這裏的_observer屬性不就是AnonymousObservableSink初始化時傳入的AnonymousObserver對象麼!

繼續跟AnonymousObserveron

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        ......
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}
複製代碼

這裏沒有on,看父類ObserverBase:

class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType
    
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
}
複製代碼

這裏的on函數中,有個.next分支,調用了self.onCore(event)。子類AnonymousObserver實現的onCore中又調用了self._eventHandler(event)

這個_eventHandler是什麼?不就是AnonymousObserver初始化時保存的對subscribe閉包處理的閉包麼!因此create閉包中的observer.onNext("今日份麻醬涼皮")就能觸發subscribe閉包了。

這是一條從AnonymousObservable調用_subscribeHandler(也就是create閉包)時的參數 AnyObserver-->subscribe閉包的線。

一圖概千言3.png

如今咱們看清楚了響應式的數據流:

  1. 在訂閱信號時建立了observer並執行建立序列時的閉包
  2. 在建立序列的閉包中有回調observer,監聽序列的變更而觸發訂閱信號的閉包

圖解

看清楚了麼?好像清楚的不太明顯。畢竟好幾個類、協議,又那麼多函數調來調去的。加把勁再擼一擼。既然這麼五花八門的調用流程搞清楚了,那就來弄清楚它主要都作了什麼?

一圖概千言,既然畫了圖,就少敲點鍵盤吧!

時序圖0.1.png

時序圖0.2.png

時序圖0.3.png

時序圖0.4.png

時序圖0.5.png

時序圖0.6.png

仔細看了流程圖就會發現,出現的幾個類中,幹活的主要是Anonymous開頭的那三個類。咱們在外面調用的操做,實際上是在使用RxSwift內部封裝的一些類。

  • 建立可觀察序列AnonymousObservable,並保存create閉包
  • 訂閱信號
    • 首先建立了一個AnonymousObserver,而且把對subscribe閉包的操做封裝成了EventHandler
    • 苦力活仍是AnonymousObservable來幹。
      • 在建立返回值Disposable中,由subscribe(observer),把AnonymousObserver傳給了AnonymousObservableSink
        • AnonymousObservableSink纔是信息處理的核心,由於他知道的太多了
        • AnonymousObservableSinkAnonymousObserver觀察者,AnonymousObserver持有着EventHandler
        • AnonymousObservableSink在調用run函數時也傳入了AnonymousObservable序列,AnonymousObservable就是create閉包的持有者。
        • AnonymousObservableSink初始化的時候,除了觀察者外,還有個管理序列生命週期的Disposable
      • AnonymousObservableSink做爲一個內部類,在被create閉包當作參數回調給外界時須要轉換爲AnyObserver,在這裏AnyObserver則是以閉包屬性的形式保留了AnonymousObservableSinkon函數
      • 後面在信號發生改變時就可讓AnyObserver經過這個屬性值聯繫到AnonymousObservableSink

訂閱信號:Observable-->AnonymousObservable-->AnonymousObserver-->AnonymousObservableSink-->AnyObserver-->create閉包

  • 發出信號
    • 這個過程基本就是和訂閱信號時相反的
    • create閉包中調用AnyObserveronNext開始
    • 經過AnyObserver.observer訪問閉包中的AnonymousObservableSink
    • AnonymousObservableSink擁有AnonymousObserver
    • AnonymousObserver掌控EventHandler
    • 句號

發出信號:create閉包-->AnyObserver-->AnonymousObservableSink-->AnonymousObserver-->subscribe閉包

相關文章
相關標籤/搜索