RxSwift源碼分析(二)-Observable和AnonymousObservableSink解析

上一篇文章中,主要描述了RxSwift的核心邏輯,也就是一個序列從建立到訂閱而後從發送消息到接收消息的整個流程是怎樣串聯起來的。還不太理解的同窗能夠移步到上一篇文章瞭解一下。api

這篇文章主要來分析一下RxSwift的幾個核心類和協議的實現和設計。bash

Observable類解析

Observable是可觀察序列,是全部可觀察序列的基類,咱們不會直接使用Observable這個類,通常都是使用子類。Observable也能夠理解成抽象類,實際上不是抽象類,由於可觀察序列最重要的一個訂閱序列的方法subscribe必須在其子類中重寫。ide

咱們先來看看Observable的源碼:函數

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
複製代碼
  • Observable實現了一個協議ObservableType,並且ObservableType協議繼承自ObservableConvertibleType協議,因此在Observable中實現了兩個協議方法:subscribeasObservable
  • subscribe方法沒有具體實現的邏輯,須要子類去實現。
  • asObservable方法返回的是self,看似用處不大,其實不是這樣的。asObservable是很是有用的,若是一類是Observable的子類,咱們能夠直接返回self,若是不是Observable的子類,咱們能夠經過重寫這個協議方法來返回一個Observable對象,這樣保證了協議的一致性。在使用的時候咱們能夠直接寫相似self.asObservable().subscribe(observer)這樣的代碼,有利於保持代碼的簡潔性,是良好的封裝性的體現。因此我以爲這個設計很是的好,在咱們平常開發中也能夠借鑑。
  • _ = Resources.incrementTotal()_ = Resources.decrementTotal()這兩行代碼實際上是RxSwift內部實現的一個引用計數。這部份內容我會在後面的文章中再詳解。
  • composeMap<R>優化map的一個函數,不太理解用處。
  • Observable子類很是多,這裏不一一去看,主要區別在於對subscribe方法的實現不同。

AnonymousObservableSink類解析

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
複製代碼
  • AnonymousObservableSinkSink的子類,AnonymousObservableSink自己遵照ObseverType協議,與此同時實現了run方法,雖然沒有實現subscribe方法,可是已經足夠了,這樣AnonymousObservableSink從某種程度來講也是Observable
  • AnonymousObservableSink是Observer和Observable的銜接的橋樑,也能夠理解成管道。它存儲了_observer和銷燬者_cancel。經過sink就能夠完成從Observable到Obsever的轉變。
  • run方法中的這行代碼parent._subscribeHandler(AnyObserver(self)),其中parent是一個AnonymousObservable對象。_subscribeHandler這個block調用,代碼會執行到建立序列時的block。而後會調用發送信號的代碼obserber.onNext("發送信號"),而後代碼會通過幾箇中間步驟會來到AnonymousObservableSink類的on方法。

有問題或者建議和意見,歡迎你們評論或者私信。 喜歡的朋友能夠點下關注和喜歡,後續會持續更新文章。post

相關文章
相關標籤/搜索