最近在學習RxSwift相關的內容,在這裏記錄一些基本的知識點,以便從此查閱。編程
在RxSwift中,最關鍵的一個概念是可觀察序列(Observable Sequence),它至關於Swift中的序列(Sequence),可觀察序列中的每一個元素都是一個事件,咱們知道Swift的序列中能夠包含任意多個元素,相似的,可觀察序列會不斷產生新的事件直到發生錯誤或正常結束爲止。訂閱者(Observer)經過訂閱(subscribe)一個可觀察隊列來接收序列所產生的新事件,只有在有觀察者的狀況下序列才能夠發送事件。swift
例如,使用of
操做建立一個可觀察序列:api
let seq = Observable.of(1, 2, 3)
複製代碼
of
是一種用來建立Observable的簡便操做,在上面的代碼中建立了一個類型爲Observable<Int>
的Observable,裏面包含了三個元素:1,2,3。多線程
來看看Observable中都提供了哪些操做,可觀察序列是一個實現了ObservableType
協議的類型,ObservableType
協議的定義很是簡單:閉包
protocol ObservableType : ObservableConvertibleType {
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
複製代碼
其中E
是一個關聯類型,表示序列中元素的類型,除此以外協議只定義了一個方法:subscribe
,用於向可觀察序列添加一個觀察者(ObserverType
類型):併發
// 接收閉包的subscribe函數是經過協議擴展提供的簡便方法
seq.subscribe { (event) in
print(event)
}
複製代碼
subscribe
至關於Swift序列中的遍歷操做(makeIterator),如上,向seq序列添加一個觀察者,在序列中有新的事件時調用該閉包,上面的代碼會輸出1,2,3。app
觀察者是實現了ObserverType
協議的對象,ObserverType
協議一樣十分簡單:ide
public protocol ObserverType {
associatedtype E
func on(_ event: Event<E>)
}
複製代碼
E
爲觀察者所觀察序列中的元素類型,當序列中有新的事件產生時,會調用on
方法來接收新的事件。其中事件的類型Event
是一個枚舉,其中包含3個類型:函數
enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
複製代碼
.next
:表示序列中產生了下一個事件,關聯值Element保存了該事件的值。.error
:序列產生了一個錯誤,關聯值Error保存了錯誤類型,在這以後序列會直接結束(再也不產生新的next事件)。.completed
:序列正常結束。除了產生錯誤和天然結束之外,還能夠手動結束觀察,在使用subscribe
訂閱一個可觀察序列時,會返回一個Disposable
類型的對象。這裏的Disposable
是一個協議,只定義了一個方法:學習
protocol Disposable {
func dispose()
}
複製代碼
dispose
方法用來結束這次訂閱並釋放可觀察序列中的相關資源,一般來講你並不須要直接調用該方法,而是經過調用其擴展方法addDisposableTo
將Disposable
添加到一個DisposeBag
對象中。DisposeBag對象會自動管理全部添加到其中的Disposable對象,在DisposeBag對象銷燬的時候會自動調用其中全部Disposable的dispose方法釋放資源。
也可使用takeUntil
來自動結束訂閱:
seq.takeUntil(otherSeq)
.subscribe({ (event) in
print(event)
})
複製代碼
在otherSeq序列發出任意類型的事件以後,自動結束本次訂閱。
經過Observable
類型提供的方法create
能夠建立一個自定義的可觀察序列:
let seq = Observable<Int>.create { (observer) -> Disposable in
observer.on(.next(1))
observer.on(.completed)
return Disposables.create {
// do some cleanup
}
}
複製代碼
create
方法使用一個閉包來建立自定義的序列,閉包接收一個ObserverType
的參數observer,並經過observer來發送相應的事件。如上面的代碼,建立了一個Observable<Int>
類型的可觀察序列,訂閱該序列的觀察者會收到事件1和一個完成事件。最後create
方法返回一個本身建立的Disposable
對象,能夠在這裏進行一些相關的資源回收操做。
除了create
方法以外,RxSwift中提供了不少中簡便的方法用於建立序列,經常使用的有:
just:建立一個只包含一個值的可觀察序列:
let justSeq = Observable.just(1)
justSeq.subscribe { (event) in
print(event)
}
---- example output ----
next(1)
completed
複製代碼
of:of
和just
有點相似,不一樣的是of
能夠將一系列元素建立成事件隊列,該Observable
依次發送相應事件和結束事件:
let ofSeq = Observable.of(1, 2, 3)
ofSeq.subscribe { (event) in
print(event)
}
---- example output ----
next(1)
next(2)
next(3)
completed
複製代碼
empty:這種類型的Observable只發送結束(Completed)事件
let emptySequence = Observable<String>.empty()
複製代碼
error:該隊列只發送一個error
事件,傳遞一個自定義的錯誤類型。
let errorSeq = Observable<TestError>.error(TestError.Error1)
複製代碼
一般在咱們在訂閱一個可觀察序列的時候,每一次的訂閱行爲都是獨立的,也就是說:
let seq = Observable.of(1, 2)
// 1
seq.subscribe { (event) in
print("sub 1: \(event)")
}
// 2
seq.subscribe { (event) in
print("sub 2: \(event)")
}
---- example output ----
sub 1: next(1)
sub 1: next(2)
sub 1: completed
sub 2: next(1)
sub 2: next(2)
sub 2: completed
複製代碼
咱們連續訂閱同一序列兩次,每次都會接收到相同的事件,第二次訂閱時並無由於第一次訂閱的行爲致使元素"耗盡"。有些時候咱們但願讓全部的觀察者都共享同一份事件,這個時候可使用share
share:share
是ObservableType
協議的一個擴展方法,它返回一個可觀察序列,該序列的全部觀察者都會共享同一份訂閱,上面的代碼加上share以後:
let seq = Observable.of(1, 2).share()
// 1
seq.subscribe { (event) in
print("sub 1: \(event)")
}
// 2
seq.subscribe { (event) in
print("sub 2: \(event)")
}
---- example output ----
sub 1: next(1)
sub 1: next(2)
sub 1: completed
sub 2: completed
複製代碼
能夠看到,在第一次訂閱時序列已經將全部的事件發送,後面再進行第二次訂閱的時候只收到了一個完成事件。
shareReplay:shareReplay
的用法與share
相似,它的方法簽名以下:
func shareReplay(_ bufferSize: Int) -> Observable<Element>
複製代碼
不一樣的地方在於,shareReplay
接收一個整型參數bufferSize,指定緩衝區大小,訂閱該序列的觀察者會當即收到最近bufferSize條事件。
在Swift的序列Sequence
中,可使用map、flatMap和reduce等常見的函數式方法對其中的元素進行變換,RxSwift中的可觀察序列一樣也支持這些方法。
map:這是map
方法的簽名:
func map<Result>(_ transform: @escaping (E) throws -> Result) -> Observable<Result>
複製代碼
在一個自定義的閉包中對序列的每個元素進行變換,返回一個包含轉換後結果的可觀察序列,與Swift中Sequence
的map相似。
let mappedSeq: Observable<String> = seq.map { (element) -> String in
return "value: \(element)"
}
複製代碼
flatMap:先來看看flatMap
的簽名:
func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
-> Observable<O.E>
複製代碼
關於flatMap的做用一樣能夠類比Sequence
,Sequence
中的flatMap
閉包遍歷每個元素進行處理後返回一個新的序列,最後會將這些序列"展平",獲得一個包含全部序列元素的新序列:
let array = [1, 2]
let res = array.flatMap { (n) -> [String] in
return ["\(n)a", "\(n)b"]
}
// res: ["1a", "1b", "2a", "2b"]
複製代碼
RxSwift中的flatMap
用法與之相似,flatMap
中的閉包會遍歷可觀察序列中的全部元素,並返回一個新的可觀察序列,最後flatMap
會返回一個包含全部元素的可觀察序列:
let seq = Observable.of(1, 2)
.flatMap { (n) -> Observable<String> in
return Observable.of("\(n)a", "\(n)b") // (1)
}
.subscribe { (event) in
print(event)
}
// 獲得的seq類型爲Observable<String>
---- example output ----
next(1a)
next(1b)
next(2a)
next(2b)
completed
複製代碼
在閉包中建立了若干個可觀察序列(1),這些序列中發送的next
事件都會被傳遞到seq序列中,其中任何一個序列發生錯誤(發送了error
事件)時,seq序列都會直接結束,再也不繼續接收事件;可是隻有全部序列都完成(發送了completed
事件)後,seq序列纔會正常結束。
flatMapLatest:做用與flatMap
相似,可是對於閉包中生成的可觀察序列,它並不會保留全部的序列的訂閱,在遍歷結束後,只保留最後建立的序列的訂閱,以前建立的Observables都會取消訂閱(相應序列的dispose方法也會被調用):
// 與上一個例子相同的代碼,僅將flatMap改爲flatMapLatest
let seq = Observable.of(1, 2)
.flatMapLatest { (n) -> Observable<String> in
return Observable.of("\(n)a", "\(n)b") // (1)
}
.subscribe { (event) in
print(event)
}
---- example output ----
next(1a)
next(2a)
next(2b)
completed
複製代碼
由於訂閱關係的改變,如今只有當最後建立的那個Observable正常結束時,seq纔會收到completed
事件。
在這種狀況下,flatMapLatest
會獲得與flatMap
相同的輸出:
let seq = Observable.of(1, 2)
.flatMapLatest { (n) -> Observable<String> in
return Observable<String>.create({ (observer) -> Disposable in
observer.onNext("\(n)a")
observer.onNext("\(n)b")
return Disposables.create { }
})
}
.subscribe { (event) in
print(event)
}
複製代碼
這是由於在上面的這個例子中所建立的Observable是同步建立元素的,沒法被打斷。
相似的方法還有flatMapFirst
,使用方法能夠類比flatMapLatest
。
reduce和scan:reduce
的做用與Sequence中定義的同樣,它接收一個初始值和一個閉包,在Observable中的每一個值上調用該閉包,並將每一步的結果做爲下一次調用的輸入:
Observable.of(1, 2, 3).reduce(0) { (first, num) -> Float in
return Float(first + num)
}
.subscribe { (event) in
print(event)
}
// 輸出:next(6.0), completed
複製代碼
在上面的代碼中,提供了一個初始值0,在閉包中計算和,並將結果序列的元素類型改爲Float
,序列的觀察者最後接收到全部元素的和。
scan
的做用相似於reduce
,它跟reduce
之間惟一的區別在於,scan
會發送每一次調用閉包後的結果:
Observable.of(1, 2, 3).scan(0) { (first, num) -> Float in
return Float(first + num)
}
.subscribe { (event) in
print(event)
}
// 輸出:next(1.0), next(3.0), next(6.0), completed
複製代碼
startWith:在序列的開頭加入一個指定的元素
Observable.of(2, 3).startWith(1).subscribe { (event) in
print(event)
}
---- example output ----
next(1)
next(2)
next(3)
completed
複製代碼
訂閱該序列以後,會當即收到startWith
指定的事件,即便此時序列並無開始發送事件。
merge:當你有多個類型相同的Observable,可使用merge
方法將它們合併起來,同時訂閱全部Observable中的事件:
let seq1 = Observable.just(1)
let seq2 = Observable.just(2)
let seq = Observable.of(seq1, seq2).merge()
seq.subscribe { (event) in
print(event)
}
---- example output ----
next(1)
next(2)
completed
複製代碼
只有當Observable中的元素也是Observable類型的時候纔可使用merge
方法,當其中一個序列發生錯誤的時候,seq都會被終止,一樣的只有全部序列都完成以後,seq纔會收到完成事件。
zip:zip
方法也能夠將多個Observable合併在一塊兒,與merge
不一樣的是,zip
提供了一個閉包用來對多個Observable中的元素進行組合變化,最後得到一個新的序列:
let seq1 = Observable.just(1)
let seq2 = Observable.just(2)
let seq: Observable<String> = Observable.zip(seq1, seq2) { (num1, num2) -> String in
return "\(num1 + num2)"
}
seq.subscribe { (event) in
print(event)
}
---- example output ----
next(3)
completed
複製代碼
zip
方法按照參數個數的不一樣有多個版本,最多支持合併8個可觀察序列,須要注意的一點是,閉包所接收的參數是各個序列中對應位置的元素。也就是說,若是seq1發送了一個事件,而seq2發送了多個事件,閉包也只會被執行一次,seq中只有一個元素。
組合的Observable中任意一個發生錯誤,最後的seq都會直接出錯終止,當全部的Observable都發出completed事件後,seq纔會正常結束。
combineLatest:combineLatest
一樣用於將多個序列組合成一個,使用方法與zip
同樣,可是它的調用機制跟zip
不一樣,每當其中一個序列有新元素時,combineLatest
都會從其餘全部序列中取出最後一個元素,傳入閉包中生成新的元素添加到結果序列中。
Subject
對象至關於一種中間的代理和橋樑的做用,它既是觀察者又是可觀察序列,在向一個Subject
對象添加觀察者以後,能夠經過該Subject
向其發送事件。Subject
對象並不會主動發送completed事件,而且在發送了error或completed事件以後,Subject
中的序列會直接終結,沒法再發送新的消息。Subject
一樣也分爲幾種類型:
PublishSubject:PublishSubject
的訂閱者只會收到在其訂閱(subscribe)以後發送的事件
let subject = PublishSubject<Int>()
subject.onNext(1)
subject.subscribe { (event) in
print(event)
}
subject.onNext(2)
---- example output ----
next(2)
複製代碼
能夠看到,觀察者只收到了事件2,在訂閱以前發送的事件1並無接收到。
ReplaySubject:ReplaySubject
在初始化時指定一個大小爲n的緩衝區,裏面會保存最近發送的n條事件,在訂閱以後,觀察者會當即收到緩衝區中的事件:
let subject = ReplaySubject<Int>.create(bufferSize: 2)
subject.onNext(1)
subject.subscribe { (event) in
print(event)
}
subject.onNext(2)
---- example output ----
next(1)
next(2)
複製代碼
BehaviorSubject:BehaviorSubject
在初始化時須要提供一個默認值,在訂閱時觀察者會馬上收到序列上一次發送的事件,若是沒有發送過事件則會收到默認值:
let subject = BehaviorSubject(value: 1)
subject.subscribe { (event) in
print(event)
}
---- example output ----
next(1)
複製代碼
Variable:Variable
是對BehaviorSubject
的一個封裝,行爲上與BehaviorSubject
相似。Variable
沒有on
之類的方法來發送事件,取而代之的是一個value
屬性,向value
賦值能夠向觀察者發送next事件,而且訪問value
能夠獲取最後一次發送的數據:
let variable = Variable(1)
variable.asObservable().subscribe { (event) in
print(event)
}
variable.value = 2
---- example output ----
next(1)
next(2)
completed
複製代碼
與其餘Subject類型不一樣的是,Variable
在釋放的時候會發送completed事件,而且Variable
對象永遠不會發送error事件。
Scheduler
是RxSwift中進行多線程編程的一種方式,一個Observable在執行的時候會指定一個Scheduler
,這個Scheduler
決定了在哪一個線程對序列進行操做以及事件回調。默認狀況下,在訂閱Observable以後,觀察者會在與調用subscribe
方法時相同的線程收到通知,而且也會在該線程進行銷燬(dispose)。
與GCD相似,Scheduler
分爲串行(serial)和並行(concurrent)兩種類型,RxSwift中定義了幾種Schedular:
subscribeOn
和observeOn
是其中兩個最重要的方法,它們能夠改變Observable所在的Scheduler:
// main thread
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
let seq = Observable.of(1, 2)
seq.subscribeOn(scheduler)
.map {
return $0 * 2 // 子線程
}
.subscribe { (event) in
print(event) // 子線程
}
複製代碼
在上面的代碼中建立了一個併發的Scheduler,並在序列seq上調用subscribeOn
指定了該Scheduler,能夠看到,咱們在主線程中訂閱該序列,可是map
方法以及事件的回調都是在建立的子線程中執行。
subscribeOn
和observeOn
均可以指定序列的Scheduler,它們之間的區別在於:
subscribeOn
設定了整個序列開始的時候所在的Scheduler,序列在建立以及以後的操做都會在這個Scheduler上進行,subscribeOn
在整個鏈式調用中只能調用一次,以後再次調用subscribeOn
沒有任何效果。observeOn
指定一個Scheduler,在這以後的操做都會被派發到這個Scheduler上執行,observeOn
能夠在鏈式操做的中間改變SchedulercreateObservable().
.doSomething()
.subscribeOn(scheduler1) // (1)
.doSomethingElse()
.observeOn(scheduler2) // (2)
.doAnother()
...
複製代碼
如上代碼,在(1)處執行了subscribeOn
以後,以前的操做createObservable()和doSomething()都會在scheduler1中執行,隨後的doSomethingElse()一樣也在scheduler1中執行,隨後用observeOn
指定了另一個scheduler2,以後的doAnother()會在scheduler2上執行。
RxSwift中提供了一種擴展機制,能夠很方便的爲原有的代碼添加上Rx擴展。首先來看一個結構體Reactive
:
public struct Reactive<Base> {
/// base是擴展的對象實例
public let base: Base
public init(_ base: Base) {
self.base = base
}
}
複製代碼
Reactive
是一個泛型結構體,只定義了一個屬性base
,而且在初始化結構體的時候傳入該屬性的值。
此外還定義了一個協議ReactiveCompatible
:
public protocol ReactiveCompatible {
associatedtype CompatibleType
static var rx: Reactive<CompatibleType>.Type { get set }
var rx: Reactive<CompatibleType> { get set }
}
複製代碼
該協議中分別爲類對象和實例對象定義一個名字相同的屬性:rx
,類型爲上面定義的Reactive
,隨後經過協議擴展爲其提供了get
的默認的實現:
extension ReactiveCompatible {
public static var rx: Reactive<Self>.Type {
get {
return Reactive<Self>.self
}
set {
// this enables using Reactive to "mutate" base type
}
}
public var rx: Reactive<Self> {
get {
return Reactive(self)
}
set {
// this enables using Reactive to "mutate" base object
}
}
}
複製代碼
關聯類型CompatibleType
被自動推導爲實現該協議的類自己,使用self
初始化一個Reactive
對象。
最後經過協議擴展爲全部的NSObject類型實現了ReactiveCompatible
協議:
extension NSObject: ReactiveCompatible { }
複製代碼
這樣一來,代碼中全部繼承自NSObject的類型實例中都會有一個類型爲Reactive
的屬性rx
,當咱們要爲本身的類型添加Rx擴展時,只須要經過擴展向Reactive
中添加方法就能夠了,例如向UIButton類型添加擴展:
extension Reactive where Base: UIButton { // 爲Reactive<UIButton>添加擴展
public var tap: ControlEvent<Void> {
return controlEvent(.touchUpInside) // 經過base能夠訪問該實例自己
}
}
複製代碼
因爲Reactive
是一個泛型類型,咱們能夠經過where語句指定泛型的類型,這樣一來,咱們就能夠在UIButton實例的rx中訪問tap屬性了:
let button = UIButton(...)
button.rx.tap
複製代碼
相似RxCocoa這樣的RxSwift擴展庫都是經過這種方式進行Rx擴展的。