ReactiveX序列——RxSwift
Swift是蘋果公司新推出的一門現代化的編程語言,而且將其開源出來了,Swift具備不少的優勢,這也使得這門語言推出的短期引發了很大反應的緣由,在最近的2016年3月的編程語言排行榜處於第14位,甚至超過了OC(15位)。可見Swift的在開發者心中的地位。html
RxSwift的觀察者對象(Observable)
在RxSwift中,能夠有多種建立Observable對象的方法,主要有如下幾種:java
- asObservable
- create
- deferred
- empty
- error
- toObservable/from
- interval
- never
- just
- of
- range
- repeatElement
- timer
要弄明白Observable就要先弄清楚Observable是繼承了哪些類和協議,從源碼開始分析:
首先第一個是ObservableConvertibleType:react
/** Type that can be converted to observable sequence (`Observer<E>`). */ public protocol ObservableConvertibleType { /** Type of elements in sequence. */ typealias E /** Converts `self` to `Observable` sequence. - returns: Observable sequence that represents `self`. */ func asObservable() -> Observable<E> }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
從ObservableConvertibleType協議源碼能夠看出,它定義了一個typealias類型別名和asObservable方法,類型別名是用來定義將要處理的類型(例如String,Int等等),而asObervable這個咱們在後面會具體敘述。其次是ObservableType,它繼承了ObservableConvertibleType,ObservableType主要乾了兩個事情,第一個是建立出subscribe方法,它是用來執行訂閱事件的(onNext、onError/onComplete),第二個就是簡易實現asObservable方法(經過extension ObservableType 實現),asObservable主要是經過Observable.create(subscrible())實現的。再上來就是Observable,它是一個類,繼承了ObservableType協議接口。
下面咱們分別對以上幾種建立Observable對象作詳細的介紹。ios
- asObservable方法:
asObservable實際上是至關於clone方法,其內部實現以下:sql
public func asObservable() -> Observable<E> { return self }
- 1
- 2
- 3
從這裏看,它return self也就是本身,這就意味着,你必須先有Observable對象才能調用asObservable方法。例如:編程
var obs = Observable<String>.create { (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance } let observable = obs.asObservable() observable.subscribeOn(MainScheduler.instance) .subscribe{ event in print(event.debugDescription) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
第二個是subscribe方法,這個方法具體實現調用了一個「抽象」方法,這個「抽象」方法就是打印出來一個錯誤日誌而且中止運行。json
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable { abstractMethod() }
- 1
- 2
- 3
- 4
固然,這個Observable類中方法,可是extension Observable實際上是有不少用法的。這也是咱們上面提到建立Observable的各類方法。
二、create方法 swift
public static func create(subscribe: (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) }
- 1
- 2
- 3
- 4
這是一個「靜態方法」(在class中用static關鍵字標註,在struct和enum中使用class關鍵字標註),這個方法的參數是一個函數(一般咱們會用閉包的方式),函數的參數是AnyObserver,返回的是Disposable。AnyObserver其實就是訂閱者,Disposable是一個協議接口,裏面只有一個dispose方法,用來釋放一些資源。整個create方法返回的是一個AnonymousObservable(匿名Observable),AnonymousObservable繼承自Producer,Producer實現了線程調度功能,能夠安排某個線程來執行run方法。所以create方法返回的AnonymousObservable是能夠運行在指定線程中Observable。完整的create例子:數組
var obs = Observable<String> .create ({ (observer) -> Disposable in observer.on(.Next("hahah")) observer.on(.Next("deasd")) observer.on(.Completed) return NopDisposable.instance }) .observeOn(MainScheduler.instance) .subscribe({event in if let str = event.element { print(str) } }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
最後obs變量是一個Disposable類型變量,能夠繼續調用dispose方法釋放資源。整個代碼輸出結果:bash
hahah deasd
- 1
- 2
三、empty方法
public static func empty() -> Observable<E> { return Empty<E>() }
- 1
- 2
- 3
empty方法是一個空方法,裏面沒有onNext事件處理,只會處理onComplete方法。empty建立Observable對象比較簡單。代碼例子:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str)}, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }) { () -> Void in print("dispose") }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
輸出結果:
complete dispose
- 1
- 2
- 3
這個例子中有四個閉包,其中最後一個是尾隨閉包,並且這些閉包都是可選類型。固然你也能夠以下寫法:
let obs1 = Observable<String>.empty() obs1.subscribe( onNext: {str in print(str) }, onError: { (errorType) -> Void in print(errorType) }, onCompleted: { () -> Void in print("complete") }, onDisposed: {() -> Void in print("dispose") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
四、never方法
public static func never() -> Observable<E> { return Never() }
- 1
- 2
- 3
官方解釋是返回一個無終止的觀察者事件序列,能夠用來表示無限持續時間。儘管咱們給安排了next事件,但實際上,他是不會執行的。不會輸出onNext
Observable<String>
.never()
.subscribeNext( { (str) -> Void in print("onNext") }) //.dispose()
- 1
- 2
- 3
- 4
- 5
- 6
五、just方法
public static func just(element: E, scheduler: ImmediateSchedulerType) -> Observable<E> { return JustScheduled(element: element, scheduler: scheduler) }
- 1
- 2
- 3
just方法只能處理單個事件,簡單來講,咱們使用just方法不能將一組數據一塊兒處理,只能一個一個處理。例如:
Observable<String>
.just("just test") .subscribeOn(MainScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
輸出結果:
just test
- 1
just方法是一個多態方法,容許在傳入參數時候指定線程,例如:
它指定當前線程完成subscribe相關事件。
Observable<String>
.just("just with Scheduler", scheduler: CurrentThreadScheduler.instance) .subscribeNext({ (str) -> Void in print(str) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
六、error方法
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) }
- 1
- 2
- 3
error方法是返回一個只能調用onError方法的Observable序列。其中的onNext和OnComleted方法是不會執行的。例如:
public static func error(error: ErrorType) -> Observable<E> { return Error(error: error) } Observable<String> .error(RxError.Timeout) .subscribe( onNext: { (str) -> Void in print(str) print("onNext") }, onError: { (error)-> Void in print(error) }, onCompleted: { () -> Void in print("onCompleted") }, onDisposed: { () -> Void in print("dispose") }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
最後的輸出結果是:
Sequence timeout dispose
- 1
- 2
七、of方法
能夠說of方法是just方法的升級版,它能夠將一序列的事情組合起來一塊兒處理。極大方便了開發者對數組(Array)、字典(Dictionary)進行分佈處理。
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> { return Sequence(elements: elements, scheduler: scheduler) } Observable<String> .of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.element { print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
這裏解釋一下subscribe(on: Event->Void)方法,例子中event.element在調用get屬性的時候其實會執行一個onNext方法,它返回的是一個可選類型,所以要用if let解析處理。固然若是代碼改爲以下,那麼是不會輸出結果的,由於event.error執行的是錯誤監聽(也就是執行的onError方法,所以不會輸出結果)。of和just同樣,存在一個多態方法,能夠帶入線程控制。
Observable<String>
.of("d1","d2", "d3", "d4") .subscribe( { (event) -> Void in if let els = event.error{ print(els) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
八、deferred方法
deferred方法是延時建立Observable對象,當subscribe的時候纔去建立,它爲每個bserver建立一個新的Observable; deferred採用一個Factory函數型做爲參數,Factory函數返回的是Observable類型。這也是其延時建立Observable的主要實現。
public static func deferred(observableFactory: () throws -> Observable<E>) -> Observable<E> { return Deferred(observableFactory: observableFactory) }
- 1
- 2
- 3
- 4
整個deferred方法的原理如上圖,從圖中能夠看出,deferred不是第一步建立Observable,而是在subscriber的時候建立的。(圖中紅色的是error,綠色的是next事件)
九、generate方法
public static func generate(initialState initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> { return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler) }
- 1
- 2
- 3
generate方法是一個迭代器,它一直循環onNext事件,直到condition不知足要求退出。generate有四個參數,第一個是最開始的循環變量,第二個是條件,第三個是迭代器,這個迭代器每次運行都會返回一個E類型,做爲下一次是否執行onNext事件源,而是否正的要執行則看是否知足condition條件。其實咱們能夠理解generate就是一個循環體(其內部實現也正是一個循環,代碼在:GenerateSink的run方法中)。例子:
Observable<String>
.generate(
initialState: "ah", condition: ({ (str) -> Bool in return str.hasPrefix("ah") }), iterate: ({ (str1) -> String in return "h" + str1 })) //.subscribeOn(MainScheduler.instance) .subscribe ({ (event) -> Void in if let res = event.element { print(res) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
輸出結果:
ah
- 1
上面這個例子說的是,初始的變量是「ah」,第一個條件知足,執行onNext事件,同時生成一個hah,不知足條件,不執行onNext事件。generate是一個具備高度可變的of方法,它同時兼備了後面要介紹的過濾(filter)特性。固然generate還有一個多態方法,容許傳入執行線程。這個線程是爲循環體而生的,並非爲subscrible而生的。
十、repeatElement方法
public static func repeatElement(element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RepeatElement(element: element, scheduler: scheduler) }
- 1
- 2
- 3
repeatElement方法是一個無限循環的,它會一直循環onNext方法。固然這種循環是能夠指定線程的。例子:
Observable<String>
.repeatElement("daa") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
其中subscribeNext是一個尾隨閉包。
十一、using方法
public static func using<R: Disposable>(resourceFactory: () throws -> R, observableFactory: R throws -> Observable<E>) -> Observable<E> { return Using(resourceFactory: resourceFactory, observableFactory: observableFactory) }
- 1
- 2
- 3
using方法是經過Factory方法生成一個對象(resourceFactory)再轉換成Observable,中間咱們要使用Factory方法,上面已經介紹過一次Factory方法。using方法相對其餘的方法比較複雜和特殊,緣由是using方法是由咱們構建出資源和構建清除資源的,中間經過一個轉換生成Observable對象。
Observable<String>
.using( { () -> Student<String> in return Student(source: Observable<String>.just("jarlene"), disposeAction: { () -> () in print("hah") }) }, observableFactory: { (r) -> Observable<String> in return r.asObservable() }) .subscribeNext( { (ss) -> Void in print(ss) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
其中Student類繼承了兩個協議:ObservableConvertibleType和Disposable;ObservableConvertibleType是爲了生成Observable對象(經過調用asObservable方法),Disposable是爲了清除資源。源碼以下:
class Student<E>: ObservableConvertibleType, Disposable{ private let _source: Observable<E> private let _dispose: AnonymousDisposable init(source: Observable<E>, disposeAction: () -> ()) { _source = source _dispose = AnonymousDisposable(disposeAction) } func dispose() { _dispose.dispose() } func asObservable() -> Observable<E> { return _source } var name :String{ get { return self.name } set { self.name = newValue } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
在上面例子中,咱們採用Observable.just方法生成了一個Observable對象傳遞給Student對象,同時也定義了一個釋放資源的方法。等到調用dispose()方法,就會執行咱們定義的釋放資源的方法。例子結果爲:
jarlene hah
- 1
- 2
十二、range方法
public static func range(start start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> { return RangeProducer<E>(start: start, count: count, scheduler: scheduler) }
- 1
- 2
- 3
range方法其實方便版of方法,其功能和of差很少,咱們只要輸出start和count而後就能生成一組數據,讓他們執行onNext。值得注意的是,range方法只生成Observable型。在調用bindNext的時候能夠將其對應成其餘相應的類型。
例如:
let arr: [String] = ["ad", "cd", "ef", "gh"] Observable<Int> .range(start: 1, count: 3) .subscribeNext { (n) -> Void in print(arr[n]) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
結果
cd ef gh
- 1
- 2
- 3
1三、toObservable(from)
public func toObservable(scheduler: ImmediateSchedulerType? = nil) -> Observable<Generator.Element> { return Sequence(elements: self, scheduler: scheduler) }
- 1
- 2
- 3
- 4
toObservable方法是擴展自Array,是將一個一個array轉換成Observable,其內部實調用了一個序列Sequence,其用法很簡單。
let arr: [String] = ["ab", "cd", "ef", "gh"] arr.toObservable() .subscribeNext { (s) -> Void in print(s) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
運行結果:
ab
cd ef gh
- 1
- 2
- 3
- 4
1四、interval/timer
public static func interval(period: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Timer(dueTime: period, period: period, scheduler: scheduler ) } public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) -> Observable<E> { return Timer( dueTime: dueTime, period: period, scheduler: scheduler ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
interval方法是定時產生一個序列,interval第一個參數就是時間間隔,第二個參數是指定線程。 能夠看出interval是range和repeateElement的結合。timer方法和interval方法相似。差異在於timer能夠設置間隔時間和持續時間,而interval的間隔時間和持續時間是同樣的。
至此,咱們將Observable對象基本的產生方法都講述完了,下一節開始咱們詳細講述Observer的建立以及製做器Producer,其次將詳細敘述Producer和事件方法onNext、onError、onComplete之間的聯繫,以及Producer是怎麼調度線程來完成線程控制的。
RxSwift的觀察者對象的變換(Transform Observable)和過濾(Filter Observable)
對觀察着對象進行變換使得一個對象變換成另外一個對象,這個是RxSwift核心之一,所以對於熟悉RxSwift特別重要。RxSwift存在如下變換方法:
- buffer
- flatMap
- flatMapFirst
- flatMapLatest
- map
- scan
- window
過濾方法
- debounce / throttle
- distinctUntilChanged
- elementAt
- filter
- sample
- skip
- take
- takeLast
- single
下面咱們分別對以上幾種對Observable對象變換作詳細的介紹(不所有闡述)。
一、 buffer方法:
buffer方法是extension ObservableType中的一個方法,它的做用是緩衝組合,第一個參數是緩衝時間,第二個參數是緩衝個數,第三個參數是線程。整體來講就是通過必定時間,將必定個數的事件組合成一個數組,一塊兒處理,在組合的過程當中,你能夠選擇線程。
public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[E]> { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("ab", "cd", "ef", "gh") .buffer(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果
["ab", "cd"] ["ef", "gh"]
- 1
- 2
二、flatMap
flatMap也是擴展自ObservableType,它的做用是將一種類型變換成另外一種類型。flatMap的參數是一個方法,這個方法的輸入參數與Observable的E是同一種類型,輸出依然是Observable類型。
public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O) -> Observable<O.E> { return FlatMap(source: asObservable(), selector: selector) }
- 1
- 2
- 3
- 4
咱們看一個例子,例子中首先是一組Observable,經過flatMap後仍是一組Observable,可是flatMap做用是,若是元素中遇到「a」字母開頭的,那麼它就從新組裝一個數組,這個數組是隻有元素和「a」;若是元素不是「a」字母開頭的就與「b」字母組裝成另外一個數組。這兩種狀況都經過調用toObservable返回Observable。flatMapFirst、flatMapLast、flatMapWithIndex都是相似的做用,這裏就不重複。
Observable<String>
.of("ab", "cd", "aef", "gh") .flatMap({ (element: String) -> Observable<String> in if element.hasPrefix("a") { let sd : [String] = [element, "a"] return sd.toObservable() } else { let sd : [String] = [element, "b"] return sd.toObservable() } }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
結果
ab
a cd b aef a gh b
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
三、map
map方法是經過其實flatMap的簡化版本,它返回的能夠是任何類型。其中R是返回類型。
public func map<R>(selector: Self.E throws -> R) -> RxSwift.Observable<R>
- 1
例子:
Observable<String>
.of("ab", "cd", "aef", "gh") .map({ (str) -> String in return str+"ss" }) .subscribeNext({ (n) -> Void in if !n.isEmpty { print(n) } }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
結果
abss cdss aefss ghss
- 1
- 2
- 3
- 4
四、scan方法
scan方法有兩個參數,第一個參數是種子,第二個參數是加速器。所謂的種子就是最初的狀態,加速器就是將每一次運行的結果延續到下一次。scan方法也是擴展自ObservableType
public func scan<A>(seed: A, accumulator: (A, E) throws -> A) -> Observable<A> { return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e") .scan("s", accumulator: { (a, b) -> String in return a+b }) .subscribeNext({ (n) -> Void in print(n) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
這個例子中是將全部的字符依次串起來,運行結果是:
sa sab sabc sabcd sabcde
- 1
- 2
- 3
- 4
- 5
五、window
window方法一樣擴展自ObservableType,它有三個參數,第一個是時間間隔,第二個是數量,第三個是線程。時間間隔指的的是window方法開窗的時間間隔;第二個參數數量指的的是每次經過窗口的個數;線程就是這種操做執行在什麼線程上。起源碼以下:
public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<E>> { return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) }
- 1
- 2
- 3
- 4
須要特別注意的是window方法以後,返回的是Observable
Observable<String>
.of("ab", "bc", "cd", "de", "ef") .window(timeSpan: 1, count: 2, scheduler: MainScheduler.instance) .subscribeNext({ (n) -> Void in n.subscribeNext({ (ss) -> Void in print(ss) }) }) .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
結果:
ab
bc
cd de ef
- 1
- 2
- 3
- 4
- 5
變換的方法基本就這些,可是開發者能夠經過自定義的方式擴展變換的方法以達到所需的目的。接下來咱們看看過濾方法。
一、debounce / throttle
debounce/throttle 方法在規定的時間中過濾序列元素,就如上圖描述的同樣,當debounce打開的時候,恰好那個黃色的序列元素過來,那麼它就不會通知到事件(onNext、onError、onComplete)上去。下面是debounce方法源碼。
public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<E> { return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "b", "c", "d", "e", "f") .debounce(1, scheduler: MainScheduler.instance) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
輸出結果
f
- 1
二、distinctUntilChanged
distinctUntilChanged 主要是過濾相鄰兩個元素是否重複,重複的話就過濾掉其中之一。
public func distinctUntilChanged<K>(keySelector: (E) throws -> K, comparer: (lhs: K, rhs: K) throws -> Bool) -> Observable<E> { return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("a", "a", "c", "e", "e", "f") .distinctUntilChanged({ (lhs, rhs) -> Bool in return lhs==rhs }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果:
a c e f
- 1
- 2
- 3
- 4
三、elementAt
elementAt方法其實就挑選出所須要的序列元素,上圖描述的很清楚。
這個方法很簡單。沒有什麼難點。當index超界的時候,throwOnEmpty參數是否拋出異常。
public func elementAt(index: Int) -> Observable<E> { return ElementAt(source: asObservable(), index: index, throwOnEmpty: true) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .elementAt(2) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
輸出結果
cs
- 1
四、filter
filter方法很簡單,指出過濾條件就行,知足過濾條件的就能執行事件通知,不然不行
public func filter(predicate: (E) throws -> Bool) -> Observable<E> { return Filter(source: asObservable(), predicate: predicate) }
- 1
- 2
- 3
- 4
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff") .filter({ (ss) -> Bool in return ss.hasPrefix("a") }) .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
輸出結果
aa av
- 1
- 2
接下來的幾個方法都是相似的,這裏不就在詳細敘述啦。
RxSwift的Observable事件處理以及線程調度
由第一部分能夠知道,幾乎在建立全部的Observable的時候都要用到Producer,而在事件處理(onNext、onError、onComplete)過程當中常常要用到線程調度(Scheduler),它們之間存在一種很巧妙的設計。首先先看看Producer源碼。
class Producer<Element> : Observable<Element> { override init() { super.init() } override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable { if !CurrentThreadScheduler.isScheduleRequired { return run(observer) } else { return CurrentThreadScheduler.instance.schedule(()) { _ in return self.run(observer) } } } func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable { abstractMethod() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
Producer是繼承了Observable類,咱們在建立Observable類時候都用到了Producer,那麼Producer主要作了兩件事情,第一個實現subscribe方法,在subscribe方法中傳入了observer參數,observer類型是ObserverType,在上一部分介紹了ObserverType中有一個類型別名E,那麼在Producer的範型element就必須和ObserverType中類型別名E同樣。回過頭來講subscribe,咱們首先看CurrentThreadScheduler 的源碼,CurrentThreadScheduler 是繼承ImmediateSchedulerType協議,它裏面就定義了一個方法:
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
- 1
而這個方法在CurrentThreadScheduler 具體實現了。
public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>> /** The singleton instance of the current thread scheduler. */ public static let instance = CurrentThreadScheduler() static var queue : ScheduleQueue? { get { return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance) } set { NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance) } } /** Gets a value that indicates whether the caller must call a `schedule` method. */ public static private(set) var isScheduleRequired: Bool { get { let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) return value == nil } set(isScheduleRequired) { NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance) } } /** Schedules an action to be executed as soon as possible on current thread. If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be automatically installed and uninstalled after all work is performed. - parameter state: State passed to the action to be executed. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */ public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable { if CurrentThreadScheduler.isScheduleRequired { CurrentThreadScheduler.isScheduleRequired = false let disposable = action(state) defer { CurrentThreadScheduler.isScheduleRequired = true CurrentThreadScheduler.queue = nil } guard let queue = CurrentThreadScheduler.queue else { return disposable } while let latest = queue.value.dequeue() { if latest.disposed { continue } latest.invoke() } return disposable } let existingQueue = CurrentThreadScheduler.queue let queue: RxMutableBox<Queue<ScheduledItemType>> if let existingQueue = existingQueue { queue = existingQueue } else { queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1)) CurrentThreadScheduler.queue = queue } let scheduledItem = ScheduledItem(action: action, state: state) queue.value.enqueue(scheduledItem) return scheduledItem } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
其實主要是根據CurrentThreadScheduler.isScheduleRequired參數來選擇是否須要當前線程運行,若是須要,首調用action方法,而這個action方法其實就是onNext、onError、onCompelete方法。而後作了一個延遲清除(defer)和一個判斷(guard)。而後循環一個queue其實主要是看看是否還有沒有執行完的onNext時間。latest.invoke()其實就是作action(state),而後返回Disposable。若是不須要,則組合queue,生成Disposable返回。接下來咱們看看怎麼設置線程執行的,首選看看subscribleOn方法,這個方法就是指定接下來事情要發生在那個線程中,具體看一下代碼:
public func observeOn(scheduler: ImmediateSchedulerType) -> Observable<E> { if let scheduler = scheduler as? SerialDispatchQueueScheduler { return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler) } else { return ObserveOn(source: self.asObservable(), scheduler: scheduler) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
方法是定義在extension ObservableType 中的,它指定ObservableType 運行線程,這裏面指定了兩種運行方式,第一種是運行ObserveOnSerialDispatchQueue,第二種是ObserveOn這兩個都繼承自Producer,上面咱們已經敘述了Producer,無論是ObserveOnSerialDispatchQueue仍是ObserveOn都重寫了run方法,他們返回的都是ObserverBase。ObserverBase其實就是在執行onNext、onError、onComplete方法。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0 func on(event: Event<E>) { switch event { case .Next: if _isStopped == 0 { onCore(event) } case .Error, .Completed: if !AtomicCompareAndSwap(0, 1, &_isStopped) { return } onCore(event) } } func onCore(event: Event<E>) { abstractMethod() } func dispose() { _isStopped = 1 } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
onCore方法是由繼承者實現,好比在ObserveOnSink類中及具體實現了onCore方法
override func onCore(event: Event<E>) { let shouldStart = _lock.calculateLocked { () -> Bool in self._queue.enqueue(event) switch self._state { case .Stopped: self._state = .Running return true case .Running: return false } } if shouldStart { _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
這個onCore方法是判斷當前運行到那一步(onNext,onError,onComplete)。如今咱們回過頭來看Producer中的subscribe其實就是執行事件,只不過這個事件是在某個線程上執行的。咱們能夠繪製一個簡單的流程圖描述這些。
Observable執行subscribleOn方法,產生一個新的Observable,這個新Observable是Produce,他繼承了Observable,當Observable執行subscrible方法的時候,會根據線程來執行,若是指定了線程,那麼就會經過run方法去執行事件。若是沒有指定線程,就用當前線程執行run方法去執行事件。固然若是要用到變換或者過濾,也能夠經過指定線程來執行變換和過濾,其原理是同樣的。
RxSwift的觀察者對象的合併(Conbinate Observable)和連接器(Connect Observable)
對觀察着合併就是將多個觀察着(Observables)合併起來處理,使用起來更方便。它主要由如下方法:
- merge
- startWith
- switchLatest
- combineLatest
- zip
連接器
- multicast
- publish
- refCount
- replay
- shareReplay
固然爲了將多個相同類型觀察者對象合併起來處理,能夠極大減小重複代碼的工做量。從本節開始咱們將會敘述觀察者對象的合併和發佈。
一、merge
從圖中很容易看出merge方法就是將多個Observable對象合併處理。
public func merge() -> Observable<E.E> { return Merge(source: asObservable()) }
- 1
- 2
- 3
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
結果:
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
merge方法看起來特別簡單。
二、startWith
startWith方法能夠說是定製開始位置的,是一種比較特殊的merge方法。
public func startWith(elements: E ...) -> Observable<E> { return StartWith(source: self.asObservable(), elements: elements) }
- 1
- 2
- 3
- 4
startWith方法其實就是指定一個特殊的開頭,
例子:
Observable.of( Observable.of("a", "b"), Observable.of("c", "d"), Observable.of("e", "f"), Observable.of("g", "h")) .merge() .startWith("x") .subscribeNext { (str) -> Void in print(str) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
結果:
x
a b c d e f g h
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
三、switchLatest
RxSwift中switchLatest至關與其餘語言的switch方法,從圖中能夠很明顯的看出來,第一個序列的最後一個元素被去掉了,沒有執行OnNext方法。
public func switchLatest() -> Observable<E.E> { return Switch(source: asObservable()) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
結果:
0 1 2 3 4 200 201
- 1
- 2
- 3
- 4
- 5
- 6
- 7
Variable是一個類,它裏面包含了一個Observable對象(BehaviorSubject),另外Variable中仍是實現了asObservable方法,而這個方法返回的就是裏面的Observable對象。Variable源碼很簡單,這裏不作特別的介紹。至於var1.value=5\6\7沒有執行,這個正是switchLatest的做用,當var1的做用完成後,切換到var2的Observable。那麼var1後續變化,是不會通知到var3的。
四、combineLatest
combineLatest和其餘方法同樣都是擴展自ObservableType協議,
public func combineLatest<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> { return CombineLatestCollectionType(sources: self, resultSelector: resultSelector) }
- 1
- 2
- 3
例子:
let var1 = Variable(0) let var2 = Variable(200) // var3 is like an Observable<Observable<Int>> let var3 = Variable(var1.asObservable()) let d = var3 .asObservable() .switchLatest() .subscribeNext { (str) -> Void in print(str) } var1.value = 1 var1.value = 2 var1.value = 3 var1.value = 4 var3.value = var2.asObservable() var2.value = 201 var1.value = 5 var1.value = 6 var1.value = 7 Observable.combineLatest(var1.asObservable(), var2.asObservable()) { (as1, as2) -> Int in return as1 + as2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
輸出結果:
208
- 1
簡單的分析一下,就能夠看出來,var2.value = 201; var1.value = 7,最後就是208;combineLatest是將多個Observable方法按照必定意願組合起來。它提供了開發者組合的方法,本身實現就好了。
五、zip
zip和combineLatest差很少,能夠將多個Observable合併起來處理,上面的例子一樣能夠用zip來實現,具體看例子,下面是zip方法的源碼
public static func zip<O1: ObservableType, O2: ObservableType>
(source1: O1, _ source2: O2, resultSelector: (O1.E, O2.E) throws -> E) -> Observable<E> { return Zip2( source1: source1.asObservable(), source2: source2.asObservable(), resultSelector: resultSelector ) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
例子:
Observable.zip(var1.asObservable(), var2.asObservable()) { (s1, s2) -> Int in return s1+s2 } .subscribeNext { (mon) -> Void in print(mon) } .dispose()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
當全部的工做都作完以後,咱們要的觀察着對象進行發佈,那麼這個時候就要用到Connect,它是連接觀察着對象和被觀察着之間的一個連接器。它主要由如下方法:
一、multicast/publish
multicast和publish方法同樣,它們都是經過發佈/多播將Observable發出去,固然發出去必需要有一個鏈接(connect)的過程。只有連接的對象纔會收到publish/multicast的通知。下面是源碼:
public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S) -> ConnectableObservable<S.E> { return ConnectableObservableAdapter(source: self.asObservable(), subject: subject) } public func publish() -> ConnectableObservable<E> { return self.multicast(PublishSubject()) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
例子:
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .multicast(subject1) _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
這個例子經過使用一個interval方法一直放送,而後經過multicast將subject1通知(多播)出去,int1.subscribe來接受subject1的變化。
輸出的結果:
Subject Next(0) first subscription Next(0) Subject Next(1) first subscription Next(1) Subject Next(2) first subscription Next(2) Subject Next(3) first subscription Next(3) Subject Next(4) first subscription Next(4)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
固然publish也有相似的功能。
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe { print("Subject \($0)") } let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() _ = int1 .subscribe { print("first subscription \($0)") } int1.connect()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
輸出結果:
first subscription Next(0) first subscription Next(1) first subscription Next(2) first subscription Next(3) first subscription Next(4) first subscription Next(5) first subscription Next(6) first subscription Next(7) first subscription Next(8) first subscription Next(9) first subscription Next(10)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
publish方法只是將發佈出去的結果的變化告知,原變化沒有告知出來,固然這個事publish和multicast一點小區別。
二、refCount
refCount是結合了publish方法使用的,當Observable發佈出去,經過一個引用計數(refCount)方法來記錄,其實refCount就是至關於connect方法。refCount源碼以下,
public func refCount() -> Observable<E> { return RefCount(source: self) }
- 1
- 2
- 3
例子:
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() .refCount() .subscribeNext({ (ss) -> Void in print(ss) })
- 1
- 2
- 3
- 4
- 5
- 6
輸出結果:
0 1 2 3 4 ...
- 1
- 2
- 3
- 4
- 5
- 6
三、replay/shareReplay
其實replay和multicast是同樣,在其源碼中其實也是調用multicast方法。起源碼以下:
public func replay(bufferSize: Int) -> ConnectableObservable<E> { return self.multicast(ReplaySubject.create(bufferSize: bufferSize)) } public func shareReplay(bufferSize: Int) -> Observable<E> { if bufferSize == 1 { return ShareReplay1(source: self.asObservable()) } else { return self.replay(bufferSize).refCount() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
只不過,以前multicast傳入的PublishSubject,這裏是ReplaySubject,二者其實區別不大。
ReplaySubject中調用了ReplayMany,ReplayMany有一個事件隊列來輪循事件。ReplaySubject和ReplayMany比較簡單,這裏再也不敘述。
例子:
let int2 = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .replay(1) _ = int2.subscribeNext({ (ss) -> Void in print(ss) }) int2.connect() _ = int2.subscribeNext({ (ss) -> Void in print("a..\(ss)") })
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
輸出結果:
0 a..0 1 a..1 2 a..2 3 a..3 4 a..4 5 a..5
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
其中shareReplay和replay是一回事,從源碼中酒能夠看出來,shareReplay是replay和refCount的組合,這裏再也不敘述。
至此爲止,關於Swift的一些基本用法和基本的概念都講述完畢,固然還有一些相關的擴展,可是這個都和上面講述原理是同樣的,你們能夠參考源碼理解。
RxSwift、RxCocoa
從本節開始講敘述與RxSwift配套使用的RxCocoa,RxCocoa主要是針對上層Ui作擴展,這些擴展主要是將上面所敘述的東西結合Ui控件使用。將RxSwift和RxCocoa結合起來使用對ios開發能夠節省很大開發的時間,RxCocoa擴展性和RxSwift同樣靈活,能夠針對不一樣的業務進行不一樣的擴展,很方便開發者使用。因爲本人對於ios sdk不是很瞭解。全部這裏不具體討論
總結
整篇博客寫下來字數達到2w多,能夠看成一個小論文了,做爲一個swift新手,我花了10天左右的時間熟悉swift基本語法,爲了可以更加熟悉swift,因而選擇研究RxSwift,已達到更加熟練掌握swift的目的,整篇博客下下來花費時間就長達一個多月,不少swift的基本東西看完以後就忘記,又不得不從新看起語法來。不過總算完成了,固然研究完RxSwift,我更加能夠熟練的swift進行ios開發。以後我會繼續研究ReactiveX其餘語言。期待愈來愈好吧。
參考連接:http://m.blog.csdn.net/article/details?id=50823558