一個幫助咱們簡化異步編程的Swift框架。git
Observable
:產生事件Observer
:響應事件Operator
:建立變化組合事件Disposable
:管理綁定(訂閱)的生命週期Schedulers
:線程隊列調配Observable
與Observer
之間的關係let _ = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}.subscribe(onNext: { (num) in
print("receive num \(num)")
}, onError: { (error) in
print("error: \(error.localizedDescription)")
}, onCompleted: {
print("receive complete")
})
複製代碼
如上代碼出現兩個重要的方法create
和subscribe
。顧名思義,create
方法是建立一個Observable
對象,而subscribe
方法是建立一個訂閱事件。咱們先關注下create
方法如何建立一個Observable
對象。github
//Create.swift
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
複製代碼
首先看它傳入的參數爲一個閉包:AnyObserver<Element> -> Disposable
,而後返回的是一個Observable<Element>
對象。對比咱們的例子,咱們能夠肯定Element
爲咱們指定的Int
,即泛型Element
表示數據源類型。編程
上面返回的是一個AnonymousObservable
對象,並將閉包做爲參數傳入。swift
//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
}
複製代碼
AnonymousObservable
將傳入的閉包賦值給變量_subscribeHandler
。至此建立完了一個Observable
對象。而後執行其subscribe
方法:api
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
} else {
disposable = Disposables.create()
}
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
複製代碼
這一段代碼比較長,咱們先從參數下手,能夠看到參數中包括onNext
(產生下一個事件)、onError
(產生錯誤)、onCompleted
(產生完成)和onDisposed
四個不一樣的閉包。咱們先暫時無論Disposed
部份內容,直接看到下面相關代碼:閉包
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
複製代碼
上面代碼建立了一個AnonymousObserver
對象,並將參數的閉包事件與自身產生的event
事件關聯在一塊兒。框架
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
}
複製代碼
AnonymousObserver
對象將上面關聯的一個事件轉換閉包做爲參數存儲到變量_eventHandler
中。其實能夠簡單地理解AnonymousObserver
對象將上面subscribe
方法中的參數閉包存儲起來了。異步
再回到subscribe
方法,看到最後一句代碼:ide
return Disposables.create(self.asObservable().subscribe(observer), disposable)
複製代碼
咱們關注到這裏self.asObservable().subscribe(observer)
,首先調用了asObservable()
方法:異步編程
//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
associatedtype Element
func asObservable() -> Observable<Element>
}
//Observable.swift
public class Observable<Element> : ObservableType {
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> {
return self
}
}
複製代碼
如上所示,咱們能夠看到asObservable()
方法返回的本身自己Observable
。但咱們看到這裏對應的subscribe
方法爲"抽象方法",上面咱們建立的是AnonymousObservable
對象,在它的父類Producer
中實現了:
class Producer<Element> : Observable<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
複製代碼
一樣咱們先暫時無論Scheduler
相關內容,這裏調用了self.run()
方法,但它自己並未實現該方法,一樣咱們在AnonymousObservale
中能夠找到:
final private class AnonymousObservable<Element> : Producer<Element> {
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Element == Observer.Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
複製代碼
在run
方法中建立了一個AnonymousObservableSink
方法,而後調用了它的run
方法。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
private let _isStopped = AtomicInt(0)
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
複製代碼
咱們先看到其run
方法,能夠看到它執行了parent._subscribeHandler(AnyObserver(self))
,這一句很關鍵,這裏的parent
其實指的是咱們在調用create
方法時,建立的AnonymousObservable
對象。所以,這裏的_subscribeHandler
就是咱們create
方法傳遞的參數閉包。咱們能夠看到這裏建立了一個AnyObserver
對象傳入到閉包中。
回到例子中的閉包內容:
{ (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
複製代碼
這裏調用了onNext
方法產生元素1
:
public protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
}
複製代碼
這裏調用了on
方法傳遞元素,而咱們上面知道這裏的ObserverType
是AnyObserver
對象:
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
}
複製代碼
接着調用了self.observer(event)
將事件傳遞下去,而這裏的observer
是在建立parent._subscribeHandler(AnyObserver(self))
時傳入的。即self.observer = observer.on => AnonymousObservableSink.on
。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
複製代碼
所以,事件會傳遞到AnonymousObservableSink
中,並經過fowardOn
方法繼續傳遞事件.
class Sink<Observer: ObserverType>: Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
init(observer: Observer, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<Observer.Element>) {
if isFlagSet(self._diposed, 1) {
return
}
self._observer.on(event)
}
}
複製代碼
這裏調用了self._observer.on(event)
方法傳遞事件,而這裏的_observer
對象就是咱們在調用subscribe
方法時,傳遞進來的AnonymousObserver
對象。而AnonymousObserver
自己沒有實現on
方法,而是在父類ObserverBase
中實現了:
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
複製代碼
最後調用了onCore
方法傳遞事件:
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
複製代碼
而這裏的_eventHandler
便是咱們調用subscribe
方法時,建立的閉包(將外部的Event
和內部的Event
關聯)
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
....
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer), disposable)
}
複製代碼
所以,咱們會在外部接收到receive num 1
的事件消息。