RxSwift源碼解析一

RxSwift源碼解析一

1、介紹

一個幫助咱們簡化異步編程的Swift框架。git

2、核心

  • Observable:產生事件
  • Observer:響應事件
  • Operator:建立變化組合事件
  • Disposable:管理綁定(訂閱)的生命週期
  • Schedulers:線程隊列調配

3、ObservableObserver之間的關係

  • 例子
let _ = Observable<Int>.create { (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}.subscribe(onNext: { (num) in
    print("receive num \(num)")
}, onError: { (error) in
    print("error: \(error.localizedDescription)")
}, onCompleted: {
    print("receive complete")
}) 
複製代碼

如上代碼出現兩個重要的方法createsubscribe。顧名思義,create方法是建立一個Observable對象,而subscribe方法是建立一個訂閱事件。咱們先關注下create方法如何建立一個Observable對象。github

//Create.swift
extension ObservableType {
    
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
    
}
複製代碼

首先看它傳入的參數爲一個閉包:AnyObserver<Element> -> Disposable,而後返回的是一個Observable<Element>對象。對比咱們的例子,咱們能夠肯定Element爲咱們指定的Int,即泛型Element表示數據源類型。編程

上面返回的是一個AnonymousObservable對象,並將閉包做爲參數傳入。swift

//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
    
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
    let _subscribeHandler: SubscribeHandler
    
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
    
}
複製代碼

AnonymousObservable將傳入的閉包賦值給變量_subscribeHandler。至此建立完了一個Observable對象。而後執行其subscribe方法:api

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
    let disposable: Disposable
    
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    } else {
        disposable = Disposables.create()
    }
    
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
複製代碼

這一段代碼比較長,咱們先從參數下手,能夠看到參數中包括onNext(產生下一個事件)、onError(產生錯誤)、onCompleted(產生完成)和onDisposed四個不一樣的閉包。咱們先暫時無論Disposed部份內容,直接看到下面相關代碼:閉包

let observer = AnonymousObserver<Element> { event in
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
    
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}
複製代碼

上面代碼建立了一個AnonymousObserver對象,並將參數的閉包事件與自身產生的event事件關聯在一塊兒。框架

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
}
複製代碼

AnonymousObserver對象將上面關聯的一個事件轉換閉包做爲參數存儲到變量_eventHandler中。其實能夠簡單地理解AnonymousObserver對象將上面subscribe方法中的參數閉包存儲起來了。異步

再回到subscribe方法,看到最後一句代碼:ide

return Disposables.create(self.asObservable().subscribe(observer), disposable)
複製代碼

咱們關注到這裏self.asObservable().subscribe(observer),首先調用了asObservable()方法:異步編程

//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
    
    associatedtype Element
    
    func asObservable() -> Observable<Element>
}

//Observable.swift
public class Observable<Element> : ObservableType {
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
}
複製代碼

如上所示,咱們能夠看到asObservable()方法返回的本身自己Observable。但咱們看到這裏對應的subscribe方法爲"抽象方法",上面咱們建立的是AnonymousObservable對象,在它的父類Producer中實現了:

class Producer<Element> : Observable<Element> {
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        } else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
    }
    
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
    
}
複製代碼

一樣咱們先暫時無論Scheduler相關內容,這裏調用了self.run()方法,但它自己並未實現該方法,一樣咱們在AnonymousObservale中能夠找到:

final private class AnonymousObservable<Element> : Producer<Element> {
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Element == Observer.Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
    
}
複製代碼

run方法中建立了一個AnonymousObservableSink方法,而後調用了它的run方法。

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>
    
    private let _isStopped = AtomicInt(0)
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
    
}
複製代碼

咱們先看到其run方法,能夠看到它執行了parent._subscribeHandler(AnyObserver(self)),這一句很關鍵,這裏的parent其實指的是咱們在調用create方法時,建立的AnonymousObservable對象。所以,這裏的_subscribeHandler就是咱們create方法傳遞的參數閉包。咱們能夠看到這裏建立了一個AnyObserver對象傳入到閉包中。

回到例子中的閉包內容:

{ (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}
複製代碼

這裏調用了onNext方法產生元素1

public protocol ObserverType {
    associatedtype Element
    
    func on(_ event: Event<Element>)
}

extension ObserverType {
    
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
}
複製代碼

這裏調用了on方法傳遞元素,而咱們上面知道這裏的ObserverTypeAnyObserver對象:

public struct AnyObserver<Element> : ObserverType {
    
    public typealias EventHandler = (Event<Element>) -> Void
    
    private let observer: EventHandler
    
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
  
}
複製代碼

接着調用了self.observer(event)將事件傳遞下去,而這裏的observer是在建立parent._subscribeHandler(AnyObserver(self))時傳入的。即self.observer = observer.on => AnonymousObservableSink.on

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    
}
複製代碼

所以,事件會傳遞到AnonymousObservableSink中,並經過fowardOn方法繼續傳遞事件.

class Sink<Observer: ObserverType>: Disposable {
    
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    
    init(observer: Observer, cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
    }
    
    final func forwardOn(_ event: Event<Observer.Element>) {
        if isFlagSet(self._diposed, 1) {
            return
        }
        self._observer.on(event)
    }
   
}
複製代碼

這裏調用了self._observer.on(event)方法傳遞事件,而這裏的_observer對象就是咱們在調用subscribe方法時,傳遞進來的AnonymousObserver對象。而AnonymousObserver自己沒有實現on方法,而是在父類ObserverBase中實現了:

class ObserverBase<Element> : Disposable, ObserverType {
    
    private let _isStopped = AtomicInt(0)
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
    
}
複製代碼

最後調用了onCore方法傳遞事件:

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
}
複製代碼

而這裏的_eventHandler便是咱們調用subscribe方法時,建立的閉包(將外部的Event和內部的Event關聯)

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
    ....
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
複製代碼

所以,咱們會在外部接收到receive num 1的事件消息。

4、Observable與Observer的執行過程

相關文章
相關標籤/搜索