Rxswift (六)銷燬者Dispose源碼分析

Rxswift(一)函數響應式編程思想編程

RxSwift (二)序列核心邏輯分析swift

RxSwift (三)Observable的建立,訂閱,銷燬api

RxSwift(四)高階函數數組

RxSwift(五)(Rxswift對比swift,oc用法)markdown

Rxswift (六)銷燬者Dispose源碼分析多線程

RxSwift(七)Rxswift對比swift用法閉包

RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬app

RxSwift學習之十二 (基礎使用篇 3- UI控件擴展) @TOCide

Rxswift銷燬者Dispose簡介

  1. 先經過一張思惟導圖初步瞭解一下銷燬者Dispose它擁有什麼,作了一些什麼事情:
    image
  2. 本編文章主要是圍繞上面這張圖來展開,重點分析Dispose()是怎麼銷燬序列的。
  3. 從上圖咱們能夠看出銷燬者後的第一個根節點是dispose和disposeBag.那他們分別是什麼呢?答案將在下面講解。

Rxswift銷燬者類和重要函數介紹

1. DisposeBag

1.1 DisposeBag是什麼

RxSwift和RxCocoa還有一個額外的工具來輔助處理ARC和內存管理:即DisposeBag。這是Observer對象的一個虛擬」包」,當它們的父對象被釋放時,這個虛擬包會被丟棄。 當帶有DisposeBag屬性的對象調用deinit()時,虛擬包將被清空,且每個一次性(disposable)Observer會自動取消訂閱它所觀察的內容。這容許ARC像一般同樣回收內存。 若是沒有DisposeBag,會有兩種結果:或者Observer會產生一個retain cycle,被無限期的綁定到被觀察對象上;或者意外地被釋放,致使程序崩潰。 因此要成爲一個ARC的良民,記得設置Observable對象時,將它們添加到DisposeBag中。這樣,它們才能被很好地清理掉。函數

當一個Observable(被觀察者)被觀察訂閱後,就會產生一個Disposable實例,經過這個實例,咱們就能進行資源的釋放了。 對於RxSwift中資源的釋放,也就是解除綁定、釋放空間,有兩種方法,分別是顯式釋放以及隱式釋放:

  • 顯式釋放 可讓咱們在代碼中直接調用釋放方法進行資源的釋放 以下面的實例:
let dispose = textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
dispose.dispose()
複製代碼
  • 隱式釋放 隱式釋放則經過DisposeBag來進行,它相似於Objective-C ARC中的自動釋放池機制,當咱們建立了某個實例後,會被添加到所在線程的自動釋放池中,而自動釋放池會在一個RunLoop週期後進行池子的釋放與重建;DisposeBag對於RxSwift就像自動釋放池同樣,咱們把資源添加到DisposeBag中,讓資源隨着DisposeBag一塊兒釋放。

以下實例:

let disposeBag = DisposeBag()
func binding() {
       textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
           .addDisposableTo(self.disposeBag)
}
複製代碼

上面代碼中方法addDisposableTo會對DisposeBag進行弱引用,因此這個DisposeBag要被實例引用着,通常可做爲實例的成員變量,當實例被銷燬了,成員DisposeBag會跟着銷燬,從而使得RxSwift在此實例上綁定的資源獲得釋放。

從上面的講解咱們大體明白了DisposeBag就像咱們咱們OC內存管理裏的自動釋放池。他充當了一個垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就會在合適的時候幫咱們釋放資源,那麼它是怎麼作到的呢?

1.2 DisposeBag的實現源碼分析

1.2.1. 先看一下類圖:

image

1.2.2. 具體分析源碼流程

  1. 當咱們調用disposed()方法的時候,會調用Dispose類的insert()方法,將銷燬者dispose加入的_disposables數組中。 具體源碼以下:
public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock()
    
    // state
    fileprivate var _disposables = [Disposable]()
    fileprivate var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }

    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        self._insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        //這裏爲了爲了防止多線程下出現搶佔資源問題,須要加鎖控制同步訪問
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {//判斷若是調用過了_dispose()說明已經被釋放過了,不須要再釋放,保證對稱性,則直接返回
            return disposable
        }
        //保存到數組中
        self._disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        // 1.獲取到全部保存的銷燬者
        let oldDisposables = self._dispose()

        // 2.遍歷每一個銷燬者,掉用每個銷燬者的dispose()釋放資源
        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        self._lock.lock(); defer { self._lock.unlock() }

        // 獲取到全部保存的銷燬者
        let disposables = self._disposables
        
        self._disposables.removeAll(keepingCapacity: false)
        self._isDisposed = true //這個變量用來記錄是否垃圾袋數組被清空過
        
        return disposables
    }
    
    deinit {
        //當DisposeBag自身對象被銷燬時,調用本身的dispose(),遍歷銷燬數組中全部保存的銷燬者,
        self.dispose()
    }
}
複製代碼
  1. 上面的源碼流程經過一個圖來標識
    image
  2. 總結一下上面的DisposeBag處理流程:
  • 當咱們調用序列的dispose()方法是,DisposeBag調用insert()方法將咱們的須要銷燬的序列保存起來存放在_disposables數組中。
  • 當咱們的DisposeBag銷燬時,如定義的局部變量出了做用域後,就會被銷燬,首先會調用咱們的deinit()方法 如上圖4,在deinit()裏面會執行本身的dispose()方法,而後變量以前保存的全部須要釋放的_disposables數組,依次調用他們本身的dispose()方法。

2. fetchOr()函數

  1. fetchOr 函數的做用相似於標記,先來看一下fetchOr()函數的源碼:
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}
複製代碼

源碼很簡單,可是做用不小。代碼中this 是傳入的AtomicInt值,其內部僅有一個value值。 fetchOr 先將 this.value copy一份,做爲結果返回。而將 this.value 和 mask 作或 (|) 運算。而且將或運算的結果賦值給 this.value。

  1. 咱們經過一個表來理解這個函數的執行結果:
this.value mask oldValue 或 運算後this.value 返回值
0 1 0 1 0
1 1 1 1 1
0 2 0 2 0
1 2 1 3 1

就是作了一次或運算,實際的10進制結果不變,只是改變了裏面的二進制位,能夠用來作標誌位,只是C語言裏面常常用的方法,即一個Int類型處理自己的值可使用外,還能夠經過按位與,或,來改變它的標誌位,達到傳遞值的目的,這樣每一個位均可以取代一個bool類型,常常用做枚舉。

運算符 二進制 十進制 說明
0000 0001 1
0000 0010 2
或運算 0000 0011 3
  1. 經過上面的分析,我得知 fetchOr ()函數的做用就是,能夠確保每段代碼只被執行一次,就至關於一個標誌位,若是初始值爲0 ,若是傳入參數1,假設這段代碼重複執行5次,只有第一次會從0變爲1,後面四次調用都是爲1,不會發送變化。

Dispose核心邏輯

Dispose 實例代碼分析

  • 學過Rxswift的童鞋都知道dispose()調用後,會向咱們oc裏面的引用計數器同樣,釋放咱們的資源。釋放的時候咱們還能夠監聽到被銷燬的事件回調。那麼有沒有思考過Dispose是如何作到的呢?

要知道這個答案,咱們只能經過源碼來一步步分析:

  • 首先,咱們來看一段實例代碼:

實例1:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("銷燬釋放了")} // dispose.dispose()
        }
        // 序列訂閱
        let dispose = ob.subscribe(onNext: { (anything) in
            print("訂閱到了:\(anything)")
        }, onError: { (error) in
            print("訂閱到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("銷燬回調")
        }
        print("執行完畢")
        //dispose.dispose()
    }
複製代碼
  1. 上面的代碼執行結果以下:
    image
  2. 經過上面的結果咱們知道,這個建立的序列沒有被銷燬,即沒有打印「銷燬釋放了」,也沒有打印「銷燬回調」。這是爲何呢?這個問題咱們後面再經過分析源碼Rx源碼就知道了。
  3. 如今咱們把上面代碼的那行註釋放開dispose.dispose() 這行代碼去掉註釋,而後從新運行,輸出結果以下:
    image
  4. 經過上面的代碼咱們看到了,建立的序列銷燬了,銷燬回調也執行了。那爲何加上了dispose.dispose() 就能夠了呢?
  5. 此外咱們再來修改一下咱們的代碼:

實例2:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onCompleted()
            return Disposables.create { print("銷燬釋放了")} // dispose.dispose()
        }
        // 序列訂閱
        let dispose = ob.subscribe(onNext: { (anything) in
            print("訂閱到了:\(anything)")
        }, onError: { (error) in
            print("訂閱到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("銷燬回調")
        }
        print("執行完畢")
        //dispose.dispose()
    }
複製代碼

上面的實例2 的代碼相對與實例1 就多了一行代碼:observer.onCompleted() : 咱們再來看一下輸出結果:

image
這裏咱們能夠看到咱們多加了一行 observer.onCompleted()代碼後,就也打印了銷燬回調,銷燬釋放了,這是什麼邏輯呢? why?

  • 下面就讓咱們帶着三個問題去探索一下Rxswift底層是如何實現的

Dispose 流程源碼解析

再分析Dispose源碼前,咱們必須先深刻理解序列的建立,訂閱流程這個是基礎,只有理解了這個,才能真正理解Dispose的原理。 這個其實在以前的博客已經分析過了,詳情能夠參考我以前的博客:序列核心邏輯

爲了便於更好的理解,我在這裏還再一次理一下具體的流程:

1. 序列建立,訂閱流程

  • (1) 當咱們執行代碼let ob = Observable<Any>.create { (observer) -> Disposable in 這裏面是一個閉包咱們稱爲閉包A } 時,實際會來到Create.swift文件的第20行:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
複製代碼
  • (2) create()函數返回一個AnonymousObservable(subscribe)對象,並將咱們的閉包A傳入到了AnonymousObservable的構造函數裏面,AnonymousObservable將這個閉包A保存到了let _subscribeHandler: SubscribeHandler這個變量中存起來了。_subscribeHandler這個變量保存了序列ob建立時 傳入的閉包A (其中閉包A要求傳入AnyObserver類型做爲參數)
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    //這個變量保存了序列ob建立時 傳入的閉包A (其中閉包A要求傳入AnyObserver類型做爲參數)
    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler //這個變量保存了序列ob建立時 傳入的閉包A
    }
    ...下面代碼先不看省略掉
}
複製代碼
  • (3)咱們調用let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)" } 這行代碼進行序列ob的訂閱操做,這行代碼,咱們跟進源碼能夠查看到:在ObservableType+Extensions.swift文件的第39行:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            ... 此處代碼先不分析省略
            
            let observer = AnonymousObserver<Element> { 
             這個裏面是一個尾隨閉包的內容這裏先不分析
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
複製代碼
  • (4)從上面subscribe()的源碼能夠看到,在函數中建立了一個AnonymousObserver的對象,而後直接就return Disposables.create()結束了。

  • (5)這裏咱們並無發現訂閱和咱們的閉包A有任何關係,那關鍵就在self.asObservable().subscribe(observer)這行代碼裏面了,我來分析一下這行代碼到底作了些什麼。

  • (6)咱們要理解(5)中的這行代碼,就須要先理解一下類的集成關係: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType 詳情以下圖:

    image

  • (7)經過繼承關係,咱們能夠順着繼承鏈往上找父類,咱們能夠找到是在Observable類中定義了這個asObservable()方法:

public class Observable<Element> : ObservableType {
    ...此處省略不關注的代碼
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
    ...此處省略不關注的代碼
}

複製代碼
  • (8)經過源碼分析,我得知 asObservable()就是返回self ,而(3)的代碼調用是的self.asObservable().subscribe(observer) 這行代碼的self就是咱們建立的序列ob, 因此self.asObservable()返回的就是ob咱們最開始建立的可觀察序列。self.asObservable().subscribe(observer) 中的observer就是咱們在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) 方法實現中建立的局部變量:let observer = AnonymousObserver<Element> { 這個裏面是一個尾隨閉包的內容這裏先不分析 } 咱們將這個局部變量傳入了Observable的subscribe()方法。
  • (9)接着咱們就要分享Observable的subscribe()方法作了些什麼了。
  • (10)當咱們調用實例2中的這行代碼:let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") } 的時候,實際調用了 ObservableType協議的subscribe()方法,在這個方法裏面咱們建立了一個AnonymousObserver對象,並經過self.asObservable().subscribe(observer) 傳入了ob.susbscribe(observer) (注意:這裏的ob就是咱們create()建立的AnonymousObservable對象,而observer就是subscribe時建立臨時局部AnonymousObserver對象,這些上面已經分析過了)。
  • (11)然而經過上面的類圖,咱們能夠看到在ob(AnonymousObservable)類中並無一個subscribe()的方法,那麼咱們只能先找它的父類Producer.
  • (12)經過前面的類圖分析,能夠知道Producer繼承Observerable可觀察序列,遵循了ObservableType協議(這個協議定義一個subscribe()接口),因此咱們Producer中一定會實現這個接口。我來看一下源碼:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            //下面這行代碼是重點,調用了本身的run()方法,並傳入了兩個參數:
            //參數1:observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象
            //參數2:disposer:就SinkDisposer()對象後將銷燬會再分析。
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
複製代碼
  • (13)經過上面源碼分析,咱們得知Producer實現的subscribe()接口裏面,調用了本身的run()方法,並在run()方法裏面傳入了observer:就是咱們self.asObservable().subscribe(observer) 傳入的AnonymousObserver對象。那接下來我看一下run()作了一些什麼事情:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
複製代碼
  • (14)從上面Producer中的run()方法,咱們能夠知道在這個方法並無作任何事情,就一行rxAbstractMethod(),而這個rxAbstractMethod()只是一個抽象方法。那咱們的子類AnonymousObservable中確定覆寫了run()方法。接下來咱們再看一下AnonymousObservablerun()的源碼:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        //建立了一個管子AnonymousObservableSink,並傳給了管子兩個參數:
         //參數1:observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象
            //參數2:disposer:就SinkDisposer()對象後將銷燬會再分析。
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
複製代碼
  • (15)在上面的run()源碼中咱們能夠看到:在AnonymousObservablerun()方法中。首先,建立了一個AnonymousObservableSink對象sink,並將observer(也就是咱們self.asObservable().subscribe(observer) 傳入的AnonymousObserver對象)傳入;其次,調用了sink.run(self) 方法返回了subscription,而後直接飯後一個元組,也就是run()方法返回了一個元組:(sink: sink, subscription: subscription)。 可是咱們的重點是在sink管子上面。AnonymousObservableSink是一個相似於manager角色,它保存了序列,訂閱者,銷燬者三個信息,還具有調度能力。咱們序列和訂閱者就是通這個管子來作橋樑,實現通信。
  • (16)接下來咱們分析AnonymousObservableSink管子作了一些什麼呢?咱們來看一下AnonymousObservableSink的源碼:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    //這裏給了一個別名:Parent就是AnonymousObservable序列
    typealias Parent = AnonymousObservable<Element>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
    //傳入了observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象
        super.init(observer: observer, cancel: cancel)
    }

func on(_ event: Event<Element>) {
    #if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
        if load(self._isStopped) == 1 {//若是已經執行過.error, .completed,就不會繼續執行self.forwardOn(event)代碼,意思就是隻有對象生命週期內執行過.complete,.error事件,就不會再執行forwardOn,除非從新激活改變條件值。
            return
        }
        self.forwardOn(event)
    case .error, .completed:
    //fetchOr()這個方法上面已經講解過,做用就是控制確保只會執行一次
        if fetchOr(self._isStopped, 1) == 0 {//若是從沒有執行過就執行一次,不然不執行。以確保下面代碼在對象生命週期內,不管on()調用多少次,都只會執行一次。
            self.forwardOn(event)
            self.dispose()
        }
    }
}

//這是一個很重要的方法,
    func run(_ parent: Parent) -> Disposable {
    //這裏傳入parent就是AnonymousObservable序列,也就是咱們最開始create()序列ob,_subscribeHandler就是咱們建立序列時傳入的閉包A(閉包A就相對一個函數,要求傳入一個參數,這個參數就是AnyObserver(self))
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼
  • (17)經過上面AnonymousObservableSink的源碼,咱們得知有一下幾點結論:
    • AnonymousObservableSink.init初始化時傳入了observer:就是咱們self.asObservable().subscribe(observer) 傳入的AnonymousObserver對象。
    • AnonymousObservableSink有一個on()方法,這個方法根據傳入的參數event作了不一樣的處理,但都會至少調用一次self.forwardOn(event)方法。每次若是onNext事件都會調用一次forwardOn()。可是.error, .completed事件最多隻會調用一次forwardOn()
    • AnonymousObservableSink的run()方法是核心方法,是它會回調咱們最開始create()建立時傳遞的閉包A,並將咱們調用ob.subscribe()訂閱時,函數內部建立的AnonymousObserver對象經過咱們AnonymousObservableSink對象sink,也就是AnyObserver(self)中的self包裝成一個AnyObserver結構體以後,做爲參數傳入閉包A,這樣就將咱們的序列和訂閱者創建了聯繫。
    • **特別注意:**不少人認爲傳入咱們閉包A 的就是AnonymousObserver 實際上不是,傳入閉包A的時一個AnyObserver結構體
    • 經過AnonymousObservableSink的run()方法咱們成功把咱們最開始的ob.subscibe()訂閱時建立的閉包經過AnyObserver(self)做爲參數傳給了閉包A,當咱們在閉包A裏面調用這行代碼時:observer.onNext("kongyulu")時,因爲通過ob.subscribe()訂閱以後,AnyObserver(self)就是咱們的observer.了,而此時的observer是一個結構體,它擁有了咱們的管子AnonymousObservableSink對象的on()方法。
    • 在實例1中:當咱們發送observer.onNext("kongyulu")序列消息時,實際上會經過咱們的管子AnonymousObservableSink.on()來調度,最終調度咱們訂閱時的閉包:onNext()閉包Blet dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }
    • 那麼如今最大的疑問就是**:AnonymousObservableSink.on()時如何從observer.onNext("kongyulu")調度到咱們閉包B?**
  • (18)要分析上面這個問題,咱們須要先來分析一下結構體AnyObserver(self)作了什麼:先看一下AnyObsevrer的源碼
public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    
    public typealias EventHandler = (Event<Element>) -> Void
    //這裏定義了別名EventHandler就是一個傳入事件的閉包
    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    
    public init(eventHandler: @escaping EventHandler) {
    //self.observer保存了AnonymousObservableSink對象
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    //初始化時要求傳入一個ObserverType,而這個是(17)點分析中AnyObserver(self)代碼中的self,實際上就是AnonymousObservableSink對象,
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    //這段代碼直接保存了AnonymousObservableSink.on()方法
    //self.observer實際就是一個on()方法
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
    //這裏調用on方法實際就是調用AnonymousObservableSink.on(event)方法
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}
複製代碼
  • (19)經過上面AnyObserver源碼分析,咱們得知AnyObserver初始化時保存了咱們管子AnonymousObservableSinkon()方法,而且本身有一個on方法,在他本身的on方法裏面再去調用AnonymousObservableSink.on()方法。這樣就只是包裝了一層不讓外界知道咱們的AnonymousObservableSink類,爲啥這樣設計呢?這樣設計有幾點好處:

    • 起到徹底封裝效果,外界徹底不須要知道咱們的管子AnonymousObservableSink類,他們不關心咱們AnonymousObservableSink類時如何實現的,使用者只須要用這個接口on()就好了,至於on()是如何實現的,經過誰實現並不須要關心。
    • 起到解耦的效果,AnyObserver 並無擁有咱們AnonymousObservableSink對象,它只是擁有了AnonymousObservableSink的on()接口,只須要AnonymousObservableSink實現這個on()接口該作的事情就能夠了。至於AnonymousObservableSink內部怎麼改(只要on()接口不改)的都不會影響到AnyObserver
  • (20)如今咱們的重點就在on()方法上面:

    • 當咱們實例1中執行:observer.onNext("kongyulu")這行代碼時,實際就會調用:AnyObserver.onNext()方法。(因爲咱們AnyObserver繼承了ObserverType協議,也就擁有了ObserverTypeonNext()方法,此處若是不清楚能夠往上回看類繼承關係)
  • (21)AnyObserver.onNext()調用的時候會調用本身的on()方法:

ObserverType的接口定義

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))//這裏會調回到AnyObserver的on()方法,AnyObserver繼承ObserverType,重寫了on()接口
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
複製代碼
  • (22) AnyObserver.on() 方法會調用 AnonymousObservableSink.on()方法。
  • (23)AnonymousObservableSink.on(event)會調用 AnonymousObservableSink.forwardOn(event)
  • (24)而在AnonymousObservableSink中沒有定義forwardOn()方法,咱們找到它的父類Sink裏面實現了forwardOn() 源碼以下:
class Sink<Observer: ObserverType> : Disposable {
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//初始化保存了self._observer實際就是:咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        // 這裏實際調用了`AnonymousObserver.on()`方法。
        self._observer.on(event)
    }

   ... 這次代碼省略,不須要關注
}

複製代碼
  • (25)從上面的源碼咱們能夠看到:Sink.forwardOn()實際調用了AnonymousObserver.on(),說白了就是:咱們最開始實例1的observer.onNext("kongyulu")這行代碼執行時,ob.onNext() 先調用AnyObserver.on()AnyObserver.on()又會調用AnonymousObservableSink.on()AnonymousObservableSink.on()又會調用AnonymousObservableSink.forwardOn(),接着AnonymousObservableSink.forwardOn()又會調用AnonymousObservableSink父類的Sink.forwardOn(),最後由Sink.forwardOn()調用了AnonymousObserver.on()
  • (26)到這裏咱們思路基本清晰了,咱們再回到AnonymousObserver.on()方法定義:
  1. 首先咱們查看類定義以下,並無找到由on()方法:
final class AnonymousObserver<Element>: ObserverBase<Element> {
//次處給尾隨閉包取了一個別名
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//這裏保存了一個傳入的尾隨閉包:這個尾隨閉包就是咱們ob.subscribe()時建立let observer = AnonymousObserver<Element> { event in這裏是個尾隨閉包B} 這裏傳入_eventHandler保存的就是尾隨閉包B
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
    //這裏回調了咱們的尾隨閉包B
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
複製代碼
  1. 因而咱們來找它的父類ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)//這裏實際調用的是子類的onCore()
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}
複製代碼
  1. 咱們經過分析父類源碼得知ObserverBase.on()最終調用了AnonymousObserver.onCore(),而在AnonymousObserver.onCore()裏回調了_eventHandler(event)閉包B,而閉包B就是咱們最初ob.subscribe()序列訂閱時建立AnonymousObserver的尾隨閉包,這樣這個尾隨閉包最終調用了咱們訂閱的onNext()方法。這樣就解釋了:實例1中,執行observer.onNext("kongyulu")這行代碼就會回調let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") } 從而打印了 「訂閱到了:kongyulu」

具體AnonymousObserver {B}的尾隨閉包B的代碼以下:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
           
           ...這次無關代碼先省略 
           //特別注意:AnonymousObserver<Element> { B}括號裏面的尾隨閉包咱們稱爲B,最終會經過AnonymousObserver.onCore()函數調用閉包B
            let observer = AnonymousObserver<Element> { event in
                ...這次無關代碼先省略 
                switch event {
                case .next(let value):
                    onNext?(value) //這裏調用onNext(value)實際就是
                    
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
複製代碼
  • (27)經過(26)點分析咱們應該弄明白了整個訂閱的流程了, 簡單總結就是
  1. 咱們ob.create(閉包A)建立時將閉包A保存在AnonymousObservable 裏變量_subscribeHandler
  2. 當咱們調用ob.subscribe(閉包B)訂閱序列時,會首先建立一個AnonymousObserver對象,而且會帶一個尾隨閉包C。而後經過self.asObservable().subscribe(AnonymousObserver) 通過一系列轉化將AnyObserver傳遞給了閉包A
  3. 其中2中說一系列轉化能夠簡單解釋爲:
  • self.asObservable().subscribe(AnonymousObserver) 實際就是ob.subscribe(AnonymousObserver)

  • ob.subscribe(AnonymousObserver)實際就是Producer.subscribe(AnonymousObserver)

  • Producer.subscribe(AnonymousObserver)會調用self.run(AnonymousObserver)

  • self.run(AnonymousObserver) 會建立一個AnonymousObservableSink管子對象sink,而後調用sink.run(AnonymousObservable)調用了管子的run()方法,並將ob傳入了管子sink.

  • 而咱們管子的sink.run(AnonymousObservable)方法裏面調用了parent._subscribeHandler(AnyObserver(self))實際就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))也就是調用了閉包A

  • 而咱們閉包A須要傳入一個參數就是AnyObserver(AnonymousObservableSink),實際上AnyObserver只是一個結構體,它保存了AnonymousObservableSink.on()方法。

  • 當咱們在閉包A裏面調用observer.onNext("kongyulu")實際上就是AnyObserver.onNext("kongyulu"),而AnyObserver.onNext("kongyulu")會調用AnyObserver.on()

  • AnyObserver.on()接着又調用AnonymousObservableSink.on(event)這裏event裏面

  • AnonymousObservableSink類中AnonymousObservableSink.on(event) 接着又會去調用它本身的forwardOn(event)也就是AnonymousObservableSink.forwardOn(event)

  • AnonymousObservableSink.forwardOn(event) 其實是調用它父類Sink.forwardOn(event)而在Sink父類初始化的時候已經保存了AnonymousObserver對象_observer。

  • Sink.forwardOn(event)會調用的是AnonymousObserver.on(event)

  • AnonymousObserver.on(event)實際會調用本身父類的ObserverBase.on(event)

  • ObserverBase.on(event) 實際又會調用子類的AnonymousObserver.onCore(event)

  • AnonymousObserver.onCore(event) 會調用self._eventHandler(event)而這裏_eventHandler就是保存AnonymousObserver建立時傳入的尾隨閉包C這樣就回調了閉包C

  • 閉包C中又根據event的事件不一樣,回調了閉包B,例如如event=.onNext事件,就會回調閉包B onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }, onError: { (error) in print("訂閱到了:\(error)") }, onCompleted: { print("完成了") }) { print("銷燬回調") } 裏面的這段代碼:onNext: { (anything) in print("訂閱到了:\(anything)") 從而就會打印:「訂閱到了:kongyulu」

  • (28)

  • (29)

最後這裏經過一個流程圖來表達整個建立,訂閱過程

2. 序列建立,訂閱圖解

序列建立流程圖:孔雨露(QQ:282889543)

3. 序列訂閱流程

3.1 序列銷燬方式

上面講解了序列的建立,訂閱流程,在分析建立序列,訂閱序列的源碼時,咱們已經隱隱約約的看到了咱們開篇分析的dispose(),貌似在整個源碼中各處都有着dispose的代碼,那麼序列究竟是怎麼銷燬的呢?

爲了解決這個疑問,咱們下面將經過分析源碼,來探索一下序列的銷燬流程。

這裏先看一張序列生命週期時序圖:

序列生命週期時序圖
經過這張時序圖,結合上面的序列建立,訂閱的流程分析,我能夠先得出序列會被銷燬的3種方式:

  • 方式一經過發送事件,讓序列生命週期自動結束來釋放序列資源。 一個序列只要發出了 error 或者 completed 事件,它的生命週期將結束,那麼全部內部資源都會被釋放,不須要咱們手動釋放。(這個結論在本篇博客討論實例1實例2的時候已經驗證了,只要發送了completed和error事件,就會調用onComplete並打印「銷燬了」信息)

  • 方式二經過主動調用dispose()來釋放。例如你須要提早釋放序列資源或取消訂閱的話,那麼你能夠對返回的可被清除的資源(Disposable) 調用 dispose 方法。

  • 方式三經過垃圾袋DisposeBag來回收資源,達到自動釋放,這是官方推薦的方式。官方推薦使用清除包(DisposeBag)來管理訂閱的生命週期,通常是把資源加入到一個全局的DisposeBag裏面,它跟隨着頁面的生命週期,當頁面銷燬時DisposeBag也會隨之銷燬,同時DisposeBag裏面的資源也會被一一釋放。(這個結論在上面的DisposeBag分析中也證明了)

3.2 序列銷燬實例分析

咱們先來回顧一下本篇博客開始分析的實例1的代碼 實例1:

func limitObservable(){
        // 建立序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("銷燬釋放了")} // dispose.dispose()
        }
        // 序列訂閱
        let dispose = ob.subscribe(onNext: { (anything) in
            print("訂閱到了:\(anything)")
        }, onError: { (error) in
            print("訂閱到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("銷燬回調")
        }
        print("執行完畢")
        //dispose.dispose()
    }
複製代碼
  1. 上面的代碼執行結果以下:
    image
  2. 經過上面的結果咱們知道,這個建立的序列沒有被銷燬,即沒有打印「銷燬釋放了」,也沒有打印「銷燬回調」。這是爲何呢?這個問題咱們後面再經過分析源碼Rx源碼就知道了。
  3. 如今咱們把上面代碼的那行註釋放開dispose.dispose() 這行代碼去掉註釋,而後從新運行,輸出結果以下:
    image
3.3 序列銷燬源碼分析
  1. 經過上面實例1的代碼,首先能夠看到,在建立序列Observable<Any>.create()方法有一個尾隨閉包,須要返回一個實現了Disposable協議的實例。而就是經過return Disposables.create { print("銷燬釋放了")} 這行代碼返回的。由此咱們確認Disposables.create { print("銷燬釋放了")}很是重要,咱們先來發分析一下Disposables.create源碼。
  2. 進入到Disposables.create()源碼:咱們想直接點擊進去發現Disposables就是一個空結構體
public struct Disposables {
    private init() {}
}
複製代碼

看這個結構體連初始化方法都是私有的,說明它不能被繼承,因而咱們推測Disposables.create()必定經過擴展的方式實現的。因此咱們在項目中搜索extension Disposables {關鍵字,能夠找到以下:

image
這樣咱們找到第一個:AnonymousDisposable.swift文件進入找到第55行:

extension Disposables {

    /// Constructs a new disposable with the given action used for disposal.
    ///
    /// - parameter dispose: Disposal action which will be run upon calling `dispose`.
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)//這裏dispose就是咱們傳入的尾隨閉包
    }

}
複製代碼

經過上面源碼,咱們看到直接一行return AnonymousDisposable(disposeAction: dispose)就結束了,而dispose 就是咱們實例1中 Disposables.create { print("銷燬釋放了")} // dispose.dispose() } 這行代碼裏面的尾隨閉包: { print("銷燬釋放了")} 這裏咱們給他一個別名成爲:閉包D

  1. 不用思考,接下來咱們確定要進入AnonymousDisposable類實現一探究竟:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    fileprivate init(_ disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    // Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
複製代碼
  1. 分析上面AnonymousDisposable類定義源碼,咱們能夠得出如下結論:
  • 初始化的時候把外界傳過來的閉包進行保存,傳入進來的閉包咱們就是咱們第2點中分析的閉包D{ print("銷燬釋放了")}
  • 有一個dispose()方法,經過fetchOr(self._isDisposed, 1) == 0這行代碼控制dispose()裏面的內容只會被執行一次。(不管dispose()方法被執行多少次,if let action = self._disposeAction { self._disposeAction = nil action() } 這段代碼最多會被執行一次)
  • dispose()方法中先把self._disposeAction賦值給臨時變量action,而後置空self._disposeAction,再執行action()。這樣操做的緣由是若是_disposeAction閉包是一個耗時操做,也可以保證_disposeAction可以當即釋放。
  1. AnonymousDisposable裏面咱們只看到了一些常規的保存等操做,結合咱們最開始分析序列的建立流程經驗(AnonymousDisposable就相似於AnonymousObservable),咱們能夠推斷核心代碼實現確定在訂閱這一塊。

  2. 接下來,咱們進入到observable.subscribe()方法來探究一些subscribe()的源碼實現。

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
    //1.這裏定義disposable局部變量
    let disposable: Disposable
    //2.建立了Disposables對象
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    }
    else {
        disposable = Disposables.create()
    }
    //3.建立了一個AnonymousObserver對象,有一個重要的尾隨閉包
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
            else {
                Hooks.defaultErrorHandler(callStack, error)
            }
            disposable.dispose() //這裏當收到error事件就會回收釋放資源
        case .completed:
            onCompleted?()
            disposable.dispose() //這裏當收到completed事件就會回收釋放資源
        }
    }
    
    return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable//這裏將咱們建立的局部變量傳給了self.asObservable().subscribe,也就是咱們的Producer.subscribe
    )
}
複製代碼

分析上面subscribe()源碼,結合開始的分析,咱們能夠得出如下結論:

  • subscribe()建立了一個Disposable對象,並保存了銷燬回調閉包,當執行銷燬時,會把消息回調出去。
  • 在收到錯誤或者完成事件時會執行disposable.dispose()釋放資源。
  • return Disposables.create( self.asObservable().subscribe(observer), disposable ),這裏返回的Disposable對象就是咱們外面手動調用dispose.dispose()方法的dispose對象,或者說是加入到全局的DisposeBag的銷燬者。
  1. 由6的分析,咱們清楚知道最後一行代碼return Disposables.create( self.asObservable().subscribe(observer), disposable )時關鍵點,咱們接下進入:Disposables.create()源碼:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
    return BinaryDisposable(disposable1, disposable2)//返回一個二元銷燬者對象。
}
複製代碼

上面代碼咱們看到create()直接返回了一個BinaryDisposable二元銷燬者類對象,並將disposable1disposable2傳入給了BinaryDisposable

  • 這裏的disposable1就是self.asObservable().subscribe(observer) 也就是Producer..subscribe(observer)返回的disposer
  • disposable2就是咱們subscribe()中建立局部變量let disposable: Disposable
  1. 接着咱們來分析BinaryDisposable類究竟是什麼:
private final class BinaryDisposable : DisposeBase, Cancelable {

    private let _isDisposed = AtomicInt(0)

    // state
    private var _disposable1: Disposable?
    private var _disposable2: Disposable?

    /// - returns: Was resource disposed.
    var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    init(_ disposable1: Disposable, _ disposable2: Disposable) {
        self._disposable1 = disposable1
        self._disposable2 = disposable2
        super.init()
    }

    func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }
}
複製代碼
相關文章
相關標籤/搜索