在上一篇文章中,主要描述了RxSwift的核心邏輯,也就是一個序列從建立到訂閱而後從發送消息到接收消息的整個流程是怎樣串聯起來的。還不太理解的同窗能夠移步到上一篇文章瞭解一下。api
這篇文章主要來分析一下RxSwift的幾個核心類和協議的實現和設計。bash
Observable
是可觀察序列,是全部可觀察序列的基類,咱們不會直接使用Observable
這個類,通常都是使用子類。Observable
也能夠理解成抽象類,實際上不是抽象類,由於可觀察序列最重要的一個訂閱序列的方法subscribe
必須在其子類中重寫。ide
咱們先來看看Observable
的源碼:函數
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
複製代碼
Observable
實現了一個協議ObservableType
,並且ObservableType
協議繼承自ObservableConvertibleType
協議,因此在Observable
中實現了兩個協議方法:subscribe
和asObservable
。subscribe
方法沒有具體實現的邏輯,須要子類去實現。asObservable
方法返回的是self,看似用處不大,其實不是這樣的。asObservable
是很是有用的,若是一類是Observable
的子類,咱們能夠直接返回self,若是不是Observable
的子類,咱們能夠經過重寫這個協議方法來返回一個Observable
對象,這樣保證了協議的一致性。在使用的時候咱們能夠直接寫相似self.asObservable().subscribe(observer)
這樣的代碼,有利於保持代碼的簡潔性,是良好的封裝性的體現。因此我以爲這個設計很是的好,在咱們平常開發中也能夠借鑑。_ = Resources.incrementTotal()
和_ = Resources.decrementTotal()
這兩行代碼實際上是RxSwift內部實現的一個引用計數。這部份內容我會在後面的文章中再詳解。composeMap<R>
優化map的一個函數,不太理解用處。Observable
子類很是多,這裏不一一去看,主要區別在於對subscribe
方法的實現不同。final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
複製代碼
AnonymousObservableSink
是Sink
的子類,AnonymousObservableSink
自己遵照ObseverType
協議,與此同時實現了run
方法,雖然沒有實現subscribe
方法,可是已經足夠了,這樣AnonymousObservableSink
從某種程度來講也是Observable
。AnonymousObservableSink
是Observer和Observable的銜接的橋樑,也能夠理解成管道。它存儲了_observer
和銷燬者_cancel
。經過sink就能夠完成從Observable到Obsever的轉變。run
方法中的這行代碼parent._subscribeHandler(AnyObserver(self))
,其中parent是一個AnonymousObservable
對象。_subscribeHandler
這個block調用,代碼會執行到建立序列時的block。而後會調用發送信號的代碼obserber.onNext("發送信號")
,而後代碼會通過幾箇中間步驟會來到AnonymousObservableSink
類的on
方法。有問題或者建議和意見,歡迎你們評論或者私信。 喜歡的朋友能夠點下關注和喜歡,後續會持續更新文章。post