ReactiveX 是一個庫,用於經過使用可觀察序列來編寫異步的、基於事件的程序。html
它擴展了觀察者模式以支持數據、事件序列,並添加了容許你以聲明方式組合序列的操做符,同時抽象對低層線程、同步、線程安全等。react
本文主要做爲 RxSwift 的入門文章,對 RxSwift 中的一些基礎內容、經常使用實踐,作些介紹。git
本文地址爲:http://www.javashuo.com/article/p-puprblib-v.html,轉載請註明出處。程序員
觀察者模式(這裏指Observable(Element> Sequence
)和正常序列(Sequence
)的等價性對於理解 Rx 是至關重要的。github
每一個 Observable
序列只是一個序列。Observable
與 Swift 的 Sequence
相比,其主要優勢是能夠異步接收元素。這是 RxSwift 的核心。正則表達式
Observable
(ObservableType
) 與 Sequence
等價Observable.subscribe
方法與 Sequence.makeIterator
方法等價Observable.subscribe
方法來接受序列元素,而不是在返回的 iterator 上調用 next()
方法Sequence 是一個簡單、熟悉的概念,很容易可視化。swift
人是具備巨大視覺皮層的生物。當咱們能夠輕鬆地想象一個概念時,理解它就容易多了。api
咱們能夠經過嘗試模擬每一個Rx操做符內的事件狀態機到序列上的高級別操做來解除認知負擔。數組
若是咱們不使用 Rx 而是使用模型異步系統(model asynchronous systems),這可能意味着咱們的代碼會充滿狀態機和瞬態,這些正式咱們須要模擬的,而不是抽象。安全
List
和 Sequence
多是數學家和程序員首先學習的概念之一。
這是一個數字的序列:
--1--2--3--4--5--6--| // 正常結束
另外一個字符序列:
--a--b--a--a--a---d---X // terminates with error
一些序列是有限的,而一些序列是無限的,好比一個按鈕點擊的列:
---tap-tap-------tap--->
這些被叫作 marble diagram。能夠在rxmarbles.com瞭解更多的 marble diagram。
若是咱們將序列愈發指定爲正則表達式,它將以下所示:
next*(error | completed)?
這描述瞭如下內容:
error
或 completed
事件,這個 Sequence 就不能再產生其餘元素在 Rx 中, Sequence 被描述爲一個 push interface(也叫作 callbak)。
enum Event<Element> { case next(Element) // next element of a sequence case error(Swift.Error) // sequence failed with error case completed // sequence terminated successfully } class Observable<Element> { func subscribe(_ observer: Observer<Element>) -> Disposable } protocol ObserverType { func on(_ event: Event<Element>) }
當序列發送 error
或 completed
事件時,將釋放計算序列元素的全部內部資源。
要當即取消序列元素的生成,並釋放資源,能夠在返回的訂閱(subscription)上調用 dispose
。
若是一個序列在有限時間內結束,則不調用 dispose
或者不使用 disposed(by: disposeBag)
不會形成任何永久性資源泄漏。可是,這些資源會一直被使用,直到序列完成(完成產生元素,或者返回一個錯誤)。
若是一個序列沒有自行終止,好比一系列的按鈕點擊,資源會被永久分配,直到 dispose
被手動調用(在 disposeBag 內調用,使用 takeUntil
操做符,或者其餘方式)。
使用 dispose bag 或者 takeUtil
操做符是一個確保資源被清除的魯棒(robust)的方式。即便序列將在有限時間內終止,咱們也推薦在生產環境中使用它們。
被觀察的序列(observed sequence)有另外一種終止的方式。當咱們使用完一個序列而且想要釋放分配用於計算即將到來的元素的全部資源時,咱們能夠在一個訂閱上調用 dispose
。
這時一個使用 interval
操做符的例子:
let scheduler = SerialDispatchQueueScheduler(qos: .default) let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .subscribe { event in print(event) } Thread.sleep(forTimeInterval: 2.0) subscription.dispose()
上邊的例子打印:
0 1 2 3 4 5
注意,你一般不但願調用 dispose
,這只是一個例子。手動調用 dispose
一般是一種糟糕的代碼味道。dispose 訂閱有更好的方式,好比 DisposeBag
、takeUntil
操做符、或者一些其餘的機制。
那麼,上邊的代碼是否能夠在 dispose
被執行後,打印任何東西?答案是,是狀況而定。
scheduler
是串行調度器(serial scheduler),好比 MainScheduler
,dispose
在相同的串行調度器上調用,那麼答案就是 no。你僅僅有兩個過程在並行執行。
「能夠在以後打印某些內容嗎?」這個問題,在這兩個過程在不一樣調度上執行的狀況下甚至沒有意義。
若是咱們的代碼是這樣的:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .observeOn(MainScheduler.instance) .subscribe { event in print(event) } // .... subscription.dispose() // called from main thread
在 dispose
調用返回後,不會打印任何東西。
一樣,在這個例子中:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler) .observeOn(serialScheduler) .subscribe { event in print(event) } // ... subscription.dispose() // executing on same `serialScheduler`
在 dispose
調用返回後,也不會打印任何東西。
Dispose bags are used to return ARC like behavior to RX.
當一個 DisposeBag
被釋放時,它會在每個可被 dispose 的對象(disposables)上調用 dispose
。
它沒有 dispose
方法,所以不容許故意顯式地調用 dispose
。若是須要當即清理,咱們能夠建立一個新的 DisposeBag
。
self.disposeBag = DisposeBag()
這將清除舊的引用,並引發資源清理。
若是仍然須要手動清理,可使用 CompositeDisposable
。它具備所需的行爲,但一旦調用了 dispose
方法,它將當即處理任何新添加可被dispose的對象(disposable)。
另外一種在 dealloc 時自動處理(dispose)訂閱的方式是使用 takeUtil
操做符。
sequence .takeUntil(self.rx.deallocated) .subscribe { print($0) }
Observable
guarantees還有一些額外的保證,全部的序列產生者(sequence producers、Observable
s),必須遵照.
它們在哪個線程上產生元素可有可無,但若是它們生成一個元素併發送給觀察者observer.on(.next(nextElement))
,那麼在 observer.on
方法執行完成前,它們不能發送下一個元素。
若是 .next
事件尚未完成,那麼生產者也不能發送終止 .completed
或 .error
。
簡而言之,考慮如下示例:
someObservable .subscribe { (e: Event<Element>) in print("Event processing started") // processing print("Event processing ended") }
它始終打印:
Event processing started Event processing ended Event processing started Event processing ended Event processing started Event processing ended
它永遠沒法打印:
Event processing started Event processing started Event processing ended Event processing ended
Observable
(aka observable sequence)關於觀察者有一個重要的事情須要理解。
建立 observable 時,它不會僅僅由於它已建立而執行任何工做。
確實,Observable
能夠經過多種方式產生元素。其中一些會致使反作用,一些會影響現有的運行過程,例如點擊鼠標事件等。
可是,若是隻調用一個返回 Observable
的方法,那麼沒有序列生成,也沒有反作用。Observable
僅僅定義序列的生成方法以及用於元素生成的參數。序列生成始於 subscribe
方法被調用。
例如,假設你有一個相似原型的方法:
func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me") // no requests are performed, no work is being done, no URL requests were fired let cancel = searchForMe // sequence generation starts now, URL requests are fired .subscribe(onNext: { results in print(results) })
有許多方法能夠生成你本身的 Observable
序列,最簡單方法或許是使用 create
函數。
RxSwift 提供了一個方法能夠建立一個序列,這個序列訂閱時返回一個元素。這個方法是 just
。咱們親自實現一下:
func myJust<E>(_ element: E) -> Observable<E> { return Observable.create { observer in observer.on(.next(element)) observer.on(.completed) return Disposables.create() } } myJust(0) .subscribe(onNext: { n in print(n) })
這會打印:
0
不錯,create
函數是什麼?
它只是一個便利方法,使你可使用 Swift 的閉包輕鬆實現 subscribe
方法。像 subscribe
方法同樣,它帶有一個參數 observer
,並返回 disposable。
以這種方式實現的序列其實是同步的(synchronous)。它將生成元素,並在 subscribe
調用返回 disposable 表示訂閱前終止。所以,它返回的 disposable 並不重要,生成元素的過程不會被中斷。
當生成同步序列,一般用於返回的 disposable 是 NopDisposable
的單例。
如今,咱們來建立一個從數組中返回元素的 observable。
func myFrom<E>(_ sequence: [E]) -> Observable<E> { return Observable.create { observer in for element in sequence { observer.on(.next(element)) } observer.on(.completed) return Disposables.create() } } let stringCounter = myFrom(["first", "second"]) print("Started ----") // first time stringCounter .subscribe(onNext: { n in print(n) }) print("----") // again stringCounter .subscribe(onNext: { n in print(n) }) print("Ended ----")
上邊的例子會打印:
Started ---- first second ---- first second Ended ----
Observable
that perfroms workOK,如今更有趣了。咱們來建立前邊示例中使用的 interval
操做符。
這至關於 dispatch queue schedulers 的實際實現
func myInterval(_ interval: TimeInterval) -> Observable<Int> { return Observable.create { observer in print("Subscribed") let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global()) timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval) let cancel = Disposables.create { print("Disposed") timer.cancel() } var next = 0 timer.setEventHandler { if cancel.isDisposed { return } observer.on(.next(next)) next += 1 } timer.resume() return cancel } }
let counter = myInterval(0.1) print("Started ----") let subscription = counter .subscribe(onNext: { n in print(n) }) Thread.sleep(forTimeInterval: 0.5) subscription.dispose() print("Ended ----")
上邊的示例會打印:
Started ---- Subscribed 0 1 2 3 4 Disposed Ended ----
若是這樣寫:
let counter = myInterval(0.1) print("Started ----") let subscription1 = counter .subscribe(onNext: { n in print("First \(n)") }) let subscription2 = counter .subscribe(onNext: { n in print("Second \(n)") }) Thread.sleep(forTimeInterval: 0.5) subscription1.dispose() Thread.sleep(forTimeInterval: 0.5) subscription2.dispose() print("Ended ----")
那麼打印以下:
Started ---- Subscribed Subscribed First 0 Second 0 First 1 Second 1 First 2 Second 2 First 3 Second 3 First 4 Second 4 Disposed Second 5 Second 6 Second 7 Second 8 Second 9 Disposed Ended ----
訂閱後的每一個訂閱者(subscriber)同行會生成本身獨立的元素序列。默認狀況下,操做符是無狀態的。無狀態的操做符遠多於有狀態的操做符。
share
operator可是,若是你但願多個觀察者從一個訂閱共享事件(元素),該怎麼辦?
有兩件事須要定義:
一般是一個這樣的組合,replay(1).refCount
,也就是 share(replay: 1)
。
let counter = myInterval(0.1) .share(replay: 1) print("Started ----") let subscription1 = counter .subscribe(onNext: { n in print("First \(n)") }) let subscription2 = counter .subscribe(onNext: { n in print("Second \(n)") }) Thread.sleep(forTimeInterval: 0.5) subscription1.dispose() Thread.sleep(forTimeInterval: 0.5) subscription2.dispose() print("Ended ----")
這將打印:
Started ---- Subscribed First 0 Second 0 First 1 Second 1 First 2 Second 2 First 3 Second 3 First 4 Second 4 First 5 Second 5 Second 6 Second 7 Second 8 Second 9 Disposed Ended ----
請注意如今只有一個 Subscribed 和 Disposed 事件。
對 URL 可觀察對象(observable)的行爲是等效的。
下面的例子展現瞭如何的 HTTP 請求封裝在 Rx 中,這種封裝很是像 interval
操做符的模式。
extension Reactive where Base: URLSession { public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> { return Observable.create { observer in let task = self.dataTaskWithRequest(request) { (data, response, error) in guard let response = response, let data = data else { observer.on(.error(error ?? RxCocoaURLError.Unknown)) return } guard let httpResponse = response as? HTTPURLResponse else { observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response))) return } observer.on(.next(data, httpResponse)) observer.on(.completed) } task.resume() return Disposables.create { task.cancel() } } } }
RxSwift 實現了許多操做符。
全部操做符的的 marble diagram 能夠在 ReactiveX.io 看到。
在 Playgrouds 裏邊幾乎有全部操做符的演示。
若是你須要一個操做符,而且不知道如何找到它,這裏有一個操做符的決策樹。
有兩種方式能夠建立自定義的操做符。
全部的內部代碼都使用高度優化的運算符版本,所以它們不是最好的教程材料。這就是爲何咱們很是鼓勵使用標準運算符。
幸運的是,有一種簡單的方法來建立操做符。建立新的操做符實際上就是建立可觀察對象,前邊的章節已經描述瞭如何作到這一點。
來看一下爲優化的 map
操做符的實現:
extension ObservableType { func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> { return Observable.create { observer in let subscription = self.subscribe { e in switch e { case .next(let value): let result = transform(value) observer.on(.next(result)) case .error(let error): observer.on(.error(error)) case .completed: observer.on(.completed) } } return subscription } } }
如今可使用自定義的 map 了:
let subscription = myInterval(0.1) .myMap { e in return "This is simply \(e)" } .subscribe(onNext: { n in print(n) })
這將打印:
Subscribed This is simply 0 This is simply 1 This is simply 2 This is simply 3 This is simply 4 This is simply 5 This is simply 6 This is simply 7 This is simply 8 ...
那麼,若是用自定義運算符解決某些狀況太難了呢? 你能夠退出 Rx monad,在命令性世界中執行操做,而後使用 Subjects
再次將結果隧道傳輸到Rx。
下邊的例子是不該該被常常實踐的,是糟糕的代碼味道,可是你能夠這麼作。
let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth() magicBeings .subscribe(onNext: { being in // exit the Rx monad self.doSomeStateMagic(being) }) .disposed(by: disposeBag) // // Mess // let kitten = globalParty( // calculate something in messy world being, UIApplication.delegate.dataSomething.attendees ) kittens.on(.next(kitten)) // send result back to rx // // Another mess // let kittens = BehaviorRelay(value: firstKitten) // again back in Rxmonad kittens.asObservable() .map { kitten in return kitten.purr() } // ....
每一次你這樣寫的時候,其餘人可能在其餘地方寫這樣的代碼:
kittens .subscribe(onNext: { kitten in // do something with kitten }) .disposed(by: disposeBag)
因此,不要嘗試這麼作。
有兩種錯誤機制。
錯誤處理很是直接,若是一個序列以錯誤而終止,則全部依賴的序列都將以錯誤而終止。這是一般的短路邏輯。
你可使用 catch
操做符從可觀察對象的失敗中恢復,有各類各樣的可讓你詳細指定恢復。
還有 retry
操做符,能夠在序列出錯的狀況下重試。
KVO 是一個 Objective-C 的機制。這意味着他沒有考慮類型安全,該項目試圖解決這個問題的一部分。
有兩種內置的方式支持 KVO:
// KVO extension Reactive where Base: NSObject { public func observe<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {} } #if !DISABLE_SWIZZLING // KVO extension Reactive where Base: NSObject { public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions) -> Observable<E?> {} } #endif
看一下觀察 UIView
的 frame 的例子,注意 UIKit 並不聽從 KVO,可是這樣能夠
view .rx.observe(CGRect.self, "frame") .subscribe(onNext: { frame in ... })
或
view .rx.observeWeakly(CGRect.self, "frame") .subscribe(onNext: { frame in ... })
rx.observe
rx.observe
有更好的性能,由於它只是對 KVO 機制的包裝,可是它使用場景有限。
它可用於觀察從全部權圖表中的self或祖先開始的 path(retainSelf = false)
它可用於觀察從全部權圖中的後代開始的 path(retainSelf = true)
path 必須只包含 strong 屬性,不然你可能會由於在 dealloc 以前沒有取消註冊KVO觀察者而致使系統崩潰。
例如:
self.rx.observe(CGRect.self, "view.frame", retainSelf: false)
rx.observeWeakly
rx.observeWeakly
比 rx.observe
慢一些,由於它必須在若引用的狀況下處理對象釋放。
它不只適用於 rx.observe
適用的全部場景,還適用於:
KVO 是 Objective-C 的機制,因此它重度以來 NSValue
。
RxCocoa 內置支持 KVO 觀察 CGRect
、CGSize
、CGPoint
結構體。
當觀察其餘結構體時,須要手動從 NSValue
中提早值。
這裏有展現如何經過實現 KVORepresentable
協議,爲其餘的結構體擴展 KVO 觀察和 *rx.observe**方法。
在綁定到 UIKit 控件時,Observable 須要在 UI 層中知足某些要求。
Observable
須要在 MainScheduler
發送值,這只是普通的 UIKit/Cocoa 要求。
你的 API 最好在 MainScheduler
上返回結果。若是你試圖從後臺線程綁定一些東西到 UI,在 Debug build 中,RxCocoa 一般會拋出異常來通知你。
能夠經過添加 observeOn(MainScheduler.instance)
來修復該問題。
你沒法將失敗綁定到 UIKit 控件,由於這是爲定義的行爲。
若是你不知道 Observable
是否能夠失敗,你能夠經過使用 catchErrorJustReturn(valueThatIsReturnedWhenErrorHappens)
來確保它不會失敗,可是錯誤發生後,基礎序列仍將完成。
若是所需行爲是基礎序列繼續產生元素,則須要某些版本的 retry
操做符。
你一般但願在 UI 層中共享訂閱,你不但願單獨的 HTTP 調用將相同的數據綁定到多個 UI 元素。
假設你有這樣的代碼:
let searchResults = searchText .throttle(0.3, $.mainScheduler) .distinctUntilChanged .flatMapLatest { query in API.getSearchResults(query) .retry(3) .startWith([]) // clears results on new search term .catchErrorJustReturn([]) } .share(replay: 1) // <- notice the `share` operator
你一般想要的是在計算後共享搜索結果,這就是 share
的含義。
在 UI 層中,在轉換鏈的末尾添加 share
一般是一個很好的經驗法則。由於你想要共享計算結果,而不是把 searcResults 綁定到多個 UI 元素時,觸發多個 HTTP 鏈接。
另外,請參閱 Driver
,它旨在透明地包裝這些 share
調用,確保在主 UI 縣城上觀察元素,而且不會將錯誤綁定到 UI。
原文爲RxSwift/Getting Started,本文在原文基礎上依自身須要略有修改。