RxSwift (二)序列核心邏輯分析swift
RxSwift (三)Observable的建立,訂閱,銷燬設計模式
RxSwift(五)(Rxswift對比swift,oc用法)markdown
RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬函數
RxSwift學習之十二 (基礎使用篇 3- UI控件擴展) @TOCoop
上一篇博客:Rxswift學習之(一)函數響應式編程思想只是簡單的分析了序列的核心邏輯。本篇博客主要針對上一篇作一下更加深刻的探討,若是有那些地方分析有誤,還請留言:QQ:282889543,讓咱們彼此提升,彼此成就。源碼分析
總的來講分析Rxswift的核心邏輯仍是按照三部曲:建立序列,訂閱序列,銷燬序列。核心思想是萬物皆序列。
咱們先來看下建立Observable所涉及的類的繼承關係: 以下圖:
針對上面的類圖,簡單分析下類的關係和設計思想: 首先分層實施的很完全,每一層都只解決一件事情,一層層疊起來結構很是清晰: AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
其次咱們簡單分解一下每一個類都作了些什麼:
Observable
類型協議,方法只有一個asObservable
,這有什麼好處呢?asObservable
,它提供抽象方法subscribe,即咱們常說的訂閱,只有外部訂閱了該對象,才能真正實現對該對象進行觀察。Observable
的全部方法,並實現subscribe 方法Producer
的全部方法,而且增長了屬性let _subscribeHandler: SubscribeHandler
用來保存建立序列時傳入的閉包,也就相對於擁有了調用這個序列的能力,此外它還實現run方法,這也是建立序列最核心關鍵的方法。在run()方法中它建立一個AnonymousObservableSink
final private類的對象,而這個對象sink能夠稱之爲管子,它相似於manager的角色,擁有序列和訂閱,銷燬能力。這裏有兩個疑惑:問題1.
AnonymousObservableSink
爲何要定義成final private類,不能被繼承,也不能被外部訪問? 問題2. 建立的Observable
是如何關聯到訂閱的?
這兩個問題咱們後面再分析。
最後,咱們總結一下設計思想:
事實上用戶所使用的
Observable
,都是Producer
的子類和AnonymousObservable
平行的子類,只不過用戶不須要關心其具體實現罷了 每個相似AnonymousObservable
的類,還有一個與之相關的類AnonymousObservableSink
,Sink即管道,全部這些組合起來才能讓其真正運行起來,AnonymousObservableSink
同時擁有序列,訂閱功能,相似於咱們項目中常常用的manager角色。 整個設計向上經過組合協議的方式描述其特性,向下經過子類化的方式屏蔽其實現細節,相似於工廠模式,這樣的類也能夠叫類簇。
經過上面類繼承關係,其實咱們不難理解序列的建立流程,它確實也是隻有比較簡單的幾部,寥寥幾行代碼就搞定了,難點是上面拋出的幾個問題:
下面咱們將經過一個簡單的Rxswift的實例來分析一下序列的建立,訂閱,銷燬直接的流程和關係。
實例1:
//1. 建立序列 let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發送信號 obserber.onNext("kyl發送了信號") obserber.onCompleted() return Disposables.create() } // 2:訂閱信號 let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") }, onError: { (error) in print("error: \(error)") }, onCompleted: { print("完成") }) { print("銷燬") } 複製代碼
上面實例1 的這段代碼能夠用酷C老師的一個圖來清晰的表達:
從上面的代碼和關係圖,咱們可能會產生這樣一個疑惑:
問題3: 建立的ob序列,僅僅只是經過
ob.subscribe()
訂閱了一下,爲何咱們在ob建立時的尾隨閉包(咱們這裏給個名字叫閉包A
)裏面調用了obserber.onNext("kyl發送了信號")
這個代碼,咱們就能夠訂閱到let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") }
這裏會打印:」訂閱到:kyl發送了信號「。咱們沒有看見他們之間有任何關聯,怎麼ob發送消息,subcribe()的onNext閉包就能夠觸發呢,這是爲何呢?
咱們能夠這裏能夠簡單推理下:ob.subscribe()這個訂閱方法確定作了一些事情,在某個地方調用了閉包A
,才能實現這個功能。具體是怎麼樣實現的呢?下面咱們將經過分析源碼來解答這個疑惑。
從上面的代碼咱們能夠知道,建立序列就一行代碼:let ob = Observable<Any>.create { (obserber) ->
而這一行代碼實際上是作了好多事情的。
首先咱們經過一個流程圖來初略的瞭解一下序列建立流程:
建立序列的Rxswift原碼很簡單,從上圖能夠看出,直接一行代碼return AnonymousObservable(subscribe)
就結束了,這裏咱們們並無找到咱們須要的答案,甚至咱們有點愈來愈暈感受。
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } } 複製代碼
咱們先作個深呼吸,放輕鬆,此路不通那咱們來嘗試分析其餘方向,不能在一棵樹上吊死。下面咱們來分析一下訂閱的流程。
回顧上面實例1中的訂閱代碼:let _ = ob.subscribe(onNext: { (text) in
這行代碼又作了些什麼事情呢?下面咱們經過源碼來深刻分析一下:
subscribe()
的源碼以下:public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... 上面代碼不是咱們要分析的重點,...表示忽略了這次的一段源碼 /*注意,這次定義了一個AnonymousObserver()對象,以參數的形式, 構造方法裏面傳入了一個尾隨閉包eventHandler, 在這個閉包裏面,當收到event的不一樣事件, 會觸發並調用,咱們 `let _ = ob.subscribe(onNext: { (text) in` 這個方法傳入閉包 */ let observer = AnonymousObserver<E> { event in ... switch event { case .next(let value): onNext?(value) //調用訂閱時傳入的ob.subscribe(onNext:閉包 case .error(let error): if let onError = onError { onError(error)//調用訂閱時傳入的ob.subscribe(onError:閉包 } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?()//調用訂閱時傳入的ob.subscribe(onCompleted:閉包 disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable )/*這裏直接返回了Disposables對象,用來釋放資源, 在它的構造函數裏面直接調用了self.asObservable().subscribe(observer), 而asObservable()就是咱們建立的序列ob,也就是ob.subscribe(), 並傳入了,在這段代碼裏面建立的局部變量let observer = AnonymousObserver<E>,*/ } 複製代碼
經過上面源碼咱們能夠知道:subscribe()這個方法,以參數的形式傳入了onNext()閉包,onError()閉包,onComplete()閉包,在函數裏面建立了一個AnonymousObserver對象observer,這個對象建立的時候傳入了一個閉包,當收到不一樣event事件時,會分別調用咱們subscribe()傳入的onNext,onError,onComplete這三個閉包。最重要一點是return Disposables.create( self.asObservable().subscribe(observer), disposable )
這句代碼調用了咱們真正的subscribe()函數,並以參數的形式傳入了AnonymousObserver對象,self.asObservable()就是咱們create()函數建立的序列ob, 而到此處咱們能夠清晰的看到,咱們訂閱時傳入參數閉包和咱們建立的ob創建了一個鏈條。
這裏咱們又有一個疑問:self.asObservable()爲何就是咱們create()函數返回的ob呢?
要解答這個問題,我須要回顧一下上面分析的Observable類的繼承關係:Observable
-> ObservableType
-> ObservableConvertibleType
即Observable繼承ObservableType協議,ObservableType又繼承ObservableConvertibleType協議,而咱們的ObservableConvertibleType提供了抽象方法asObservable(),咱們Observable類中實現了asObservable()這個方法,它直接返回self就它本身。
下面經過源碼來證明:
/// /// It represents a push style sequence. public class Observable<Element> : ObservableType { ... public func asObservable() -> Observable<E> { return self } ... } 複製代碼
分析了Rxswift訂閱subscribe()
的源碼感受很是nice, 咱們找到了咱們ob 建立時傳入的閉包和咱們訂閱時的閉包存在了一條鏈條關係,也就是隻要ob發送了消息,那咱們的訂閱者必定能夠按照這個鏈條收到消息。可是咱們仍是不知道究竟是怎麼調用的,怎麼觸發的。
並且咱們注意到self.asObservable().subscribe(observer)
也就是AnonymousObservable
調用了subscribe()
方法,可是在AnonymousObservable
類中咱們並無找到subscribe()的定義,因此咱們須要來看他的父類Producer
class Producer<Element> : Observable<Element> { override init() { super.init() } override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() /*重點在這裏了,這裏調用了run()方法,一切疑惑都清晰了,咱們知道了run()調用時傳入了observer,而且建立了sink管子,而這個管子具有了序列的功能,能夠調用on()方法。 */ let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { rxAbstractMethod() } } 複製代碼
果真不出咱們所料,在Producer
中咱們找到了subscribe()的方法定義,到此咱們能夠總結出很清晰的幾條線索了:
Producer
實現了ObservableType
協議的subscribe()方法。在這個方法裏面調用了self.run(observer, cancel: disposer)
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { //1.建立了一個sink管道對象,並將observer也就create()建立 //序列時傳入的閉包傳給了sink let sink = AnonymousObservableSink(observer: observer, cancel: cancel) //2. sink調用本身的run()方法,並把AnonymousObservable做爲參數傳入。 let subscription = sink.run(self) //返回一個元組,包含sink管道信息。 return (sink: sink, subscription: subscription) } 複製代碼
(3)AnonymousObservableSink類中run()方法中調用parent._subscribeHandler(AnyObserver(self))
其中parent就是咱們(2)中sink.run(self)
傳入的self,也就是AnonymousObservable對象;而且咱們前面已經知道_subscribeHandler就是建立序列時保存的那個經過create()函數參數傳入的 閉包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發送信號 obserber.onNext("kyl發送了信號") obserber.onCompleted() return Disposables.create() }
。 如今已經很清晰了parent._subscribeHandler(AnyObserver(self))
執行閉包,這行代碼就會調用obserber.onNext("kyl發送了信號")
這個行代碼。
如今咱們能夠經過一個流程圖來總結咱們代碼執行的流程:
上面的訂閱序列流程分析:咱們弄明白了從訂閱序列到調用create()函數時傳入的參數閉包調用的邏輯,可是這個閉包發送onNext()信號後,怎麼到訂閱消息的onNext()閉包咱們還不是很清晰。所以咱們須要分析AnonymousObserver
咱們先來看下AnonymousObserver
類
AnonymousObserver
源碼定義以下:final class AnonymousObserver<ElementType> : ObserverBase<ElementType> { typealias Element = ElementType typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler /*構造函數,保存了EventHandler尾隨閉包*/ init(_ eventHandler: @escaping EventHandler) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._eventHandler = eventHandler } //覆寫了onCore方法,調用了EventHandler閉包 override func onCore(_ event: Event<Element>) { return self._eventHandler(event) } #if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif } 複製代碼
從AnonymousObserver
源碼中咱們並無找到onNext()方法,那咱們只能沿着它的繼承鏈往上查找,這裏須要瞭解一下類的繼承關係:
經過分析類的繼承關係,咱們得知:這樣一個關係鏈:
AnonymousObserver對象的on()方法會調用onCore()方法,ObserverType裏面有onNext,onError,onComplete方法。可是on()是如何調用的,什麼時候調用的呢?
要解決這個疑問,咱們須要再次回到咱們建立序列的代碼:
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) } 複製代碼
建立序列的create()方法傳入了一個subscribe閉包,並返回了AnonymousObservable對象。其中subscribe閉包就是咱們序列建立時參數形式傳入 閉包。而且AnonymousObservable初始化時將這個閉包保存起來了self._subscribeHandler = subscribeHandler
AnonymousObservable 有一個run()方法,run方法裏面建立了一個AnonymousObservableSink
對象sink,具體源碼以下:
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } } 複製代碼
分析了這麼久,繞了一圈,終於發現關鍵就在AnonymousObservableSink
管子這個對象裏面了。sink這是個神奇的管子。它就保存了序列,也保存了訂閱,還保存了用於銷燬的disposed 也就是同時擁有了建立序列,訂閱序列,銷燬序列功能。
咱們來分析下AnonymousObservableSink
的源碼:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType { typealias E = O.E //這裏的Parent就是咱們上面分析的AnonymousObservable,很是重要 typealias Parent = AnonymousObservable<E> // state private let _isStopped = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif // 構造方法,傳入了observer序列,和Cancelable override init(observer: O, cancel: Cancelable) { super.init(observer: observer, cancel: cancel) } //這裏實現 了ObserverType協議的on()方法 func on(_ event: Event<E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: if load(self._isStopped) == 1 { return } //調用了父類的發佈,self.forwardOn()會調用本身的on()方法 self.forwardOn(event) case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.forwardOn(event) self.dispose() } } } func run(_ parent: Parent) -> Disposable { /*調用了_subscribeHandler閉包,這個閉包就是咱們以前建立序列時傳入閉包。 parent就是傳入進來的序列,這裏序列的閉包裏傳入了self而且強轉爲AnyObserver 這裏將self傳給了閉包_subscribeHandler,這樣_subscribeHandler也就具有了subcribe的能力。 */ return parent._subscribeHandler(AnyObserver(self)) } } 複製代碼
其中Sink類的源碼以下:
class Sink<O : ObserverType> : Disposable { fileprivate let _observer: O fileprivate let _cancel: Cancelable fileprivate let _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: O, cancel: Cancelable) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._observer = observer self._cancel = cancel } final func forwardOn(_ event: Event<O.E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) { return } //這裏調用了傳入observer.on()方法, self._observer.on(event) } final func forwarder() -> SinkForward<O> { return SinkForward(forward: self) } final var disposed: Bool { return isFlagSet(self._disposed, 1) } func dispose() { fetchOr(self._disposed, 1) self._cancel.dispose() } deinit { #if TRACE_RESOURCES _ = Resources.decrementTotal() #endif } } 複製代碼
從源碼分析咱們得知:
咱們的sink保存了咱們的序列,當咱們調用ob.onNext()發送信號時,因爲咱們的sink已經持有了ob, 這樣sink會調用on()方法,在on()方法裏面會調用self.forwardOn(event),而在fowardOn()裏面會調用self._observer.on(event)。這樣個人疑問就解決了,答案就是sink調用了on()方法。
這裏咱們再來總結一下總的流程:
create()
返回了一個ob
, 這個ob就是序列,建立的時候傳遞了一個閉包A
。在閉包A中調用了ob.onNext()
發送了信號。ob.subscribe()
方法,這個方法會建立一個AnonymousObserver
對象,並調用了self.asObservable().subscribe(observer)
。self.asObservable()
實際就是咱們的ob
, 也就是ob調用了subscribe().而AnonymousObserver中沒有找到subscribe()。AnonymousObserver
的父類中找到了subscribe(),發現subscribe()調用了AnonymousObserver的run()
方法。sink.run(self)
,sink是AnonymousObservableSink的對象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))
調用了建立序列時保存的閉包A (parent就是AnonymousObserver),這樣就解釋了訂閱時,回調了A閉包的緣由。public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { ... 上面代碼不是咱們要分析的重點,...表示忽略了這次的一段源碼 /*注意,這次定義了一個AnonymousObserver()對象,以參數的形式, 構造方法裏面傳入了一個尾隨閉包eventHandler, 在這個閉包裏面,當收到event的不一樣事件, 會觸發並調用,咱們 `let _ = ob.subscribe(onNext: { (text) in` 這個方法傳入閉包 */ let observer = AnonymousObserver<E> { event in ... switch event { case .next(let value): onNext?(value) //調用訂閱時傳入的 複製代碼
這裏調用ob.subscribe()的時候,咱們建立了AnonymousObserver和咱們subscribe()傳入的onNext()閉包作了一個綁定,當AnonymousObserver.onNext()
調用的時候一定會回調subscribe()傳入的onNext()閉包。而10中的_observer對象指的就是let observer = AnonymousObserver