在上一篇RxSwift核心邏輯簡介中已經簡單分析了RxSwift的核心邏輯。swift
可是其中有一個點,解析得不是很清楚。那就是可觀察序列和訂閱者之間究竟是如何聯繫在一塊兒的?那麼這一篇就詳細分析一下連接二者的管道——AnonymousObservableSink。api
以前已經知道了,第二步訂閱信號之後,代碼會來到這裏。 閉包
run
函數中,會初始化
AnonymousObservableSink
,同時傳遞參數
observer和
cancel,
cancel是垃圾處理器,處理內存回收,不是本篇的重點,先忽略。咱們重點關注
observer。
AnonymousObservableSink
<—> observer先看看observer的來源。其是執行subscribe
函數內部初始化的匿名觀察者AnonymousObserver
的實例對象。 ide
這個實例對象在AnonymousObservableSink初始化時做爲初始化參數傳入。咱們再來看AnonymousObservableSink
類的源碼。函數
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
複製代碼
類AnonymousObservableSink
在初始化時調用的是父類的初始化方法,咱們再找到其父類的源碼。post
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
final func forwarder() -> SinkForward<O> {
return SinkForward(forward: self)
}
final var disposed: Bool {
return isFlagSet(self._disposed, 1)
}
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
複製代碼
經過父類的初始化方法能夠看出,observer對象最終被保存在Sink
類的_observer
變量中。fetch
所以子類AnonymousObservableSink
就擁有了觀察者observer。this
AnonymousObservableSink
<—> 可觀察序列AnonymousObservableSink
被初始化後,會執行sink.run(self)
函數,這裏的self即爲第一步建立可觀察序列中,建立的可觀察序列。spa
經過run
函數,AnonymousObservableSink
與可觀察序列之間就創建起了聯繫。3d
observer.onNext()
中的observer究竟是什麼?咱們知道,在第三步==發送信號==會執行observer.onNext()
函數。這個observer會是咱們建立的可觀察序列嗎?咱們來分析一下源碼。
在AnonymousObservableSink
對象執行run
函數時
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
複製代碼
會調用parent的_subscribeHandler
。在上一篇中已經分析了_subscribeHandler
,就是第一步建立可觀察序列時傳入的閉包。而給閉包返回的參數是一個AnyObserver(self)
。因此,observer.onNext()
中的observer
不是咱們建立的可觀察序列,而是AnyObserver
。
查看源碼。
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
複製代碼
AnyObserver
在初始化時,執行的是self.observer = observer.on
。這是什麼意思呢?
前面已經分析了。AnyObserver
init時傳入AnyObserver
的是AnonymousObservableSink
的實例對象。那麼observer.on
實際上是AnonymousObservableSink.on
,而經過上面的AnonymousObservableSink
源碼能夠知道,on
實際上是一個函數。因此,AnyObserver
中的observer
是一個函數。
既然observer.onNext()
中的observer
不是可觀察序列,而是AnyObserver
對象,那麼第三步發送信號究竟是如何執行的呢?
observer.onNext()
分析點擊onNext()
查看源碼。
ObserverType
的
self.on(.next(element))
函數。可是
ObserverType
是一個協議,只有函數聲明,沒有實現,因此,最後調用的是
AnyObserver.on(.next(element))
函數。
從上面AnyObserver
的源碼能夠看出。on
函數,最終調用的實際上是self.observer(event)
,而在上一節的分析中,咱們已經知道。self.observer
其實保存的是AnonymousObservableSink
中的on
函數。
因此最後會回到AnonymousObservableSink
中調用self.forwardOn(event)
。而AnonymousObservableSink
中並無forwardOn
函數。那麼老規矩,當前類中沒有的方法,那麼就去找它的父類Sink
。
在父類Sink
中找到forwardOn()
函數,函數中執行了self._observer.on(event)
。咱們以前已經分析過了,self._observer
其實就是訂閱信號時建立的匿名觀察者AnonymousObserver
。
因此,最後會調用AnonymousObserver.on
。可是AnonymousObserver
沒有on()
函數。因此查找其父類ObserverBase
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped, 1)
}
}
複製代碼
分析發現,最後仍然會回到子類AnonymousObserver
而且調用其onCore()
函數。
而onCore()
函數中執行的是self._eventHandler(event)
。在上一篇中已經分析了self._eventHandler
就是訂閱信號時傳入的閉包。
因此,通過一系列的數據傳遞,observer.onNext()
,最終會回調到訂閱信號時傳入的閉包中。以此完成數據的傳遞和流轉。