篇幅稍微有點長,瞭解程度不一樣,能夠跳過某些部分。api
- 若是對源碼比較熟悉的,建議直接看圖就好了,時序圖更加清晰。第一次摸索有必要閱讀文字內容。
- 貼出來的代碼省略了沒必要要的部分,用省略號代替。
RxSwift的基礎用法就是很簡單的幾步:bash
//建立序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//發送信號
observer.onNext("今日份麻醬涼皮")
observer.onCompleted()
return Disposables.create()
}
//訂閱信號
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("完成")
}) {
print("銷燬")
}
控制檯輸出:
訂閱到:今日份麻醬涼皮
完成
銷燬
複製代碼
在看源碼以前,應該對接觸到的類和協議有些認識,方便以後的理解。下面的關係圖在須要的時候回頭熟悉一下就行:閉包
究竟是什麼在支撐如此便捷的調用?ide
第一句Observable<Any>.create
建立了一個可觀察序列Observable
對象,第二句就是這個Observable
序列對象訂閱了消息。函數
從輸出能夠看出,都是訂閱到的消息。那麼訂閱時傳入subscribe
的閉包是何時調用的呢?fetch
單從如今的幾句代碼,也能猜出是第一句代碼的閉包中的observer.onNext
的調用引發的。可是,咱們也沒有看到這個**create
函數中的閉包是在哪裏執行的?**ui
爲了可以清晰的描述,暫且稱第一句
create
中的閉包爲**create閉包
,第二句subscribe
中的幾個閉包爲subscribe閉包
**。spa
外面看不出來,那咱們只能進去RxSwift
裏面探索下create
和subscribe
到底作了什麼?線程
create
函數的實現:code
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
複製代碼
原來這函數內部其實是建立了一個AnonymousObservable
匿名可觀察序列對象。而以前的閉包也是給AnonymousObservable
對象初始化用了。
AnonymousObservable
類:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
複製代碼
這裏在初始化的時候把create閉包
賦值給了_subscribeHandler
屬性。
到此爲止,
Observable<Any>.create
函數實際上建立了一個AnonymousObservable
匿名可觀察序列對象,並保存了create閉包
。
。。。貌似這條不是主線啊!沒有找到任何一個問題的答案。
再來翻翻**subscribe
函數**:
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
......
let observer = AnonymousObserver<E> { event in
......
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
複製代碼
這也是對定義在ObservableType
協議中的函數的實現,返回了一個Disposable
。這個Disposable
就是用來管理訂閱的生命週期的,示例代碼中並無體現出來,實際是在訂閱信號的內部處理的。前面都沒什麼,後面建立了AnonymousObserver
,而且在AnonymousObserver
初始化時傳入了閉包,並賦值給_eventHandler
屬性。
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
複製代碼
以前,
AnonymousObservable
匿名序列對象,保存了create閉包
。 此時,建立了AnonymousObserver
匿名觀察者對象,保存了對subscribe閉包
的回調執行的EventHandler閉包
。
又一條支線。。。一路走來,都是在建立對象,保存閉包。兩個主線疑問仍是無跡可尋。難道一開始就走上了歧路?非也!繼續看下去就明白了什麼叫「柳暗花明又一村」。
AnonymousObservable
的subscribe
函數中,在建立了AnonymousObserver
對象後,還返回了一個新建的Disposable
對象。重點就在這裏的第一個參數:self.asObservable().subscribe(observer)
中。asObservable
仍是返回了self
,後面貼上的ObserverType
中能夠看到。剩下的就是AnonymousObservable
的父類Producer
中的subscribe
了:
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
......
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
複製代碼
因爲是在當前線程中執行的,只看else
那部分。disposer
相關的不用關心。關鍵是observer
參數,這個參數中有對subscribe閉包
的處理的EventHandler閉包
。observer
傳入了self.run(observer, cancel)
。因此,還要回頭再看看**AnonymousObservable
類中的run
函數**:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
複製代碼
這裏又建立了一個AnonymousObservableSink
對象,observer
和cancel
繼續往初始化函數中丟:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
......
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
複製代碼
前面sink.run(self)
把self
傳了進來,又對AnonymousObservable
起了別名Parent
。parent._subscribeHandler
不就是AnonymousObservable
在調用它最開始保存的那個create閉包
麼?AnyObserver(self)
則把AnonymousObservableSink
做爲AnyObserver
初始化的參數。
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
複製代碼
其中還把AnonymousObservableSink
的on
函數賦值給了AnyObserver
的屬性observer
,observer
就是EventHandler
。這個EventHandler
在create閉包
中會用到。
這不就是第二個主線疑問(create
函數中的閉包什麼時候調用)的答案麼!
整理一下:
subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable
函數中建立Disposable
開始AnonymousObservable
調用subscribe(observer)
AnonymousObservable
調用run(observer, cancel)
AnonymousObservableSink(observer: observer, cancel: cancel)
,而且sink.run(self)
parent._subscribeHandler(AnyObserver(self))
這是一條從
subscribe閉包
-->create閉包
的線。
還沒完,還有個**create閉包
中怎麼觸發subscribe閉包
的?**
又臭又長的寫了這麼多,這裏就只看observer.onNext("今日份麻醬涼皮")
吧。點進去看: observer.onNext("今日份麻醬涼皮")
:
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
複製代碼
onNext
中調用了on
,在前面AnyObserver
結構體定義中能夠看出,on
函數中返回了self.observer(event)
,以前是把AnonymousObservableSink
的on
賦值給了這個self.observer
,因此,此時會走到AnonymousObservableSink
的on
函數中。這裏面又調用了self.forwardOn(event)
,看下AnonymousObservableSink
的父類Sink
中定義的forwardOn
:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
......
init(observer: O, cancel: Cancelable) {
......
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
......
self._observer.on(event)
}
}
複製代碼
forwardOn
中走了一句self._observer.on(event)
。這裏的_observer
屬性不就是AnonymousObservableSink
初始化時傳入的AnonymousObserver
對象麼!
繼續跟AnonymousObserver
的on
:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
......
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
複製代碼
這裏沒有on
,看父類ObserverBase
:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
複製代碼
這裏的on
函數中,有個.next
分支,調用了self.onCore(event)
。子類AnonymousObserver
實現的onCore
中又調用了self._eventHandler(event)
。
這個_eventHandler
是什麼?不就是AnonymousObserver
初始化時保存的對subscribe閉包
處理的閉包麼!因此create閉包
中的observer.onNext("今日份麻醬涼皮")
就能觸發subscribe閉包
了。
這是一條從
AnonymousObservable
調用_subscribeHandler
(也就是create閉包
)時的參數AnyObserver
-->subscribe閉包
的線。
如今咱們看清楚了響應式的數據流:
看清楚了麼?好像清楚的不太明顯。畢竟好幾個類、協議,又那麼多函數調來調去的。加把勁再擼一擼。既然這麼五花八門的調用流程搞清楚了,那就來弄清楚它主要都作了什麼?
一圖概千言,既然畫了圖,就少敲點鍵盤吧!
仔細看了流程圖就會發現,出現的幾個類中,幹活的主要是Anonymous
開頭的那三個類。咱們在外面調用的操做,實際上是在使用RxSwift
內部封裝的一些類。
AnonymousObservable
,並保存create閉包
AnonymousObserver
,而且把對subscribe閉包
的操做封裝成了EventHandler
AnonymousObservable
來幹。
Disposable
中,由subscribe(observer)
,把AnonymousObserver
傳給了AnonymousObservableSink
AnonymousObservableSink
纔是信息處理的核心,由於他知道的太多了AnonymousObservableSink
有AnonymousObserver
觀察者,AnonymousObserver
持有着EventHandler
。AnonymousObservableSink
在調用run
函數時也傳入了AnonymousObservable
序列,AnonymousObservable
就是create閉包
的持有者。AnonymousObservableSink
初始化的時候,除了觀察者外,還有個管理序列生命週期的Disposable
。AnonymousObservableSink
做爲一個內部類,在被create閉包
當作參數回調給外界時須要轉換爲AnyObserver
,在這裏AnyObserver
則是以閉包屬性的形式保留了AnonymousObservableSink
的on
函數AnyObserver
經過這個屬性值聯繫到AnonymousObservableSink
訂閱信號:
Observable
-->AnonymousObservable
-->AnonymousObserver
-->AnonymousObservableSink
-->AnyObserver
-->create閉包
create閉包
中調用AnyObserver
的onNext
開始AnyObserver.observer
訪問閉包中的AnonymousObservableSink
AnonymousObservableSink
擁有AnonymousObserver
AnonymousObserver
掌控EventHandler
發出信號:
create閉包
-->AnyObserver
-->AnonymousObservableSink
-->AnonymousObserver
-->subscribe閉包