RxSwift - 核心流程源碼解析

因爲項目的須要,筆者最近在學習swift,爲了更加高效的擼代碼,研究了RxSwift這個牛逼的庫,對於它的核心流程,進行了一番探索(實際上耗費了畢生心血),對於下面的敘述,若是錯誤,還望你們多多留言指正,謝謝了🙏🙏🙏swift

核心代碼塊

override func viewDidLoad() {
        super.viewDidLoad()
        //一、建立序列
        let ob = Observable<Any>.create { (obserber) -> Disposable in
            //三、發送信號
            obserber.onNext("佐籩")
            obserber.onError(NSError.init(domain: "1234", code: 10086, userInfo: nil))
            obserber.onCompleted()
            return Disposables.create()
        }
        
        //二、訂閱信號
        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("錯誤:\(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷燬")
        }
    }
複製代碼

運行代碼結果以下:api

核心代碼如上代碼所示,共三步bash

  • 建立序列
  • 訂閱信號
  • 發送信號

那麼對於上面的流程中提出一下幾個問題:閉包

  • 序列是如何建立?
  • 序列是如何接受到發送的信號的?
  • 整個流程又是如何實現的?

下面筆者會跟着你們一塊兒進入源碼,一探究竟!
想要具體瞭解原理的小夥伴,最好帶着源碼進行閱讀下面的內容,便於你的理解dom

源碼解析

建立序列

序列的建立很簡單,就是下面一行代碼ide

let ob = Observable<Any>.create { 閉包A }
複製代碼

進入查看create,這裏的create只能在create.swift文件裏查看,否則跟蹤不到這個方法😅源碼分析

但是,爲神馬會走到這裏來呢?學習

進入對象Observable查看,會獲得下面的繼承關係fetch

因此在ObservableType的擴展中查看create方法,具體實現以下ui

public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
複製代碼

能夠看到,返回了一個AnonymousObservable對象,而且傳入了一個參數subscribe,經過建立的代碼能夠知道,這裏的subscribe就是外面咱們定義的閉包,後面文章中筆者會稱爲它爲閉包A,也就是發送信號的部分。

進入AnonymousObservable類的查看源碼,

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
複製代碼

這個類的實現很簡單,經過init初始化方法能夠看到,閉包A被保存在了_subscribeHandler屬性中。 同時這裏還提供了一個很是重要run方法,後面會走到這裏,因此這裏先強調一下。

到此,序列的建立就結束了,下面在看訂閱信號的解析

訂閱信號

//二、訂閱信號
        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("錯誤:\(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷燬")
        }
複製代碼

從上面代碼能夠知道,重點解析在於方法subscribe,進入源碼,過濾掉其餘無關代碼以下

嗯,不錯,就這兩行代碼,信號的訂閱到發送,就結束了。

此刻請容許筆者經過下面內容來表達一下此刻的心裏

嗯。。。耐下心來,慢慢琢磨能夠知道,想要突破,那麼關鍵代碼就在下面這一行

self.asObservable().subscribe(observer)
複製代碼

經過斷點調試,能夠知道這裏的self表示着AnonymousObservable,或者經過咱們寫的核心代碼塊也能夠知道,首先建立了一個序列ob,它的類型上面說過了是AnonymousObservable類型,接着訂閱信號是直接用ob調用的方法subscribe,因此這裏的self表示着AnonymousObservable,它就是建立序列時返回的對象。

可是在上面對AnonymousObservable解析中,並無發現它擁有一個叫作subscribe的方法,再次回到AnonymousObservable類的源碼中,能夠發現,它是繼承自Producer類,不錯,在它的父類Producer中咱們發現了subscribe方法。

這裏停一下,由於此時傳了一個參數observer,它也是一個AnonymousObserver對象,同時帶着一個尾隨閉包,暫且稱這個參數爲閉包B,以便後面使用它的時候,咱們能夠容易想起來。

接着上面的思路,進入Producer類中查看,在subscribe方法中,調用了一個run方法,本類中,並無對這個方法進行具體實現,而是分發給了它的子類去實現,那麼這裏又須要回到AnonymousObservable類中,在上面的序列建立的內容中,筆者也就和你們說了這裏run方法的重要,下面具體分析。

在此以前,須要注意的是,參數閉包B一直在傳遞。

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
複製代碼

建立了一個實例對象sink,而後調用了一個run方法。這裏看不出來什麼東西,進入AnonymousObservableSink查看源碼。

能夠發現下面幾點

  • 第一,重寫了父類的init方法,而傳進來的閉包B,傳遞給了父類Sink,同時賦值給了父類的_observer屬性。
  • 第二,方法on,參數爲event
  • 第三,方法run,參數parent

OK,AnonymousObservableSink類的內容,差很少就這麼多,回到以前的代碼中

let subscription = sink.run(self)
複製代碼

這行代碼裏的self表示類AnonymousObservable,我想你們應該都沒有意見,那麼具體看下這個方法的實現。

func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
複製代碼

轉化一下代碼AnonymousObservable._subscribeHandler(AnyObserver(self)),這樣看的話,我相信你們應該很是的熟悉了。

AnonymousObservable是什麼? 它是在咱們建立序列時的返回的那個對象。
_subscribeHandler是什麼? 它是在咱們建立序列時傳的參數閉包A。

那麼這句話的意思就是執行閉包A,即發送信號

發送信號

分兩步來分析AnonymousObservable._subscribeHandler(AnyObserver(self))這行代碼;

一、發送信號的閉包

回到發送信號的閉包中,

obserber.onNext("佐籩")
複製代碼

這裏只分析onNext(),由於其餘原理同樣,就不過多描述了啊!

按住command,追蹤onNext()方法,能夠跳轉到ObserverType擴展中

public func onNext(_ element: Element) {
        self.on(.next(element))
    }
複製代碼

能夠經過斷點調試知道,這裏的self指的是AnyObserver類,或者能夠回到create.swift文件中的create方法中查看

這裏已經把對應的obserber的類型已經描述的很清晰了。obserberAnyObserver類型,且遵循ObserverType協議,調用的onNext()方法就是ObserverType協議裏的方法(查看一下AnyObserver類的源碼便知)。

回到AnyObserver類中,查看on方法

public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
複製代碼

能夠斷點看一下,這裏已經把信號傳了進來

那麼這裏的 self.observer()具體是什麼呢?請看下面的分析

二、解析 AnyObserver(self)

經過上面流程能夠知道,關鍵代碼在於AnyObserver(self),經過進入源碼查看

public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
複製代碼

在初始化代碼裏有這麼一個操做,observer.on賦值的操做,這裏的observer是經過初始化構造方法傳來的,回顧上面的流程能夠知道,它是AnonymousObservableSink類。那麼它執行on方法,跟着代碼走,會調用方法forwardOn,最後會來到Sink類裏的forwardOn方法,源碼以下

final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
複製代碼

關鍵代碼在於self._observer.on(event),在訂閱信號的分析中,特別說明了_observer是閉包B,若是這裏有疑問的小夥伴,可會回到前面看一下。

閉包B的具體結構以下(部分代碼省略):

AnonymousObserver<Element> { event in
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
複製代碼

那麼對於self._observer.on(event),能夠理解爲類AnonymousObserver調用方法on,進入源碼查看,AnonymousObserver類中並沒有該方法,而後經過它的父類ObserverBase能夠找到該方法,具體實現以下:

func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
複製代碼

緊接着,執行self.onCore(event),一樣的,該方法,父類ObserverBase並無具體實現,而是分發給子類去實現,回到類AnonymousObserver,查看該方法的實現代碼

override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
複製代碼

那麼對於self._eventHandler()是個什麼東西,能夠具體分析一下,在當前類AnonymousObserver中,

typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
複製代碼

追溯到枚舉Event

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}
複製代碼

到這裏,我想你們應該能夠知道下面咱們應該走到哪裏了,沒錯,就是剛剛說的閉包B,根據event執行不一樣的方法,好比這裏所說的onNext方法,那麼就會打印輸出訂閱到:佐籩

總結

OK,到這裏關於RxSwift的核心流程的源碼分析已經結束了,瞭解了其中的原理,筆者相信,對於RxSwift的運用會更加駕輕就熟。

好難,筆者在整理這份筆記的時候,確實花費了很多時間,本想畫個流程圖來幫助你們去理解,奈什麼時候間有限,後續筆者會整理一份思路較清晰的流程圖;

上述的分析流程,若是有什麼錯誤,但願你們給筆者留言指正一下,謝謝!!!

持續學習,但願你們持續關注!!!

相關文章
相關標籤/搜索