RxSwift (二)序列核心邏輯分析react
RxSwift (三)Observable的建立,訂閱,銷燬git
RxSwift(五)(Rxswift對比swift,oc用法)github
RxSwift(七)Rxswift對比swift用法swift
RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬api
RxSwift學習之十二 (基礎使用篇 3- UI控件擴展)數組
在計算機科學裏,函數式編程是一種編程範式,它將計算描述爲表達式求值並避免了狀態和數據改變。函數式編程裏面的「函數」指的是數學函數。 函數式編程思想:是將操做盡量寫在一塊兒!嵌套的函數!! 本質:就是往方法裏面傳入Block,方法中嵌套Block調用. 瞭解函數式編程首先要理解命令式編程,命令式編程每每是咱們大多數人固有的編程思惟,這中模式依賴於咱們但願本身的程序如何完成某項任務,而與之對應的是聲明式編程(Declarative Programming),它關心是任務的目的是什麼,而不是具體如何實現。這把程序員從那些細枝末節中解放了出來。而函數響應式編程正是聲明式編程的一個子範式。bash
編程函數和數學函數對比
數學函數的特色是對於每個自變量,存在惟一的因變量與之對應。而編程函數的特色是參數和返回值都不是必須的,函數可能依賴外界或者影響外界。命令式和函數式的區別
全部的命令式語言都被設計來高效地使用馮諾依曼體系結構的計算機。實際上,最初的命令式語言的目的就是取代彙編語言,對機器指令進行進一步抽象。所以,命令式語言帶有強烈的硬件結構特徵。命令式語言的核心特性有:模擬存儲單元的變量、基於傳輸操做的賦值語句,以及迭代形式的循環運算。命令式語言的基礎是語句(特別是賦值),它們經過修改存儲器的值而產生反作用(side effect)的方式去影響後續的計算。 函數式語言設計的基礎是Lambda表達式,函數式程序設計把程序的輸出定義爲其輸入的一個數學函數,在這裏沒有內部狀態,也沒有反作用。函數式語言進行計算的主要是將函數做用與給定參數之上。函數式語言沒有命令式語言所必需的那種變量,能夠沒有賦值語句,也能夠沒有循環。一個程序就是函數定義和函數應用的說明;一個程序的執行就是對函數應用的求值。
詳細教程參考:理解函數式編程
1. block表達式語法: ^返回值類型(參數列表){表達式} 實例 1: ^int (int count) { return count + 1; }; 2. 聲明類型變量的語法 //返回值類型(^變量名)(參數列表) = block表達式 實例 2: int (^sum)(int count) = (^int count) { return count + 1; }; 3. 做爲函數參數的語法 - (void)func(int (^)(int))block { } //定義block簡寫 typedef int (^Sumblock)(int); - (void)func(Sumblock)block { } 複製代碼
高階函數實際上就是函數的函數,它是全部函數式語言的性質。函數式語言中,函數做爲第一等公民,這也意味着你像定義或調用變量同樣去定義或調用函數。能夠在任何地方定義,在函數內或函數外,能夠將函數做爲參數或者返回值。在數學和計算機科學中,高階函數是至少知足下列一個條件的函數:
1 接受一個或多個函數做爲輸入 2 輸出一個函數 在數學中它們也叫作算子(運算符)或泛函。微積分中的導數就是常見的例子,由於它映射一個函數到另外一個函數。
關於Swift的相關高階函數能夠參考個人一篇博客:Swift的高階函數
實例 3
@interface Person : NSObject - (void)eat; - (void)sleep; @end @implementation Person - (void)eat { NSLog(@"%s", __FUNCTION__); } - (void)sleep { NSLog(@"%s", __FUNCTION__); } @end ViewController.m Person *person = [[Person alloc] init]; //調用時必須單個調用,並且不能任意組合順序 /** 普通的調用方式 */ [person eat]; [person sleep]; 複製代碼
實例 4
// 鏈式寫法 Person.h - (Person *)eat; - (Person *)sleep; Person.m - (Person *)eat { NSLog(@"%s", __FUNCTION__); return self; } - (Person *)sleep { NSLog(@"%s", __FUNCTION__); return self; } ViewController.m Person *person = [[Person alloc] init]; /** 鏈式寫法,這樣不只能夠無限調用,並且能夠控制順序 */ [[person eat] sleep]; [[person sleep] eat]; [[person eat] eat]; /** 經過」點」語法,將須要執行的代碼塊連續的書寫下去,就是鏈式編程.它能使代碼簡單易讀,書寫方便 */ person.eat.sleep.eat.sleep.sleep; 複製代碼
實例 5
Person.h - (Person *(^)(NSString *food))eat3; - (Person *(^)(NSString *where))sleep3; Person.m - (Person *(^)(NSString *food))eat3 { return ^(NSString *food) { NSLog(@"吃:%@ ",food); return self; }; } - (Person *(^)(NSString *where))sleep3 { return ^(NSString *where) { NSLog(@"睡在:%@上",where); return self; }; } ViewController.m Person *person = [[Person alloc] init]; /** 鏈式 + 函數式寫法 */ person.sleep3(@"牀").eat3(@"蘋果").eat3(@"香蕉").sleep3(@"沙發"); 返回值block不帶參數,()不傳參便可 person.sleep3().eat3().eat3().sleep3(); 複製代碼
函數式編程思想:是將操做盡量寫在一塊兒!嵌套的函數!! 本質:就是往方法裏面傳入Block,方法中嵌套Block調用.
以下代碼:
實例 6
/** 返回調用者自己,獲取其它屬性和方法 */ - (Person *)calculator:(NSInteger(^)(NSInteger result))block { _result = block(_result); return self; } Person *person = [[Person alloc] init]; /** 計算器 */ Person *calculatPerson = [person calculator:^NSInteger(NSInteger result) { result = result + 10; result = result*10; return result; }]; NSLog(@"%ld", calculatPerson.result); 複製代碼
簡單理解就是將block塊做爲函數的參數,能夠異步在參數block回調,將函數運行所須要的一些信息或者產出的一些結果經過block傳遞。代碼邏輯清晰。
列如:
實例 7
a = 2 b = 2 c = a + b // c is 4 b = 3 // now what is the value of c? 複製代碼
在響應式編程中,c的值將會隨着b的值改變而改變。 FRP提供了一種信號機制來實現這樣的效果,經過信號來記錄值的變化。信號能夠被疊加、分割或合併。經過對信號的組合,就不須要去監聽某個值或事件. 以下圖:
能夠把信號想象成水龍頭,只不過裏面不是水,而是玻璃球(value),直徑跟水管的內徑同樣,這樣就能保證玻璃球是依次排列,不會出現並排的狀況(數據都是線性處理的,不會出現併發狀況)。水龍頭的開關默認是關的,除非有了接收方(subscriber),纔會打開。這樣只要有新的玻璃球進來,就會自動傳送給接收方。能夠在水龍頭上加一個過濾嘴(filter),不符合的不讓經過,也能夠加一個改動裝置,把球改變成符合本身的需求(map)。也能夠把多個水龍頭合併成一個新的水龍頭(combineLatest:reduce:),這樣只要其中的一個水龍頭有玻璃球出來,這個新合併的水龍頭就會獲得這個球。
什麼是函數響應式編程呢? 能夠一句話歸納:
特色:代碼簡潔(複用)、易於理解(接近天然語言)、便於代碼管理 例如:
doSomething1().doSomething2().doSomething3()
一系列的動做簡潔明瞭,相互不干擾,能夠合併使用也能夠分開單獨使用.
實例 8:下面代碼清楚的講解了函數式編程與普通處理方式的區別。
//例1: //遍歷數組(要求:1.首先獲取 > 3的數字;2.獲取的數字以後 + 1;3.全部數字中的偶數;4.可讀性 清晰度) let array = [1,2,3,4,5,6,7] //普通處理方式: for num in array{ if num > 3{ let number = num + 1 if (number % 2 == 0) { print(number) } } } //函數式: array.filter{ $0 > 3} .filter{ ($0+1) % 2 == 0 } .forEach { print($0) } 複製代碼
從代碼中咱們能夠明顯的對比出代碼的優劣,普通代碼實現for循環須要層層嵌套,很是累贅,可讀性不高;而利用高階函數實現的函數式編碼代碼清晰簡潔。你能夠註釋掉中間的一個.filter{ ($0+1) % 2 == 0 }
代碼同樣能夠正常運行,很是靈活。
對象對某一數據流變化作出響應的這種編碼方式稱爲響應式。如對象A和對象B,A和B有一種「說不清」的關係,A要時刻監控B的行爲,對B的變化也作出相應的變化。那麼怎麼實現呢?對於B來講,B作任何事都須要向A彙報,這樣A就能實時監控B的行爲,並響應。在iOS開發中咱們常常會響應一些事件,button、tap、textField、textView、notifaction、KVO、NSTimer等等這些,都須要作響應監聽,響應後都須要在對應的響應事件中去作處理,而原生開發中,觸發對象與響應方法是分離的,如button的初始化和點擊響應方法是分離的。
在程序開發中:a = b + c賦值以後 b 或者 c 的值變化後,a 的值不會跟着變化; 響應式編程目標就是:若是 b 或者 c 的數值發生變化,a 的數值會同時發生變化; 響應編程的經典案例:KVO 響應式編程框架:ReactiveCocoa(RAC) 詳細學習能夠參考:ReactiveCocoa博客
響應式編程的優勢:
1) 開發過程當中,狀態以及狀態之間依賴過多,RAC更加有效率地處理事件流,而無需顯式去管理狀態。在OO或者過程式編程中,狀態變化是最難跟蹤,最頭痛的事。這個也是最重要的一點。 2) 減小變量的使用,因爲它跟蹤狀態和值的變化,所以不須要再申明變量不斷地觀察狀態和更新值。 3) 提供統一的消息傳遞機制,將oc中的通知,action,KVO以及其它全部UIControl事件的變化都進行監控,當變化發生時,就會傳遞事件和值。 4) 當值隨着事件變換時,可使用map,filter,reduce等函數便利地對值進行變換操做。
下面統一對比Rxswift的UIButton事件實現和原生的實現方式,瞭解一下響應式編程的優勢。
實例 9 :對比傳統UIButton響應事件寫法和Rxswift中函數響應式寫法的區別。
1. 傳統寫法 //傳統寫法,UI代碼和邏輯是分開的,爲了監聽一個按鈕的響應事件,咱們須要在另一處地方實現。這樣可讀性很差,代碼繁瑣。 button = UIButton() button.addTarget(self, action: #selector(text), for: .touchUpInside) @objc func text() { print("Button clicked!") } 2. Rxswift寫法 //Rxswift的實現就簡單多了,並且目標很是明確,就是三部曲:1建立序列,2,訂閱響應消息,3.析構銷燬 //當你訂閱了響應消息後,只要序列發生了變化,訂閱的消息總會觸發,以下面的代碼,當你訂閱了按鈕的點擊事件後,每次點擊按鈕,訂閱的消息subscibe就會收到一次。 self.button.rx.tap //序列,這裏默認的序列是默認是.onTouchUpInside事件 .subscribe(onNext: { () in //訂閱 print("Button clicked!") }, onError: { (error) in //當Rxswift的事件鏈走不通,會回調這個onError,通知錯誤 print("錯誤信息") }, onCompleted: {//當Rxswift訂閱的全部事件鏈走完了,會回調這個onCompleted,告知執行完畢,這個和onError是對立互斥的,二者只會發生一個。 print("訂閱完成") }) .disposed(by: DisposeBag()) //銷燬 複製代碼
這裏 taps 就是按鈕點擊事件的序列。而後咱們經過打印「Button clicked!」,來對每一次點擊事件作出響應。這種編程方式叫作響應式編程。咱們結合函數式編程以及響應式編程就獲得了函數響應式編程。經過不一樣的構建函數,來建立所須要的數據序列。最後經過適當的方式來響應這個序列。這就是函數響應式編程。
經過上面的代碼對比分析,咱們能夠看出Rxswift寫出來的代碼是多麼簡介,可讀性邏輯清晰,看每一行就知道在作什麼,不須要想原生UI響應和邏輯分開的,看代碼須要跳來跳去的。經過一個這麼小小的實例代碼,咱們初次見識到了Rxswift的強大,然而這個只是Rxswift框架功能的冰山一角。我強烈推薦你們都來學習下這麼優秀的開源框架。
下面咱們來窺探一下Rxswift究竟是個什麼東東,爲啥這麼牛逼。
An API for asynchronous programming with observable streams 經過可觀察的流實現異步編程的一種API(不明白?嗯,看完全部的例子再讀一篇) ReactiveX is more than an API, it's an idea and a breakthrough in programming. It has inspired several other APIs, frameworks, and even programming languages. ReactiveX 不只僅是一種 API 那麼簡單,它更是一種編程思想的突破。它已經影響了其餘 API,frameworks,以及編程語言。
總的一句話歸納: ReactiveX(Reactive Extensions)是經過可觀察的流實現異步編程的一種API,它結合了觀察者模式、迭代器模式和函數式編程的精華。RxSwift 是 ReactiveX 編程思想的一種實現,幾乎每一種語言都會有那麼一個 Rx[xxxx] 框架,好比Rxswift, RxJava,RxJS 等等。
Rx是一個家族,他們把函數響應式編程思想使用到了極致,要學習Rxswift必須先了解一下一些基本概念。
Observable 直譯爲可觀察的,它在 RxSwift 起到了舉足輕重的做用,在整個 RxSwift 的使用過程當中你會常常與它打交道。若是你使用過 RAC ,它如同 Signal 同樣。RxSwift 中關鍵點就是在於如何把普通的數據或者事件變成可觀察的,這樣當某些數據或事件有變化的時候就會通知它的訂閱者。RxSwift 中提供不少種建立 Observable 建立方法。好比:From、never、empty 和 create 等,更多建立方法。訂閱者能夠收到 3 個事件,onNext、onError 和 onCompleted,每一個 Observable 都應該至少有一個 onError 或 onCompleted 事件,onNext 表示它傳給下一個接收者時的數據流。
在Rxswift中咱們把它理解爲:觀察者序列,(就是一系列能夠被監聽,觀察的事件等,當你訂閱了這些序列,你就能夠在閉包中監聽到對應的事件)經過下面一個圖能夠更好的理解Observable:
序列監聽有三個步驟:1.建立序列,2訂閱序列,3.銷燬序列。當建立序列,並訂閱了序列後,只要某個事件發送了序列消息,就能夠在訂閱的閉包裏面監聽到。下面咱們一個小的實例來加深理解:實例 10
//1:建立序列 //利用函數式編程思想,在create()構造函數中傳入一個閉包,這個閉包會被類對象保存起來,後續每一個時間,事件觸發的時候會回調這個傳入的閉包,這樣就像鏈接了一個鏈條同樣,順着鏈條就可找到須要調用的閉包。 let ob = Observable<Any>.create { (observer) -> Disposable in // 3:發送信號 obserber.onNext([1,2,3,4]) obserber.onCompleted() // obserber.onError(NSError.init(domain: "error!", code: 10087, userInfo: nil)) return Disposables.create() //2:訂閱信息 //當咱們訂閱了Observable的消息後,只要Observable的事件觸發,都會經過OnNext這個閉包告訴咱們。 let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") //這裏會監聽到訂閱的Observable消息 }, onError: { (error) in print("error: \(error)") //當發生錯誤時,會回調這裏 }, onCompleted: { // 當全部序列執行完畢時,會回調這裏。 print("完成") }) { print("銷燬") } 複製代碼
若是你仔細觀察這裏的代碼,會有一個疑問:從訂閱中心observer,一直在用的序列,序列內部的代碼是未曾看到的。爲何從序列閉包裏面的發出信號,訂閱信號的閉包裏面可以訂閱到? 這個問題咱們將會在Rxswift 序列核心邏輯淺析
中詳細分析
Observable 建立後,可能爲了知足某些需求須要修改它,這時就須要用到操做符。RxSwift 提供了很是多的操做符,固然沒必要要一一掌握這些操做符,使用的時候查一下便可,固然常見的操做符必需要掌握,好比 map、flatMap 、create 、filter 等。Operators詳細介紹
實例 11: 這個例子主要把查找數組中的字符串 kongyulu,並顯示到 Label 上。
override func viewDidLoad() { super.viewDidLoad() DispatchQueue.global().async { self.from() } } func from() { Observable.from(["haha", "kongyulu", "cc", "wswy", "Rx"]) .subscribeOn(MainScheduler.instance) .filter({ (text) -> Bool in return text == "kongyulu" }) .map({ (text) -> String in return "my name is: " + text }) .subscribe(onNext: { [weak self] (text) in self?.nickNameLabel.text = text }) .disposed(by: disposeBag) } 複製代碼
迭代模式 Iterator: 這樣集合或者序列中的值就能夠進行遍歷了。
調度器 Scheduler: 爲了提高用戶體驗,或其它目的,有些操做須要放到特定的線程去執行,好比 UI 操做須要放到主線程,這就涉及到了調度器。
若是你想給 Observable 操做符鏈添加多線程功能,你能夠指定操做符(或者特定的Observable)在特定的調度器(Scheduler)上執行。對於 ReactiveX 中可觀察對象操做符來講,它有時會攜帶一個調度器做爲參數,這樣能夠指定可觀察對象在哪個線程中執行。而默認的狀況下,某些可觀察對象是在訂閱者訂閱時的那個線程中執行。SubscribeOn 能夠改變可觀察對象該在那個調度器中執行。ObserveOn 用來改變給訂閱者發送通知時所在的調度器。這樣就可使可觀察對象想在那個調度器中執行就在那個調度器中執行,不受約束,而這些細節是不被調用者所關心的。猶如 GCD 同樣,你只管使用,底層線程是咋麼建立的,你沒必要關心。
Rxswift就是一個框架而已,經過pod安裝跟其餘框架沒有什麼差別 在podfile中寫入
pod 'RxSwift' pod 'RxCocoa' 複製代碼
命令行執行pod install
就能夠了。
在實例 10
中咱們留下一個疑問:從訂閱中心observer,一直在用的序列,序列內部的代碼是未曾看到的。爲何從序列閉包裏面的發出信號,訂閱信號的閉包裏面可以訂閱到?
下面咱們將一步步經過Rxswift的源碼分析來揭開這個疑團。
在分析代碼前咱們先回顧一下序列的三部曲:1.建立序列,2,訂閱序列,3,銷燬序列,其中在2中訂閱序列以後,爲何咱們就能監聽到序列呢,理解了這個邏輯後,咱們的疑問就會天然而解。
先看一個簡單的類圖:
接着咱們來研究一下這段代碼的執行邏輯1:建立序列 // AnonymousObservable -> producer.subscriber -> run // 保存閉包 - 函數式 保存 _subscribeHandler // let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發送信號 obserber.onNext("框架班級") obserber.onCompleted() // obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil)) return Disposables.create() } 2:訂閱信號 // AnonymousObserver - event .next -> onNext() // _eventHandler // AnonymousObservable._subscribeHandler(observer) // 銷燬 let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") }, onError: { (error) in print("error: \(error)") }, onCompleted: { print("完成") }) { print("銷燬") } 複製代碼
用一個圖來表達上面這段代碼執行時,Rxswift框架作的事情以下:
如今咱們來研究一下源碼的具體實現:let ob = Observable<Any>.create { (obserber) -> Disposable in }
這行代碼時,進入Rxswift源碼能夠看到實際調用了:extension ObservableType { /** Creates an observable sequence from a specified subscribe method implementation. - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method. - returns: The observable sequence with the specified implementation for the `subscribe` method. */ public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E> } 複製代碼
let _ = ob.subscribe(onNext: { (text) }, onError: { (error) in }, onCompleted: { })
這行代碼時,咱們就訂閱了消息,咱們分析源碼得知,Rxswift其實是調用了
咱們進入源碼查看該方法的實現:/** Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence. - parameter onNext: Action to invoke for each element in the observable sequence. - parameter onError: Action to invoke upon errored termination of the observable sequence. - parameter onCompleted: Action to invoke upon graceful termination of the observable sequence. - parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has gracefully completed, errored, or if the generation is canceled by disposing subscription). - returns: Subscription object used to unsubscribe from the observable sequence. */ public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable if let disposed = onDisposed { disposable = Disposables.create(with: disposed) } else { disposable = Disposables.create() } #if DEBUG let synchronizationTracker = SynchronizationTracker() #endif let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : [] let observer = AnonymousObserver<E> { event in #if DEBUG synchronizationTracker.register(synchronizationErrorMessage: .default) defer { synchronizationTracker.unregister() } #endif switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } 複製代碼
分析源碼,能夠看到,咱們在訂閱消息註冊的回調:onNext, onError,onComplete, onDisposed這個四個閉包都是做爲函數的參數,在調用ob.subscribe()這個方法是做爲參數傳入進來的,在上面代碼中定義了一個逃逸閉包 let observer = AnonymousObserver {} 在這個閉包內部,當調用這個逃逸閉包的調用者傳遞不一樣的event就會調用咱們傳入的相應閉包,這樣咱們就監聽到了訂閱的消息。以下圖:
這裏咱們能夠得知,只有咱們的觀察者有了事件變化,只須要通知上面代碼定義的observer這個參數閉包就能夠了。
如今咱們前面提到的疑問的答案來了: 在源碼中咱們只看到了return Disposables.create( self.asObservable().subscribe(observer), disposable
就結束了。 而後玄機就在這句代碼,self.asObservable()其實就是咱們的建立的序列,而subscribe()就回調了咱們傳入的observer閉包,而在這個observer就會調用咱們船人都監聽序列消息閉包onNext(), onError(),onCompleted().
接下來咱們能夠分析下 subscribe(observer)是如何調用observer的
經過上面的分析,到這裏咱們總結一下大體流程:
1.調用create()建立序列時,首先建立了一個AnonymousObserver對象,在初始化時傳遞了一個閉包做爲參數而且保存下來self._eventHandler = eventHandler。 AnonymousObserver是匿名觀察者,用於存儲和處理事件的閉包。 2. 而後在最後有self.asObservable().subscribe(observer)這樣一行代碼,asObservable返回的是對象自己。 3. 而後調用subscribe這個函數而且把建立的AnonymousObserver對象傳遞過去,會來到AnonymousObservable這個類裏面,可是發現這個類裏面沒有subscribe方法,咱們往父類Producer裏面找到這個方法。 4. 在Producer源碼裏面咱們發現調用了run方法,也就是AnonymousObservable這個類裏面的run方法,把observer做爲參數傳過來。
return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } 複製代碼
在run方法中,先建立一個AnonymousObservableSink對象並持有observer,而後調用這個對象的run方法把self傳遞過去,也就是把observable做爲參數。 AnonymousObservableSink這個類將可觀察者Observable和觀察者Observer連接起來,實現事件的傳遞,起到一個橋樑的做用。
//parent就是咱們建立的序列。
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
複製代碼
在這裏觸發了_subscribeHandler的調用,這裏的_subscribeHandler就是以前create函數參數傳入的閉包。也就是這段閉包代碼:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發送信號 obserber.onNext("框架班級") obserber.onCompleted() // obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil)) return Disposables.create() }
注意:parent._subscribeHandler(AnyObserver(self)) 這行代碼把self轉換成AnyObserver對象,也就是把AnonymousObservableSink對象轉換成AnyObserver對象。
public struct AnyObserver<Element> : ObserverType { ... /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. public init<O : ObserverType>(_ observer: O) where O.E == Element { self.observer = observer.on } /// Send `event` to this observer. /// /// - parameter event: Event instance. public func on(_ event: Event<Element>) { return self.observer(event) } /// Erases type of observer and returns canonical observer. /// /// - returns: type erased observer. public func asObserver() -> AnyObserver<E> { return self } } 複製代碼
經過上面這段源碼咱們也看見public func asObserver() -> AnyObserver<E> { return self }
這個函數返回self 也就明白了以前說的 asObserver()爲何就是咱們建立的序列。也就是這個很是牛逼的代碼return Disposables.create( self.asObservable().subscribe(observer), disposable )
這裏的self.asObservable() 實際就是咱們建立的序列。
具體源碼以下:
1. AnonymousObservableSink類on方法 func on(_ event: Event<E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif switch event { case .next: if load(self._isStopped) == 1 { return } self.forwardOn(event) case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.forwardOn(event) self.dispose() } } 2. Sink類的forwardOn方法 final func forwardOn(_ event: Event<O.E>) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(self._disposed, 1) { return } self._observer.on(event) } 3. ObserverBase的on方法 func on(_ event: Event<E>) { switch event { case .next: if load(self._isStopped) == 0 { self.onCore(event) } case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } 4. onCore()方法最終會調用以前保存的閉包_eventHandler(event) override func onCore(_ event: Event<Element>) { return self._eventHandler(event) } 複製代碼
最後有兩張圖總結序列事件傳遞的核心流程