02.RxSwift - 核心邏輯源碼分析

Observable 的流程
// 1: 建立序列
_ = Observable<String>.create { (obserber) -> Disposable in
    // 3:發送信號
    obserber.onNext("Cooci - 框架班級")
    return Disposables.create()  // 這個銷燬不影響咱們此次的解讀
    // 2: 訂閱序列
    }.subscribe(onNext: { (text) in
        print("訂閱到:\(text)")
    })
複製代碼
  • 建立序列 - Observable<Any>.create
  • 訂閱序列 - ob.subscribe
  • 發送信號 - obserber.onNext() obserber.onCompleted() obserber.onError()
核心邏輯源碼分析

幾個主要類的繼承結構,方便你們更好的理解:html

1.Observable序列的建立
extension ObservableType {
    // MARK: create
    /**
     Creates an observable sequence from a specified subscribe method implementation.

     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method. - returns: The observable sequence with the specified implementation for the `subscribe` method. */ public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) } } 複製代碼
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

複製代碼
  • create 方法的時候建立了並返回了一個內部對象ob: AnonymousObservable
  • AnonymousObservable 內部保存了_subscribeHandler閉包
  • AnonymousObservable 繼承了Producer,得到了subscribe功能
2.Observable序列的訂閱
extension ObservableType {
  public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
     }
  }
}
複製代碼
  • ObservableType 拓展實現了subscribe
  • 建立了一個內部觀察者AnonymousObserver對象,它這裏的初始化是閉包參數,保存了外界的 onNext, onError , onCompleted , onDisposed 的處理回調閉包的調用
  • 調用self.asObservable()返回一個原序列(AnonymousObservable)對象:ob
  • 最後 ob調用父類Producersubscribe()方法,並把這個內部觀察observer者帶了過去

進入Producersubscribe()方法:參數(observer)react

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
複製代碼
  • 這裏主要關心self.run的調用,其餘無關訂閱流程的先無論
  • Producer調用run最終會定位到子類AnonymousObservable.run

回到AnonymousObservable.runapi

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
複製代碼
  • 建立了中介者AnonymousObservableSink對象
  • 中介者對象調用run,並傳入參數:self(AnonymousObservable自己),觀察者observer

進入AnonymousObservableSinkbash

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼
  • AnonymousObservable別名Parent
  • 初始化調用了父類Sink的初始化:super.init,並保存觀察者_observer
  • 實現了on, 就是next,error,completed的事件閉包
  • on內調用了forwardOn
  • run方法裏的parent就是AnonymousObservable,這裏就至關於AnonymousObservable._subscribeHandler,參數AnyObserver對象
  • AnyObserver經過傳入self(AnonymousObservableSink自己)初始化發放信號對象

進入AnyObserver內:閉包

public struct AnyObserver<Element> : ObserverType {
    ...
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
   ...
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
}
複製代碼
  • sinkon保存爲observer(observer這裏就是保存了一個函數,並非以前的觀察者),即:AnonymousObservableSink.on
  • on方法內調用self.observer,即調用了AnonymousObservableSink.on
  • 到了這裏最後執行了AnonymousObservableSink.on內部的forwardOn

這裏須要注意:AnonymousObservableSink.run內部經過AnonymousObservable._subscribeHandler傳入一個消息發送者AnyObserver,這裏AnyObserver即咱們create閉包中的observer框架

3.Observable序列發送信號
//1.建立信號
    let ob = Observable<Any>.create { (observer) -> Disposable in
       //3.發送信號
        observer.onNext("RxSwift核心邏輯")
       //observer.onError("error的" as! Error)
       observer.onCompleted()
       return Disposables.create()
 }
複製代碼
  • 這裏閉包中的observer本質就是AnyObserver
  • 當咱們發送信息onNext, onCompleted, onError的直接就來到ObserverType的拓展
  • 繼承關係請看前邊的繼承圖
public protocol ObserverType {
    associatedtype E
    func on(_ event: Event<E>)
}

extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

複製代碼
  • 內部調用self.on,本質就是調用(類的繼承關係請看繼承圖)
  1. AnyObserver.on -> AnonymousObservableSink.on -> AnonymousObservableSink.forwardOn -> 經過父類調用
func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
複製代碼
  1. Sink.forwardOn -> Sink._observer.on-> AnonymousObserver.on-> 經過父類調用
final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
複製代碼

3.ObserverBase.on->AnonymousObserver.onCore->AnonymousObserver._eventHandleride

func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

複製代碼

4.最後就經過AnonymousObserver建立時候的閉包,把onNetxt,onError,onComplete回調出去了函數

總結關係圖:

相關文章
相關標籤/搜索