RxSwift 入門

ReactiveX 是一個庫,用於經過使用可觀察序列來編寫異步的、基於事件的程序。html

它擴展了觀察者模式以支持數據、事件序列,並添加了容許你以聲明方式組合序列的操做符,同時抽象對低層線程、同步、線程安全等。react

本文主要做爲 RxSwift 的入門文章,對 RxSwift 中的一些基礎內容、經常使用實踐,作些介紹。git

本文地址爲:http://www.javashuo.com/article/p-puprblib-v.html,轉載請註明出處。程序員

Observables aka Sequences

Basics

觀察者模式(這裏指Observable(Element> Sequence)和正常序列(Sequence)的等價性對於理解 Rx 是至關重要的。github

每一個 Observable 序列只是一個序列。Observable 與 Swift 的 Sequence 相比,其主要優勢是能夠異步接收元素。這是 RxSwift 的核心。正則表達式

  • Observable(ObservableType) 與 Sequence 等價
  • Observable.subscribe 方法與 Sequence.makeIterator方法等價
  • Observer(callback)須要被傳遞到 Observable.subscribe 方法來接受序列元素,而不是在返回的 iterator 上調用 next() 方法

Sequence 是一個簡單、熟悉的概念,很容易可視化。swift

人是具備巨大視覺皮層的生物。當咱們能夠輕鬆地想象一個概念時,理解它就容易多了。api

咱們能夠經過嘗試模擬每一個Rx操做符內的事件狀態機到序列上的高級別操做來解除認知負擔。數組

若是咱們不使用 Rx 而是使用模型異步系統(model asynchronous systems),這可能意味着咱們的代碼會充滿狀態機和瞬態,這些正式咱們須要模擬的,而不是抽象。安全

ListSequence 多是數學家和程序員首先學習的概念之一。

這是一個數字的序列:

--1--2--3--4--5--6--|   // 正常結束

另外一個字符序列:

--a--b--a--a--a---d---X     // terminates with error

一些序列是有限的,而一些序列是無限的,好比一個按鈕點擊的列:

---tap-tap-------tap--->

這些被叫作 marble diagram。能夠在rxmarbles.com瞭解更多的 marble diagram。

若是咱們將序列愈發指定爲正則表達式,它將以下所示:

next*(error | completed)?

這描述瞭如下內容:

  • Sequence 能夠有 0 個 或者多個元素
  • 一旦收到 errorcompleted 事件,這個 Sequence 就不能再產生其餘元素

在 Rx 中, Sequence 被描述爲一個 push interface(也叫作 callbak)。

enum Event<Element>  {
    case next(Element)      // next element of a sequence
    case error(Swift.Error) // sequence failed with error
    case completed          // sequence terminated successfully
}

class Observable<Element> {
    func subscribe(_ observer: Observer<Element>) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

當序列發送 errorcompleted 事件時,將釋放計算序列元素的全部內部資源。

要當即取消序列元素的生成,並釋放資源,能夠在返回的訂閱(subscription)上調用 dispose

若是一個序列在有限時間內結束,則不調用 dispose 或者不使用 disposed(by: disposeBag) 不會形成任何永久性資源泄漏。可是,這些資源會一直被使用,直到序列完成(完成產生元素,或者返回一個錯誤)。

若是一個序列沒有自行終止,好比一系列的按鈕點擊,資源會被永久分配,直到 dispose 被手動調用(在 disposeBag 內調用,使用 takeUntil 操做符,或者其餘方式)。

使用 dispose bag 或者 takeUtil 操做符是一個確保資源被清除的魯棒(robust)的方式。即便序列將在有限時間內終止,咱們也推薦在生產環境中使用它們。

Disposing

被觀察的序列(observed sequence)有另外一種終止的方式。當咱們使用完一個序列而且想要釋放分配用於計算即將到來的元素的全部資源時,咱們能夠在一個訂閱上調用 dispose

這時一個使用 interval 操做符的例子:

let scheduler = SerialDispatchQueueScheduler(qos: .default)
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }

Thread.sleep(forTimeInterval: 2.0)

subscription.dispose()

上邊的例子打印:

0
1
2
3
4
5

注意,你一般不但願調用 dispose,這只是一個例子。手動調用 dispose 一般是一種糟糕的代碼味道。dispose 訂閱有更好的方式,好比 DisposeBagtakeUntil操做符、或者一些其餘的機制。

那麼,上邊的代碼是否能夠在 dispose 被執行後,打印任何東西?答案是,是狀況而定。

  • 若是上邊的 scheduler 是串行調度器(serial scheduler),好比 MainSchedulerdispose 在相同的串行調度器上調用,那麼答案就是 no。
  • 不然,答案是 yes。

你僅僅有兩個過程在並行執行。

  • 一個在產生元素
  • 另外一個 dispose 訂閱

「能夠在以後打印某些內容嗎?」這個問題,在這兩個過程在不一樣調度上執行的狀況下甚至沒有意義。

若是咱們的代碼是這樣的:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(MainScheduler.instance)
            .subscribe { event in
                print(event)
            }

// ....

subscription.dispose() // called from main thread

dispose 調用返回後,不會打印任何東西。

一樣,在這個例子中:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(serialScheduler)
            .subscribe { event in
                print(event)
            }

// ...

subscription.dispose() // executing on same `serialScheduler`

dispose 調用返回後,也不會打印任何東西。

Dispose Bag

Dispose bags are used to return ARC like behavior to RX.

當一個 DisposeBag 被釋放時,它會在每個可被 dispose 的對象(disposables)上調用 dispose

它沒有 dispose 方法,所以不容許故意顯式地調用 dispose。若是須要當即清理,咱們能夠建立一個新的 DisposeBag

self.disposeBag = DisposeBag()

這將清除舊的引用,並引發資源清理。

若是仍然須要手動清理,可使用 CompositeDisposable。它具備所需的行爲,但一旦調用了 dispose 方法,它將當即處理任何新添加可被dispose的對象(disposable)。

Take until

另外一種在 dealloc 時自動處理(dispose)訂閱的方式是使用 takeUtil 操做符。

sequence
    .takeUntil(self.rx.deallocated)
    .subscribe {
        print($0)
    }

Implicit Observable guarantees

還有一些額外的保證,全部的序列產生者(sequence producers、Observable s),必須遵照.

它們在哪個線程上產生元素可有可無,但若是它們生成一個元素併發送給觀察者observer.on(.next(nextElement)),那麼在 observer.on 方法執行完成前,它們不能發送下一個元素。

若是 .next 事件尚未完成,那麼生產者也不能發送終止 .completed.error

簡而言之,考慮如下示例:

someObservable
  .subscribe { (e: Event<Element>) in
      print("Event processing started")
      // processing
      print("Event processing ended")
  }

它始終打印:

Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended

它永遠沒法打印:

Event processing started
Event processing started
Event processing ended
Event processing ended

Creating your own Observable (aka observable sequence)

關於觀察者有一個重要的事情須要理解。

建立 observable 時,它不會僅僅由於它已建立而執行任何工做。

確實,Observable 能夠經過多種方式產生元素。其中一些會致使反作用,一些會影響現有的運行過程,例如點擊鼠標事件等。

可是,若是隻調用一個返回 Observable 的方法,那麼沒有序列生成,也沒有反作用。Observable僅僅定義序列的生成方法以及用於元素生成的參數。序列生成始於 subscribe 方法被調用。

例如,假設你有一個相似原型的方法:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")

// no requests are performed, no work is being done, no URL requests were fired

let cancel = searchForMe
  // sequence generation starts now, URL requests are fired
  .subscribe(onNext: { results in
      print(results)
  })

有許多方法能夠生成你本身的 Observable 序列,最簡單方法或許是使用 create 函數。

RxSwift 提供了一個方法能夠建立一個序列,這個序列訂閱時返回一個元素。這個方法是 just。咱們親自實現一下:

func myJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

myJust(0)
    .subscribe(onNext: { n in
      print(n)
    })

這會打印:

0

不錯,create 函數是什麼?

它只是一個便利方法,使你可使用 Swift 的閉包輕鬆實現 subscribe 方法。像 subscribe 方法同樣,它帶有一個參數 observer,並返回 disposable。

以這種方式實現的序列其實是同步的(synchronous)。它將生成元素,並在 subscribe 調用返回 disposable 表示訂閱前終止。所以,它返回的 disposable 並不重要,生成元素的過程不會被中斷。

當生成同步序列,一般用於返回的 disposable 是 NopDisposable 的單例。

如今,咱們來建立一個從數組中返回元素的 observable。

func myFrom<E>(_ sequence: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in sequence {
            observer.on(.next(element))
        }

        observer.on(.completed)
        return Disposables.create()
    }
}

let stringCounter = myFrom(["first", "second"])

print("Started ----")

// first time
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("----")

// again
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("Ended ----")

上邊的例子會打印:

Started ----
first
second
----
first
second
Ended ----

Creating an Observable that perfroms work

OK,如今更有趣了。咱們來建立前邊示例中使用的 interval 操做符。

這至關於 dispatch queue schedulers 的實際實現

func myInterval(_ interval: TimeInterval) -> Observable<Int> {
    return Observable.create { observer in
        print("Subscribed")
        let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
        timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval)

        let cancel = Disposables.create {
            print("Disposed")
            timer.cancel()
        }

        var next = 0
        timer.setEventHandler {
            if cancel.isDisposed {
                return
            }
            observer.on(.next(next))
            next += 1
        }
        timer.resume()

        return cancel
    }
}
let counter = myInterval(0.1)

print("Started ----")

let subscription = counter
    .subscribe(onNext: { n in
        print(n)
    })


Thread.sleep(forTimeInterval: 0.5)

subscription.dispose()

print("Ended ----")

上邊的示例會打印:

Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----

若是這樣寫:

let counter = myInterval(0.1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

那麼打印以下:

Started ----
Subscribed
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Disposed
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

訂閱後的每一個訂閱者(subscriber)同行會生成本身獨立的元素序列。默認狀況下,操做符是無狀態的。無狀態的操做符遠多於有狀態的操做符。

Sharing subscription and share operator

可是,若是你但願多個觀察者從一個訂閱共享事件(元素),該怎麼辦?

有兩件事須要定義:

  • 如何處理在新訂閱者有興趣觀察它們以前收到的過去的元素(replay lastest only, replay all, replay last n)
  • 如何決定什麼時候出發共享的訂閱(refCount, manual or some other algorithm)

一般是一個這樣的組合,replay(1).refCount,也就是 share(replay: 1)

let counter = myInterval(0.1)
    .share(replay: 1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

這將打印:

Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

請注意如今只有一個 Subscribed 和 Disposed 事件。

對 URL 可觀察對象(observable)的行爲是等效的。

下面的例子展現瞭如何的 HTTP 請求封裝在 Rx 中,這種封裝很是像 interval 操做符的模式。

extension Reactive where Base: URLSession {
    public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> {
        return Observable.create { observer in
            let task = self.dataTaskWithRequest(request) { (data, response, error) in
                guard let response = response, let data = data else {
                    observer.on(.error(error ?? RxCocoaURLError.Unknown))
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
                    return
                }

                observer.on(.next(data, httpResponse))
                observer.on(.completed)
            }

            task.resume()

            return Disposables.create {
                task.cancel()
            }
        }
    }
}

Operator

RxSwift 實現了許多操做符。

全部操做符的的 marble diagram 能夠在 ReactiveX.io 看到。

Playgrouds 裏邊幾乎有全部操做符的演示。

若是你須要一個操做符,而且不知道如何找到它,這裏有一個操做符的決策樹

Custom operators

有兩種方式能夠建立自定義的操做符。

Easy way

全部的內部代碼都使用高度優化的運算符版本,所以它們不是最好的教程材料。這就是爲何咱們很是鼓勵使用標準運算符。

幸運的是,有一種簡單的方法來建立操做符。建立新的操做符實際上就是建立可觀察對象,前邊的章節已經描述瞭如何作到這一點。

來看一下爲優化的 map 操做符的實現:

extension ObservableType {
    func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> {
        return Observable.create { observer in
            let subscription = self.subscribe { e in
                    switch e {
                    case .next(let value):
                        let result = transform(value)
                        observer.on(.next(result))
                    case .error(let error):
                        observer.on(.error(error))
                    case .completed:
                        observer.on(.completed)
                    }
                }

            return subscription
        }
    }
}


如今可使用自定義的 map 了:

let subscription = myInterval(0.1)
    .myMap { e in
        return "This is simply \(e)"
    }
    .subscribe(onNext: { n in
        print(n)
    })

這將打印:

Subscribed
This is simply 0
This is simply 1
This is simply 2
This is simply 3
This is simply 4
This is simply 5
This is simply 6
This is simply 7
This is simply 8
...

Life happens

那麼,若是用自定義運算符解決某些狀況太難了呢? 你能夠退出 Rx monad,在命令性世界中執行操做,而後使用 Subjects 再次將結果隧道傳輸到Rx。

下邊的例子是不該該被常常實踐的,是糟糕的代碼味道,可是你能夠這麼作。

let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth()

magicBeings
  .subscribe(onNext: { being in     // exit the Rx monad
      self.doSomeStateMagic(being)
  })
  .disposed(by: disposeBag)

//
//  Mess
//
let kitten = globalParty(   // calculate something in messy world
  being,
  UIApplication.delegate.dataSomething.attendees
)
kittens.on(.next(kitten))   // send result back to rx

//
// Another mess
//
let kittens = BehaviorRelay(value: firstKitten) // again back in Rxmonad
kittens.asObservable()
  .map { kitten in
    return kitten.purr()
  }
  // ....

每一次你這樣寫的時候,其餘人可能在其餘地方寫這樣的代碼:

kittens
  .subscribe(onNext: { kitten in
    // do something with kitten
  })
  .disposed(by: disposeBag)

因此,不要嘗試這麼作。

Error handling

有兩種錯誤機制。

Asynchrouous error handling mechanism in observables

錯誤處理很是直接,若是一個序列以錯誤而終止,則全部依賴的序列都將以錯誤而終止。這是一般的短路邏輯。

你可使用 catch 操做符從可觀察對象的失敗中恢復,有各類各樣的可讓你詳細指定恢復。

還有 retry 操做符,能夠在序列出錯的狀況下重試。

KVO

KVO 是一個 Objective-C 的機制。這意味着他沒有考慮類型安全,該項目試圖解決這個問題的一部分。

有兩種內置的方式支持 KVO:

// KVO
extension Reactive where Base: NSObject {
    public func observe<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {}
}

#if !DISABLE_SWIZZLING
// KVO
extension Reactive where Base: NSObject {
    public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: KeyValueObservingOptions) -> Observable<E?> {}
}
#endif

看一下觀察 UIView 的 frame 的例子,注意 UIKit 並不聽從 KVO,可是這樣能夠

view
  .rx.observe(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

view
  .rx.observeWeakly(CGRect.self, "frame")
  .subscribe(onNext: { frame in
    ...
  })

rx.observe

rx.observe 有更好的性能,由於它只是對 KVO 機制的包裝,可是它使用場景有限。

  • 它可用於觀察從全部權圖表中的self或祖先開始的 path(retainSelf = false)

  • 它可用於觀察從全部權圖中的後代開始的 path(retainSelf = true)

  • path 必須只包含 strong 屬性,不然你可能會由於在 dealloc 以前沒有取消註冊KVO觀察者而致使系統崩潰。

例如:

self.rx.observe(CGRect.self, "view.frame", retainSelf: false)

rx.observeWeakly

rx.observeWeaklyrx.observe 慢一些,由於它必須在若引用的狀況下處理對象釋放。

它不只適用於 rx.observe 適用的全部場景,還適用於:

  • 由於它不會持有被觀察的對象,因此它能夠用來觀察全部權關係位置的任意對象
  • 它能夠用來觀察 weak 屬性

Observing structs

KVO 是 Objective-C 的機制,因此它重度以來 NSValue

RxCocoa 內置支持 KVO 觀察 CGRectCGSizeCGPoint 結構體。

當觀察其餘結構體時,須要手動從 NSValue 中提早值。

這裏有展現如何經過實現 KVORepresentable 協議,爲其餘的結構體擴展 KVO 觀察和 *rx.observe**方法。

UI layer tips

在綁定到 UIKit 控件時,Observable 須要在 UI 層中知足某些要求。

Threading

Observable 須要在 MainScheduler 發送值,這只是普通的 UIKit/Cocoa 要求。

你的 API 最好在 MainScheduler 上返回結果。若是你試圖從後臺線程綁定一些東西到 UI,在 Debug build 中,RxCocoa 一般會拋出異常來通知你。

能夠經過添加 observeOn(MainScheduler.instance) 來修復該問題。

Error

你沒法將失敗綁定到 UIKit 控件,由於這是爲定義的行爲。

若是你不知道 Observable 是否能夠失敗,你能夠經過使用 catchErrorJustReturn(valueThatIsReturnedWhenErrorHappens) 來確保它不會失敗,可是錯誤發生後,基礎序列仍將完成。

若是所需行爲是基礎序列繼續產生元素,則須要某些版本的 retry 操做符。

Sharing subscription

你一般但願在 UI 層中共享訂閱,你不但願單獨的 HTTP 調用將相同的數據綁定到多個 UI 元素。

假設你有這樣的代碼:

let searchResults = searchText
    .throttle(0.3, $.mainScheduler)
    .distinctUntilChanged
    .flatMapLatest { query in
        API.getSearchResults(query)
            .retry(3)
            .startWith([]) // clears results on new search term
            .catchErrorJustReturn([])
    }
    .share(replay: 1)    // <- notice the `share` operator

你一般想要的是在計算後共享搜索結果,這就是 share 的含義。

在 UI 層中,在轉換鏈的末尾添加 share 一般是一個很好的經驗法則。由於你想要共享計算結果,而不是把 searcResults 綁定到多個 UI 元素時,觸發多個 HTTP 鏈接。

另外,請參閱 Driver,它旨在透明地包裝這些 share 調用,確保在主 UI 縣城上觀察元素,而且不會將錯誤綁定到 UI。


原文爲RxSwift/Getting Started,本文在原文基礎上依自身須要略有修改。

相關文章
相關標籤/搜索