RxSwift (二)序列核心邏輯分析swift
RxSwift (三)Observable的建立,訂閱,銷燬api
RxSwift(五)(Rxswift對比swift,oc用法)markdown
RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬app
RxSwift學習之十二 (基礎使用篇 3- UI控件擴展) @TOCide
RxSwift和RxCocoa還有一個額外的工具來輔助處理ARC和內存管理:即DisposeBag。這是Observer對象的一個虛擬」包」,當它們的父對象被釋放時,這個虛擬包會被丟棄。 當帶有DisposeBag屬性的對象調用deinit()時,虛擬包將被清空,且每個一次性(disposable)Observer會自動取消訂閱它所觀察的內容。這容許ARC像一般同樣回收內存。 若是沒有DisposeBag,會有兩種結果:或者Observer會產生一個retain cycle,被無限期的綁定到被觀察對象上;或者意外地被釋放,致使程序崩潰。 因此要成爲一個ARC的良民,記得設置Observable對象時,將它們添加到DisposeBag中。這樣,它們才能被很好地清理掉。函數
當一個Observable(被觀察者)被觀察訂閱後,就會產生一個Disposable實例,經過這個實例,咱們就能進行資源的釋放了。 對於RxSwift中資源的釋放,也就是解除綁定、釋放空間,有兩種方法,分別是顯式釋放以及隱式釋放:
let dispose = textField.rx_text .bindTo(label.rx_sayHelloObserver) dispose.dispose() 複製代碼
DisposeBag
來進行,它相似於Objective-C ARC中的自動釋放池機制,當咱們建立了某個實例後,會被添加到所在線程的自動釋放池中,而自動釋放池會在一個RunLoop週期後進行池子的釋放與重建;DisposeBag對於RxSwift就像自動釋放池同樣,咱們把資源添加到DisposeBag中,讓資源隨着DisposeBag一塊兒釋放。以下實例:
let disposeBag = DisposeBag() func binding() { textField.rx_text .bindTo(label.rx_sayHelloObserver) .addDisposableTo(self.disposeBag) } 複製代碼
上面代碼中方法addDisposableTo會對DisposeBag進行弱引用,因此這個DisposeBag要被實例引用着,通常可做爲實例的成員變量,當實例被銷燬了,成員DisposeBag會跟着銷燬,從而使得RxSwift在此實例上綁定的資源獲得釋放。
從上面的講解咱們大體明白了DisposeBag就像咱們咱們OC內存管理裏的自動釋放池。他充當了一個垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就會在合適的時候幫咱們釋放資源,那麼它是怎麼作到的呢?
public final class DisposeBag: DisposeBase { private var _lock = SpinLock() // state fileprivate var _disposables = [Disposable]() fileprivate var _isDisposed = false /// Constructs new empty dispose bag. public override init() { super.init() } /// Adds `disposable` to be disposed when dispose bag is being deinited. /// /// - parameter disposable: Disposable to add. public func insert(_ disposable: Disposable) { self._insert(disposable)?.dispose() } private func _insert(_ disposable: Disposable) -> Disposable? { //這裏爲了爲了防止多線程下出現搶佔資源問題,須要加鎖控制同步訪問 self._lock.lock(); defer { self._lock.unlock() } if self._isDisposed {//判斷若是調用過了_dispose()說明已經被釋放過了,不須要再釋放,保證對稱性,則直接返回 return disposable } //保存到數組中 self._disposables.append(disposable) return nil } /// This is internal on purpose, take a look at `CompositeDisposable` instead. private func dispose() { // 1.獲取到全部保存的銷燬者 let oldDisposables = self._dispose() // 2.遍歷每一個銷燬者,掉用每個銷燬者的dispose()釋放資源 for disposable in oldDisposables { disposable.dispose() } } private func _dispose() -> [Disposable] { self._lock.lock(); defer { self._lock.unlock() } // 獲取到全部保存的銷燬者 let disposables = self._disposables self._disposables.removeAll(keepingCapacity: false) self._isDisposed = true //這個變量用來記錄是否垃圾袋數組被清空過 return disposables } deinit { //當DisposeBag自身對象被銷燬時,調用本身的dispose(),遍歷銷燬數組中全部保存的銷燬者, self.dispose() } } 複製代碼
dispose()
方法是,DisposeBag
調用insert()
方法將咱們的須要銷燬的序列保存起來存放在_disposables
數組中。_disposables
數組,依次調用他們本身的dispose()方法。func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 { this.lock() let oldValue = this.value this.value |= mask this.unlock() return oldValue } 複製代碼
源碼很簡單,可是做用不小。代碼中this
是傳入的AtomicInt值,其內部僅有一個value值。 fetchOr 先將 this.value copy一份,做爲結果返回。而將 this.value 和 mask 作或 (|) 運算。而且將或運算的結果賦值給 this.value。
this.value | mask | oldValue | 或 運算後this.value | 返回值 |
---|---|---|---|---|
0 | 1 | 0 | 1 | 0 |
1 | 1 | 1 | 1 | 1 |
0 | 2 | 0 | 2 | 0 |
1 | 2 | 1 | 3 | 1 |
就是作了一次或運算,實際的10進制結果不變,只是改變了裏面的二進制位,能夠用來作標誌位,只是C語言裏面常常用的方法,即一個Int類型處理自己的值可使用外,還能夠經過按位與,或,來改變它的標誌位,達到傳遞值的目的,這樣每一個位均可以取代一個bool類型,常常用做枚舉。
運算符 | 二進制 | 十進制 | 說明 |
---|---|---|---|
0000 0001 | 1 | ||
0000 0010 | 2 | ||
或運算 | 0000 0011 | 3 |
要知道這個答案,咱們只能經過源碼來一步步分析:
實例1:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") return Disposables.create { print("銷燬釋放了")} // dispose.dispose() } // 序列訂閱 let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }, onError: { (error) in print("訂閱到了:\(error)") }, onCompleted: { print("完成了") }) { print("銷燬回調") } print("執行完畢") //dispose.dispose() } 複製代碼
dispose.dispose()
這行代碼去掉註釋,而後從新運行,輸出結果以下:
dispose.dispose()
就能夠了呢?實例2:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") observer.onCompleted() return Disposables.create { print("銷燬釋放了")} // dispose.dispose() } // 序列訂閱 let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }, onError: { (error) in print("訂閱到了:\(error)") }, onCompleted: { print("完成了") }) { print("銷燬回調") } print("執行完畢") //dispose.dispose() } 複製代碼
上面的實例2 的代碼相對與實例1 就多了一行代碼:observer.onCompleted()
: 咱們再來看一下輸出結果:
observer.onCompleted()
代碼後,就也打印了銷燬回調,銷燬釋放了,這是什麼邏輯呢? why?
再分析Dispose源碼前,咱們必須先深刻理解序列的建立,訂閱流程這個是基礎,只有理解了這個,才能真正理解Dispose的原理。 這個其實在以前的博客已經分析過了,詳情能夠參考我以前的博客:序列核心邏輯
爲了便於更好的理解,我在這裏還再一次理一下具體的流程:
let ob = Observable<Any>.create { (observer) -> Disposable in 這裏面是一個閉包咱們稱爲閉包A }
時,實際會來到Create.swift文件的第20行:public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> { return AnonymousObservable(subscribe) } 複製代碼
AnonymousObservable(subscribe)
對象,並將咱們的閉包A傳入到了AnonymousObservable的構造函數裏面,AnonymousObservable將這個閉包A保存到了let _subscribeHandler: SubscribeHandler
這個變量中存起來了。_subscribeHandler
這個變量保存了序列ob建立時 傳入的閉包A (其中閉包A要求傳入AnyObserver
類型做爲參數)final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable //這個變量保存了序列ob建立時 傳入的閉包A (其中閉包A要求傳入AnyObserver類型做爲參數) let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler //這個變量保存了序列ob建立時 傳入的閉包A } ...下面代碼先不看省略掉 } 複製代碼
let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)" }
這行代碼進行序列ob的訂閱操做,這行代碼,咱們跟進源碼能夠查看到:在ObservableType+Extensions.swift文件的第39行:public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable ... 此處代碼先不分析省略 let observer = AnonymousObserver<Element> { 這個裏面是一個尾隨閉包的內容這裏先不分析 } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } 複製代碼
(4)從上面subscribe()
的源碼能夠看到,在函數中建立了一個AnonymousObserver的對象,而後直接就return Disposables.create()結束了。
(5)這裏咱們並無發現訂閱和咱們的閉包A有任何關係,那關鍵就在self.asObservable().subscribe(observer)
這行代碼裏面了,我來分析一下這行代碼到底作了些什麼。
(6)咱們要理解(5)中的這行代碼,就須要先理解一下類的集成關係: AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
詳情以下圖:
(7)經過繼承關係,咱們能夠順着繼承鏈往上找父類,咱們能夠找到是在Observable
類中定義了這個asObservable()
方法:
public class Observable<Element> : ObservableType { ...此處省略不關注的代碼 public func asObservable() -> Observable<Element> { return self } ...此處省略不關注的代碼 } 複製代碼
self.asObservable().subscribe(observer)
這行代碼的self就是咱們建立的序列ob, 因此self.asObservable()
返回的就是ob咱們最開始建立的可觀察序列。self.asObservable().subscribe(observer)
中的observer就是咱們在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
方法實現中建立的局部變量:let observer = AnonymousObserver<Element> { 這個裏面是一個尾隨閉包的內容這裏先不分析 }
咱們將這個局部變量傳入了Observable的subscribe()
方法。subscribe()
方法作了些什麼了。let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }
的時候,實際調用了 ObservableType
協議的subscribe()方法,在這個方法裏面咱們建立了一個AnonymousObserver
對象,並經過self.asObservable().subscribe(observer)
傳入了ob.susbscribe(observer) (注意:這裏的ob就是咱們create()建立的AnonymousObservable對象,而observer就是subscribe時建立臨時局部AnonymousObserver對象,這些上面已經分析過了)。override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() //下面這行代碼是重點,調用了本身的run()方法,並傳入了兩個參數: //參數1:observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象 //參數2:disposer:就SinkDisposer()對象後將銷燬會再分析。 let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } 複製代碼
Producer
實現的subscribe()
接口裏面,調用了本身的run()
方法,並在run()
方法裏面傳入了observer:就是咱們self.asObservable().subscribe(observer)
傳入的AnonymousObserver
對象。那接下來我看一下run()作了一些什麼事情:func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { rxAbstractMethod() } 複製代碼
rxAbstractMethod()
,而這個rxAbstractMethod()只是一個抽象方法。那咱們的子類AnonymousObservable中確定覆寫了run()方法。接下來咱們再看一下AnonymousObservable
的run()
的源碼:override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { //建立了一個管子AnonymousObservableSink,並傳給了管子兩個參數: //參數1:observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象 //參數2:disposer:就SinkDisposer()對象後將銷燬會再分析。 let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } 複製代碼
run()
源碼中咱們能夠看到:在AnonymousObservable
的run()
方法中。首先,建立了一個AnonymousObservableSink
對象sink
,並將observer
(也就是咱們self.asObservable().subscribe(observer)
傳入的AnonymousObserver
對象)傳入;其次,調用了sink.run(self)
方法返回了subscription,而後直接飯後一個元組,也就是run()方法返回了一個元組:(sink: sink, subscription: subscription)
。 可是咱們的重點是在sink管子上面。AnonymousObservableSink
是一個相似於manager角色,它保存了序列,訂閱者,銷燬者三個信息,還具有調度能力。咱們序列和訂閱者就是通這個管子來作橋樑,實現通信。AnonymousObservableSink
管子作了一些什麼呢?咱們來看一下AnonymousObservableSink
的源碼:final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType { typealias Element = Observer.Element //這裏給了一個別名:Parent就是AnonymousObservable序列 typealias Parent = AnonymousObservable<Element> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif override init(observer: Observer, cancel: Cancelable) { //傳入了observer:就是咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象 super.init(observer: observer, cancel: cancel) } func on(_ event: Event<Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: if load(self._isStopped) == 1 {//若是已經執行過.error, .completed,就不會繼續執行self.forwardOn(event)代碼,意思就是隻有對象生命週期內執行過.complete,.error事件,就不會再執行forwardOn,除非從新激活改變條件值。 return } self.forwardOn(event) case .error, .completed: //fetchOr()這個方法上面已經講解過,做用就是控制確保只會執行一次 if fetchOr(self._isStopped, 1) == 0 {//若是從沒有執行過就執行一次,不然不執行。以確保下面代碼在對象生命週期內,不管on()調用多少次,都只會執行一次。 self.forwardOn(event) self.dispose() } } } //這是一個很重要的方法, func run(_ parent: Parent) -> Disposable { //這裏傳入parent就是AnonymousObservable序列,也就是咱們最開始create()序列ob,_subscribeHandler就是咱們建立序列時傳入的閉包A(閉包A就相對一個函數,要求傳入一個參數,這個參數就是AnyObserver(self)) return parent._subscribeHandler(AnyObserver(self)) } } 複製代碼
self.asObservable().subscribe(observer)
傳入的AnonymousObserver
對象。self.forwardOn(event)
方法。每次若是onNext事件都會調用一次forwardOn()
。可是.error, .completed事件最多隻會調用一次forwardOn()
。AnonymousObserver
對象經過咱們AnonymousObservableSink對象sink,也就是AnyObserver(self)
中的self
包裝成一個AnyObserver結構體以後,做爲參數傳入閉包A,這樣就將咱們的序列和訂閱者創建了聯繫。AnonymousObserver
實際上不是,傳入閉包A的時一個AnyObserver結構體AnyObserver(self)
做爲參數傳給了閉包A,當咱們在閉包A裏面調用這行代碼時:observer.onNext("kongyulu")
時,因爲通過ob.subscribe()訂閱以後,AnyObserver(self)
就是咱們的observer.了,而此時的observer是一個結構體,它擁有了咱們的管子AnonymousObservableSink
對象的on()方法。observer.onNext("kongyulu")
序列消息時,實際上會經過咱們的管子AnonymousObservableSink.on()
來調度,最終調度咱們訂閱時的閉包:onNext()閉包B:let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }
。AnonymousObservableSink.on()
時如何從observer.onNext("kongyulu")
調度到咱們閉包B?**AnyObserver(self)
作了什麼:先看一下AnyObsevrer的源碼public struct AnyObserver<Element> : ObserverType { /// Anonymous event handler type. public typealias EventHandler = (Event<Element>) -> Void //這裏定義了別名EventHandler就是一個傳入事件的閉包 private let observer: EventHandler /// Construct an instance whose `on(event)` calls `eventHandler(event)` /// /// - parameter eventHandler: Event handler that observes sequences events. public init(eventHandler: @escaping EventHandler) { //self.observer保存了AnonymousObservableSink對象 self.observer = eventHandler } /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. //初始化時要求傳入一個ObserverType,而這個是(17)點分析中AnyObserver(self)代碼中的self,實際上就是AnonymousObservableSink對象, public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element { //這段代碼直接保存了AnonymousObservableSink.on()方法 //self.observer實際就是一個on()方法 self.observer = observer.on } /// Send `event` to this observer. /// /// - parameter event: Event instance. public func on(_ event: Event<Element>) { //這裏調用on方法實際就是調用AnonymousObservableSink.on(event)方法 return self.observer(event) } /// Erases type of observer and returns canonical observer. /// /// - returns: type erased observer. public func asObserver() -> AnyObserver<Element> { return self } } 複製代碼
(19)經過上面AnyObserver源碼分析,咱們得知AnyObserver
初始化時保存了咱們管子AnonymousObservableSink
的on()
方法,而且本身有一個on方法,在他本身的on方法裏面再去調用AnonymousObservableSink.on()方法。這樣就只是包裝了一層不讓外界知道咱們的AnonymousObservableSink
類,爲啥這樣設計呢?這樣設計有幾點好處:
AnonymousObservableSink
類,他們不關心咱們AnonymousObservableSink
類時如何實現的,使用者只須要用這個接口on()就好了,至於on()是如何實現的,經過誰實現並不須要關心。AnyObserver
並無擁有咱們AnonymousObservableSink
對象,它只是擁有了AnonymousObservableSink
的on()接口,只須要AnonymousObservableSink
實現這個on()接口該作的事情就能夠了。至於AnonymousObservableSink
內部怎麼改(只要on()接口不改)的都不會影響到AnyObserver
。(20)如今咱們的重點就在on()方法上面:
observer.onNext("kongyulu")
這行代碼時,實際就會調用:AnyObserver.onNext()
方法。(因爲咱們AnyObserver繼承了ObserverType協議,也就擁有了ObserverType
的onNext()
方法,此處若是不清楚能夠往上回看類繼承關係)(21)AnyObserver.onNext()
調用的時候會調用本身的on()
方法:
ObserverType的接口定義
extension ObserverType { public func onNext(_ element: Element) { self.on(.next(element))//這裏會調回到AnyObserver的on()方法,AnyObserver繼承ObserverType,重寫了on()接口 } public func onCompleted() { self.on(.completed) } public func onError(_ error: Swift.Error) { self.on(.error(error)) } } 複製代碼
AnyObserver.on()
方法會調用 AnonymousObservableSink.on()
方法。AnonymousObservableSink.on(event)
會調用 AnonymousObservableSink.forwardOn(event)
forwardOn()
方法,咱們找到它的父類Sink裏面實現了forwardOn()
源碼以下:class Sink<Observer: ObserverType> : Disposable { fileprivate let _observer: Observer fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: Observer, cancel: Cancelable) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif //初始化保存了self._observer實際就是:咱們`self.asObservable().subscribe(observer)` 傳入的`AnonymousObserver`對象 self._observer = observer self._cancel = cancel } final func forwardOn(_ event: Event<Observer.Element>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) { return } // 這裏實際調用了`AnonymousObserver.on()`方法。 self._observer.on(event) } ... 這次代碼省略,不須要關注 } 複製代碼
Sink.forwardOn()
實際調用了AnonymousObserver.on()
,說白了就是:咱們最開始實例1的observer.onNext("kongyulu")
這行代碼執行時,ob.onNext() 先調用AnyObserver.on()
,AnyObserver.on()
又會調用AnonymousObservableSink.on()
,AnonymousObservableSink.on()
又會調用AnonymousObservableSink.forwardOn()
,接着AnonymousObservableSink.forwardOn()
又會調用AnonymousObservableSink父類的Sink.forwardOn()
,最後由Sink.forwardOn()
調用了AnonymousObserver.on()
。AnonymousObserver.on()
方法定義:final class AnonymousObserver<Element>: ObserverBase<Element> { //次處給尾隨閉包取了一個別名 typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler init(_ eventHandler: @escaping EventHandler) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif //這裏保存了一個傳入的尾隨閉包:這個尾隨閉包就是咱們ob.subscribe()時建立let observer = AnonymousObserver<Element> { event in這裏是個尾隨閉包B} 這裏傳入_eventHandler保存的就是尾隨閉包B self._eventHandler = eventHandler } override func onCore(_ event: Event<Element>) { //這裏回調了咱們的尾隨閉包B return self._eventHandler(event) } #if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif } 複製代碼
class ObserverBase<Element> : Disposable, ObserverType { private let _isStopped = AtomicInt(0) func on(_ event: Event<Element>) { switch event { case .next: if load(self._isStopped) == 0 { self.onCore(event)//這裏實際調用的是子類的onCore() } case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } func onCore(_ event: Event<Element>) { rxAbstractMethod() } func dispose() { fetchOr(self._isStopped, 1) } } 複製代碼
ObserverBase.on()
最終調用了AnonymousObserver.onCore()
,而在AnonymousObserver.onCore()
裏回調了_eventHandler(event)閉包B,而閉包B就是咱們最初ob.subscribe()序列訂閱時建立AnonymousObserver的尾隨閉包,這樣這個尾隨閉包最終調用了咱們訂閱的onNext()方法。這樣就解釋了:實例1中,執行observer.onNext("kongyulu")
這行代碼就會回調let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }
從而打印了 「訂閱到了:kongyulu」具體AnonymousObserver {B}的尾隨閉包B的代碼以下:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ...這次無關代碼先省略 //特別注意:AnonymousObserver<Element> { B}括號裏面的尾隨閉包咱們稱爲B,最終會經過AnonymousObserver.onCore()函數調用閉包B let observer = AnonymousObserver<Element> { event in ...這次無關代碼先省略 switch event { case .next(let value): onNext?(value) //這裏調用onNext(value)實際就是 case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } 複製代碼
閉包A
)建立時將閉包A保存在AnonymousObservable
裏變量_subscribeHandler
。閉包B
)訂閱序列時,會首先建立一個AnonymousObserver
對象,而且會帶一個尾隨閉包C
。而後經過self.asObservable().subscribe(AnonymousObserver)
通過一系列轉化將AnyObserver
傳遞給了閉包A
。self.asObservable().subscribe(AnonymousObserver)
實際就是ob.subscribe(AnonymousObserver)
ob.subscribe(AnonymousObserver)
實際就是Producer.subscribe(AnonymousObserver)
Producer.subscribe(AnonymousObserver)
會調用self.run(AnonymousObserver)
self.run(AnonymousObserver)
會建立一個AnonymousObservableSink
管子對象sink,而後調用sink.run(AnonymousObservable)
調用了管子的run()方法,並將ob傳入了管子sink.
而咱們管子的sink.run(AnonymousObservable)
方法裏面調用了parent._subscribeHandler(AnyObserver(self))
實際就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))
也就是調用了閉包A
而咱們閉包A
須要傳入一個參數就是AnyObserver(AnonymousObservableSink)
,實際上AnyObserver
只是一個結構體,它保存了AnonymousObservableSink.on()
方法。
當咱們在閉包A
裏面調用observer.onNext("kongyulu")
實際上就是AnyObserver.onNext("kongyulu")
,而AnyObserver.onNext("kongyulu")
會調用AnyObserver.on()
AnyObserver.on()
接着又調用AnonymousObservableSink.on(event)
這裏event裏面
AnonymousObservableSink類中AnonymousObservableSink.on(event)
接着又會去調用它本身的forwardOn(event)
也就是AnonymousObservableSink.forwardOn(event)
AnonymousObservableSink.forwardOn(event)
其實是調用它父類Sink.forwardOn(event)
而在Sink父類初始化的時候已經保存了AnonymousObserver
對象_observer。
Sink.forwardOn(event)
會調用的是AnonymousObserver.on(event)
AnonymousObserver.on(event)
實際會調用本身父類的ObserverBase.on(event)
ObserverBase.on(event)
實際又會調用子類的AnonymousObserver.onCore(event)
AnonymousObserver.onCore(event)
會調用self._eventHandler(event)
而這裏_eventHandler就是保存AnonymousObserver建立時傳入的尾隨閉包C
這樣就回調了閉包C
閉包C
中又根據event的事件不一樣,回調了閉包B
,例如如event=.onNext事件,就會回調閉包B
onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }, onError: { (error) in print("訂閱到了:\(error)") }, onCompleted: { print("完成了") }) { print("銷燬回調") }
裏面的這段代碼:onNext: { (anything) in print("訂閱到了:\(anything)")
從而就會打印:「訂閱到了:kongyulu」
(28)
(29)
最後這裏經過一個流程圖來表達整個建立,訂閱過程
上面講解了序列的建立,訂閱流程,在分析建立序列,訂閱序列的源碼時,咱們已經隱隱約約的看到了咱們開篇分析的dispose(),貌似在整個源碼中各處都有着dispose的代碼,那麼序列究竟是怎麼銷燬的呢?
爲了解決這個疑問,咱們下面將經過分析源碼,來探索一下序列的銷燬流程。
這裏先看一張序列生命週期時序圖:
經過這張時序圖,結合上面的序列建立,訂閱的流程分析,我能夠先得出序列會被銷燬的3種方式:方式一: 經過發送事件,讓序列生命週期自動結束來釋放序列資源。 一個序列只要發出了 error 或者 completed 事件,它的生命週期將結束,那麼全部內部資源都會被釋放,不須要咱們手動釋放。(這個結論在本篇博客討論實例1和實例2的時候已經驗證了,只要發送了completed和error事件,就會調用onComplete並打印「銷燬了」信息)
方式二: 經過主動調用dispose()來釋放。例如你須要提早釋放序列資源或取消訂閱的話,那麼你能夠對返回的可被清除的資源(Disposable) 調用 dispose 方法。
方式三: 經過垃圾袋DisposeBag來回收資源,達到自動釋放,這是官方推薦的方式。官方推薦使用清除包(DisposeBag)來管理訂閱的生命週期,通常是把資源加入到一個全局的DisposeBag裏面,它跟隨着頁面的生命週期,當頁面銷燬時DisposeBag也會隨之銷燬,同時DisposeBag裏面的資源也會被一一釋放。(這個結論在上面的DisposeBag分析中也證明了)
咱們先來回顧一下本篇博客開始分析的實例1的代碼 實例1:
func limitObservable(){ // 建立序列 let ob = Observable<Any>.create { (observer) -> Disposable in observer.onNext("kongyulu") return Disposables.create { print("銷燬釋放了")} // dispose.dispose() } // 序列訂閱 let dispose = ob.subscribe(onNext: { (anything) in print("訂閱到了:\(anything)") }, onError: { (error) in print("訂閱到了:\(error)") }, onCompleted: { print("完成了") }) { print("銷燬回調") } print("執行完畢") //dispose.dispose() } 複製代碼
dispose.dispose()
這行代碼去掉註釋,而後從新運行,輸出結果以下:
Observable<Any>.create()
方法有一個尾隨閉包,須要返回一個實現了Disposable
協議的實例。而就是經過return Disposables.create { print("銷燬釋放了")}
這行代碼返回的。由此咱們確認Disposables.create { print("銷燬釋放了")}
很是重要,咱們先來發分析一下Disposables.create
源碼。public struct Disposables { private init() {} } 複製代碼
看這個結構體連初始化方法都是私有的,說明它不能被繼承,因而咱們推測Disposables.create()
必定經過擴展的方式實現的。因此咱們在項目中搜索extension Disposables {
關鍵字,能夠找到以下:
extension Disposables { /// Constructs a new disposable with the given action used for disposal. /// /// - parameter dispose: Disposal action which will be run upon calling `dispose`. public static func create(with dispose: @escaping () -> Void) -> Cancelable { return AnonymousDisposable(disposeAction: dispose)//這裏dispose就是咱們傳入的尾隨閉包 } } 複製代碼
經過上面源碼,咱們看到直接一行return AnonymousDisposable(disposeAction: dispose)
就結束了,而dispose
就是咱們實例1中 Disposables.create { print("銷燬釋放了")} // dispose.dispose() }
這行代碼裏面的尾隨閉包: { print("銷燬釋放了")}
這裏咱們給他一個別名成爲:閉包D
AnonymousDisposable
類實現一探究竟:fileprivate final class AnonymousDisposable : DisposeBase, Cancelable { public typealias DisposeAction = () -> Void private let _isDisposed = AtomicInt(0) private var _disposeAction: DisposeAction? /// - returns: Was resource disposed. public var isDisposed: Bool { return isFlagSet(self._isDisposed, 1) } fileprivate init(_ disposeAction: @escaping DisposeAction) { self._disposeAction = disposeAction super.init() } // Non-deprecated version of the constructor, used by `Disposables.create(with:)` fileprivate init(disposeAction: @escaping DisposeAction) { self._disposeAction = disposeAction super.init() } /// Calls the disposal action if and only if the current instance hasn't been disposed yet. /// /// After invoking disposal action, disposal action will be dereferenced. fileprivate func dispose() { if fetchOr(self._isDisposed, 1) == 0 { if let action = self._disposeAction { self._disposeAction = nil action() } } } } 複製代碼
{ print("銷燬釋放了")}
dispose()
方法,經過fetchOr(self._isDisposed, 1) == 0
這行代碼控制dispose()
裏面的內容只會被執行一次。(不管dispose()
方法被執行多少次,if let action = self._disposeAction { self._disposeAction = nil action() }
這段代碼最多會被執行一次)dispose()
方法中先把self._disposeAction
賦值給臨時變量action
,而後置空self._disposeAction
,再執行action()
。這樣操做的緣由是若是_disposeAction
閉包是一個耗時操做,也可以保證_disposeAction
可以當即釋放。在AnonymousDisposable
裏面咱們只看到了一些常規的保存等操做,結合咱們最開始分析序列的建立流程經驗(AnonymousDisposable
就相似於AnonymousObservable
),咱們能夠推斷核心代碼實現確定在訂閱這一塊。
接下來,咱們進入到observable.subscribe()
方法來探究一些subscribe()
的源碼實現。
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { //1.這裏定義disposable局部變量 let disposable: Disposable //2.建立了Disposables對象 if let disposed = onDisposed { disposable = Disposables.create(with: disposed) } else { disposable = Disposables.create() } //3.建立了一個AnonymousObserver對象,有一個重要的尾隨閉包 let observer = AnonymousObserver<Element> { event in switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() //這裏當收到error事件就會回收釋放資源 case .completed: onCompleted?() disposable.dispose() //這裏當收到completed事件就會回收釋放資源 } } return Disposables.create( self.asObservable().subscribe(observer), disposable//這裏將咱們建立的局部變量傳給了self.asObservable().subscribe,也就是咱們的Producer.subscribe ) } 複製代碼
分析上面subscribe()
源碼,結合開始的分析,咱們能夠得出如下結論:
subscribe()
建立了一個Disposable
對象,並保存了銷燬回調閉包,當執行銷燬時,會把消息回調出去。disposable.dispose()
釋放資源。return Disposables.create( self.asObservable().subscribe(observer), disposable )
,這裏返回的Disposable
對象就是咱們外面手動調用dispose.dispose()
方法的dispose
對象,或者說是加入到全局的DisposeBag
的銷燬者。return Disposables.create( self.asObservable().subscribe(observer), disposable )
時關鍵點,咱們接下進入:Disposables.create()
源碼:public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable { return BinaryDisposable(disposable1, disposable2)//返回一個二元銷燬者對象。 } 複製代碼
上面代碼咱們看到create()直接返回了一個BinaryDisposable
二元銷燬者類對象,並將disposable1
,disposable2
傳入給了BinaryDisposable
。
disposable1
就是self.asObservable().subscribe(observer)
也就是Producer..subscribe(observer)
返回的disposerdisposable2
就是咱們subscribe()中建立局部變量let disposable: Disposable
BinaryDisposable
類究竟是什麼:private final class BinaryDisposable : DisposeBase, Cancelable { private let _isDisposed = AtomicInt(0) // state private var _disposable1: Disposable? private var _disposable2: Disposable? /// - returns: Was resource disposed. var isDisposed: Bool { return isFlagSet(self._isDisposed, 1) } init(_ disposable1: Disposable, _ disposable2: Disposable) { self._disposable1 = disposable1 self._disposable2 = disposable2 super.init() } func dispose() { if fetchOr(self._isDisposed, 1) == 0 { self._disposable1?.dispose() self._disposable2?.dispose() self._disposable1 = nil self._disposable2 = nil } } } 複製代碼