RxSwift(四)高階函數

Rxswift(一)函數響應式編程思想git

RxSwift (二)序列核心邏輯分析github

RxSwift (三)Observable的建立,訂閱,銷燬編程

RxSwift(四)高階函數swift

RxSwift(五)(Rxswift對比swift,oc用法)數組

Rxswift (六)銷燬者Dispose源碼分析緩存

RxSwift(七)Rxswift對比swift用法服務器

RxSwift (十) 基礎使用篇 1- 序列,訂閱,銷燬markdown

RxSwift學習之十二 (基礎使用篇 3- UI控件擴展) @TOC網絡

咱們知道Swift中有不少高階函數,很是好用,並且效率都很高,如咱們常用的map,fliter,flatmap等等。詳情能夠參考我以前的一篇博客:Swift的高階函數閉包

本篇文章主要講解Rxswift中的高階函數. 主要講解高階函數的使用,而後展開來探索具體源碼實現。

Rwswift高階函數簡介

1. 高階函數種類

  • Rxswift有哪些高階函數呢?經過下面這張圖你能夠大體瞭解一下:
    Rxswift高階函數-kyl

2. Rxswift高階函數簡介

2.1 組合操做符

  • startWith: 在開始從可觀測源發出元素以前,發出知道的元素序列
  • merge:將員可觀測序列中的元素組合成一個新的可觀測序列,並將像每一個源可觀測序列發出元素同樣發出每一個元素。
  • zip:將多達8個源可觀測序列組合成一個新的可觀測序列,並將從組合的可觀測序列中發射出對應索引處每一個源可 觀測序列的元素。
  • combineLatest:將8個源可觀測序列組合成一個新的可觀測序列,並將開始發出聯合可觀測序列的每一個源的最新元素可觀測序列一旦全部排放源序列至少有一個 元素,而且當源可觀測序列發出的任何一個新元素。
  • switchLatest:將可觀測序列發出的元素轉換爲可觀察序列,並返回轉換後的新可觀察序列。

2.2 映射操做符

  • map:轉換閉包應用於可觀察序列發出的元素,並返回轉換後的元素的新可觀察序列。
  • flatMap:將可觀測序列 發送的元素轉換爲可觀測序列,並將兩個可觀測的序列 發送合併爲一個可觀測的序列。 例如:當你有一個可觀測的序列,他自己能夠發出可觀察序列,你想可以對任何一個可觀察序列的新發射作出反應(序列中的序列:好比網絡序列中還有模型序列)
  • flatMapLatest:同flatMap,區別就是flatMapLatest只會從最近的內部可觀察序列發射元素。
  • scan:從初始就帶有一個默認值開始,而後對可觀察序列發出的每一個元素應用累加器閉包,並以單個元素可觀察序列的形式返回每一箇中間結果。

2.3 過濾條件操做符

  • fliter:僅從知足指定條件的可 觀察序列中發出那些元素。
  • distincUntilChanged:抑制可觀察序列發出的順序重複元素。
  • elementAt:僅在可觀察序列發出的全部元素的指定索引處發出元素。
  • single:只發出可 觀察序列發出的第一個元素(或知足條件的第一個元素)。若是可觀察序列發出多個元素,將拋出一個錯誤。
  • take:只從一個可觀察序列的開始發出指定數量的元素。上面single只有一個序列,在實際開發中受到侷限,這裏引出take想幾個就幾個。
  • takeLast:僅從可觀察序列的末尾發出指定數量的元素。
  • takeWhile:只要指定的條件只爲true,就能夠從可觀察序列的開始發出元素。
  • takeUntil:從源可觀察序列發出元素,直到參考可觀察序列發出元素(這個很重要,應用很是頻繁,好比我頁面銷燬了,就不能獲取值了,如:cell重用)
  • skip:從源可觀察序列發出元素,直到參考可觀察序列發出元素。(這個很重要,應用頻繁)
  • skipUntil:抑制從源可觀察序列發出元素,直到參考可觀察序列產生。

2.5 集合控制操做符

  • toArray:將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止。
  • reduce:從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累計器閉包,並以單個元素可觀察序列的形式返回聚合結果。(相似於scan)
  • concat:以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功 終止。(用來控制順序)

2.6 從序列錯誤中恢復的操做符

  • catchErrorJustReturn:從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止。
  • catchError:經過切換到提供的恢復可觀察序列,從錯誤事件中恢復。
  • retry:經過 無限地從新訂閱可觀察序列來恢復重複的錯誤事件。
  • retry(_ : ):經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數。

2.7 debug Rx流程操做符

  • debug:打印全部訂閱,事件和處理。
  • RxSwift.Resources.total:操做符 提供全部Rx資源分配的計數,這對於在開發期間檢測內存泄漏很是有用。

2.8 連接操做符

  • multicast:將源可觀察序列轉換爲可鏈接序列,並經過指定的主題廣播它的發射。
  • replay:將源可觀察序列轉換爲可鏈接的序列,並將向每一個新訂閱服務器重放一塊兒排放的緩存大小。(首先擁有和publish同樣的能力,共享Observable 和sequence,其次使用replay還須要咱們傳入一個參數:buffer size 來緩存已經發送的事件,當有了新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者)
  • push:將源可觀察序列轉換爲可鏈接序列。(共享一個Observable的實際序列,避免建立多個Observable 和sequence.注意:須要調用conect以後纔會開始發送事件)

Rwswift高階函數使用

2 Rx高階函數實例講解

本篇博客涉及的實例都放在一個項目源碼裏面:點擊此處下載高階函數使用源碼

2.1 組合操做符

startWith

  • 在開始從可觀測源發出元素以前,發出知道的元素序列

實例 1: 代碼:

// *** startWith : 在開始從可觀察源發出元素以前,發出指定的元素序列
func test_startWith()  {
        print("*****startWith*****")
        Observable.of("1", "2", "3", "4")
            .startWith("A")
            .startWith("B")
            .startWith("C", "a", "b")
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        //效果: CabBA1234
 }
複製代碼

運行結果:

image

merge

將員可觀測序列中的元素組合成一個新的可觀測序列,並將像每一個源可觀測序列發出元素同樣發出每一個元素。

實例 5: 代碼:

// **** merge : 將源可觀察序列中的元素組合成一個新的可觀察序列,並將像每一個源可觀察序列發出元素同樣發出每一個元素
func test_merge()  {
        print("*****merge*****")
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        // merge
        Observable.of(subject1,subject2)
            .merge()
            .subscribe(onNext: { print($0)})
            .disposed(by: disposeBag)
        
        //- 下面任何一個響應都會勾起新序列響應
        subject1.onNext("K")
        subject1.onNext("o")
        subject2.onNext("n")
        subject2.onNext("g")
        subject1.onNext("Y")
        subject2.onNext("u")
        subject1.onNext("L")
        subject2.onNext("u")
    }
複製代碼

運行結果:

image

zip

  • 將多達8個源可觀測序列組合成一個新的可觀測序列,並將從組合的可觀測序列中發射出對應索引處每一個源可 觀測序列的元素。

實例 10: 代碼:

// *** zip: 將多達8個源可觀測序列組合成一個新的可觀測序列,
// 並將從組合的可觀測序列中發射出對應索引處每一個源可觀測序列的元素
func test_zip()  {
        print("*****zip*****")
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()
        
        Observable.zip(stringSubject, intSubject) { stringElement , intElement in
            "\(stringElement) \(intElement)"
        }
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        stringSubject.onNext("K")
        stringSubject.onNext("o") //到這裏存儲了K o 可是很差響應,除非有另外一個響應
        
        intSubject.onNext(1) //勾出一個
        intSubject.onNext(2) //勾出另外一個
        stringSubject.onNext("o") //再存一個
        intSubject.onNext(3) //勾出一個
        
        //總結: 只有兩個序列同時有值的 時候纔會響應,不然只會存值。
        
    }
複製代碼

運行結果:

image

combineLatest

  • 將8個源可觀測序列組合成一個新的可觀測序列,並將開始發出聯合可觀測序列的每一個源的最新元素可觀測序列一旦全部排放源序列至少有一個 元素,而且當源可觀測序列發出的任何一個新元素。

實例 15: 代碼:

/// combineLatest:將8源可觀測序列組合成一個新的觀測序列,
    ///並將開始發出聯合觀測序列的每一個源的最新元素可觀測序列一旦全部排放源序列至少有一個元素,
    ///而且當源可觀測序列發出的任何一個新元素
    func test_combineLatest()  {
        print("*****combineLatest*****")
        let stringSub =  PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        //合併序列
        Observable.combineLatest(stringSub, intSub) { strE, intE in
            "\(strE) \(intE)"
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        stringSub.onNext("K") //存一個K
        stringSub.onNext("Y") //存了一個覆蓋 - 和zip不同
        intSub.onNext(1) //發現strOB 觀察者存在值 Y(上面的Y覆蓋了K) 則 響應 Y 1
        intSub.onNext(2) //覆蓋1 -> 2, 發現strOB有值YK 響應 Y 2
        stringSub.onNext("Kongyulu") // 覆蓋Y -> Kongyulu 發現intOB有值 2 響應:Kongyulu 2
        
        //總結:1. combineLatest 比較zip 會覆蓋
        // 2. 應用很是頻繁: 好比帳戶和密碼同時知足->才能登錄. 不關係帳戶密碼怎麼變化的只要查看最後有值就能夠 loginEnable
    }
複製代碼

運行結果:

image

switchLatest

  • 將可觀測序列發出的元素轉換爲可觀察序列,並返回轉換後的新可觀察序列。

實例 20: 代碼:

// 將可觀察序列發出的元素轉換爲可觀察序列,並從最近的內部可觀察序列發出元素
    func test_switchLatest()  {
        // switchLatest : 將可觀察序列發出的元素轉換爲可觀察序列,並從最近的內部可觀察序列發出元素
        print("*****switchLatest*****")
        let switchLatestSub1 = BehaviorSubject(value: "K")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        //注意下面這句代碼:這裏選擇了switchLatestSub1就不會再監聽switchLatestSub2
        let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
        
        switchLatestSub.asObservable()
        .switchLatest()
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("Y")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3") // 2,3都不會監聽,可是默認保存有2 覆蓋1 3覆蓋2
        
    }
複製代碼

運行結果:

image
代碼:

// 將可觀察序列發出的元素轉換爲可觀察序列,並從最近的內部可觀察序列發出元素
    func test_switchLatest()  {
        // switchLatest : 將可觀察序列發出的元素轉換爲可觀察序列,並從最近的內部可觀察序列發出元素
        print("*****switchLatest*****")
        let switchLatestSub1 = BehaviorSubject(value: "K")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        //注意下面這句代碼:這裏選擇了switchLatestSub1就不會再監聽switchLatestSub2
        let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
        
        switchLatestSub.asObservable()
        .switchLatest()
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("Y")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3") // 2,3都不會監聽,可是默認保存有2 覆蓋1 3覆蓋2
        switchLatestSub.onNext(switchLatestSub2) //切換到 switchLatestSub2
        switchLatestSub1.onNext("*") //因爲上面切換到了switchLatestSub2,因此switchLatestSub1不會響應,不會輸出*
        switchLatestSub1.onNext("Kongyulu")//這裏不會響應,不會輸出Kongyulu
        switchLatestSub2.onNext("4")
        /* 到這裏會輸出: *****switchLatest***** K Y _ 3 4 */
        switchLatestSub.onNext(switchLatestSub1)// 若是再次切換到 switchLatestSub1會打印出 Kongyulu
        switchLatestSub2.onNext("5")
        /* 到這裏會輸出: *****switchLatest***** K Y _ 3 4 Kongyulu */
        
    }
複製代碼

運行結果:

image

2.2 映射操做符

map

  • 轉換閉包應用於可觀察序列發出的元素,並返回轉換後的元素的新可觀察序列。

實例 25: 代碼:

/// 轉換閉包應用於可觀察序列發出的元素,並返回轉換後的元素的新可觀察序列。
    func test_map() {
        // ***** map: 轉換閉包應用於可觀察序列發出的元素,並返回轉換後的元素的新可觀察序列。
        print("*****map*****")
        let ob = Observable.of(1,2,3,4)
        ob.map { (number) -> Int in
            return number + 2
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }
複製代碼

運行結果:

在這裏插入圖片描述

flatMap

  • 將可觀測序列 發送的元素轉換爲可觀測序列,並將兩個可觀測的序列 發送合併爲一個可觀測的序列。 例如:當你有一個可觀測的序列,他自己能夠發出可觀察序列,你想可以對任何一個可觀察序列的新發射作出反應(序列中的序列:好比網絡序列中還有模型序列)

實例 30: 代碼:

///將可觀測序列發射的元素轉換爲可觀測序列,並將兩個可觀測序列的發射合併爲一個可觀測序列。
    ///這也頗有用,例如,當你有一個可觀察的序列,它自己發出可觀察的序列,
    ///你想可以對任何一個可觀察序列的新發射作出反應(序列中序列:好比網絡序列中還有模型序列)
    func test_flatmap() {
        print("*****flatMap*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMap { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
    }
複製代碼

運行結果:

image
代碼:

struct LGPlayer {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }
    let score: BehaviorSubject<Int>
}

func test_flatmap() {
        print("*****flatMap*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMap { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
        boy.score.onNext(50)
        boy.score.onNext(40)// 若是切換到 flatMapLatest 就不會打印
        girl.score.onNext(10)
        girl.score.onNext(0)
    }
複製代碼

運行結果:

image

flatMapLatest

  • 同flatMap,區別就是flatMapLatest只會從最近的內部可觀察序列發射元素。

實例 35: 代碼:

struct LGPlayer {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }
    let score: BehaviorSubject<Int>
}
 /// flatMap和flatMapLatest的區別是,flatMapLatest只會從最近的內部可觀測序列發射元素
    /// flatMapLatest其實是map和switchLatest操做符的組合。
    func test_flatMapLatest() {
        print("*****flatMapLatest*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMapLatest { $0.score.asObservable() } // 自己score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
        boy.score.onNext(50)
        boy.score.onNext(40)// 若是切換到 flatMapLatest 就不會打印
        girl.score.onNext(10)
        girl.score.onNext(0)
    }
複製代碼

運行結果:

image

scan

  • 從初始就帶有一個默認值開始,而後對可觀察序列發出的每一個元素應用累加器閉包,並以單個元素可觀察序列的形式返回每一箇中間結果。

實例 40: 代碼:

///從初始就帶有一個默認值開始,而後對可觀察序列發出的每一個元素應用累加器閉包,並以單個元素可觀察序列的形式返回每一箇中間結果
    func test_scan() {
        print("*****scan*****")
        Observable.of(10,100,1000)
            .scan(2) { aggregateValue, newValue in
                aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

2.3 過濾條件操做符

fliter

  • 僅從知足指定條件的可 觀察序列中發出那些元素。

實例 45: 代碼:

///僅從知足指定條件的可觀察序列中發出那些元素
    func test_fliter() {
        // **** filter : 僅從知足指定條件的可觀察序列中發出那些元素
        print("*****filter*****")
        Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter{$0 % 2 == 0}
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

distincUntilChanged

  • 抑制可觀察序列發出的順序重複元素。

實例 50: 代碼:

///抑制可觀察序列發出的順序重複元素
    func test_distinctUntilChanged() {
        // ***** distinctUntilChanged: 抑制可觀察序列發出的順序重複元素
        print("*****distinctUntilChanged*****")
        Observable.of("1", "2", "2", "2", "3", "3", "4")
            .distinctUntilChanged()
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

elementAt

  • 僅在可觀察序列發出的全部元素的指定索引處發出元素。

實例 55: 代碼:

///僅在可觀察序列發出的全部元素的指定索引處發出元素
    func test_elementAt() {
        // **** elementAt: 僅在可觀察序列發出的全部元素的指定索引處發出元素
        print("*****elementAt*****")
        Observable.of("C", "o", "o", "c", "I")
            .elementAt(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

single

  • 只發出可 觀察序列發出的第一個元素(或知足條件的第一個元素)。若是可觀察序列發出多個元素,將拋出一個錯誤。

實例 60: 代碼:

///只發出可觀察序列發出的第一個元素(或知足條件的第一個元素)。若是可觀察序列發出多個元素,將拋出一個錯誤。
    func test_single() {
        // *** single: 只發出可觀察序列發出的第一個元素(或知足條件的第一個元素)。若是可觀察序列發出多個元素,將拋出一個錯誤。
        print("*****single*****")
        Observable.of("kongyulu", "yuhairong")
            .single()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

代碼:

func test_single2() {
        // *** single: 只發出可觀察序列發出的第一個元素(或知足條件的第一個元素)。若是可觀察序列發出多個元素,將拋出一個錯誤。
        print("*****single*****")
        Observable.of("kongyulu", "yuhairong")
            .single{ $0 == "kongyulu"}
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

take

  • 只從一個可觀察序列的開始發出指定數量的元素。上面single只有一個序列,在實際開發中受到侷限,這裏引出take想幾個就幾個。

實例 65: 代碼:

///只從一個可觀察序列的開始發出指定數量的元素。 上面signal只有一個序列 在實際開發會受到侷限 這裏引出 take 想幾個就幾個
    func test_take() {
        // **** take: 只從一個可觀察序列的開始發出指定數量的元素。 上面signal只有一個序列 在實際開發會受到侷限 這裏引出 take 想幾個就幾個
        print("*****take*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .take(2)//這裏取前面兩個
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

代碼:

func test_take2() {
        // **** take: 只從一個可觀察序列的開始發出指定數量的元素。 上面signal只有一個序列 在實際開發會受到侷限 這裏引出 take 想幾個就幾個
        print("*****take*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .take(3) //這裏取前面三個
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

takeLast

  • 僅從可觀察序列的末尾發出指定數量的元素。

實例 70: 代碼:

//僅從可觀察序列的末尾發出指定數量的元素
    func test_takeLast() {
        // *** takeLast: 僅從可觀察序列的末尾發出指定數量的元素
        print("*****takeLast*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .takeLast(3)//取從末尾開始算起的3個元素
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

takeWhile

  • 只要指定的條件只爲true,就能夠從可觀察序列的開始發出元素。

實例 75: 代碼:

///只要指定條件的值爲true,就從可觀察序列的開始發出元素
    func test_takeWhile() {
        // **** takeWhile: 只要指定條件的值爲true,就從可觀察序列的開始發出元素
        print("*****takeWhile*****")
        Observable.of(1, 2, 3, 4, 5, 6)
            .takeWhile { $0 < 3 } //取出知足條件的元素 (1,2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

takeUntil

  • 從源可觀察序列發出元素,直到參考可觀察序列發出元素(這個很重要,應用很是頻繁,好比我頁面銷燬了,就不能獲取值了,如:cell重用)

實例 80: 代碼:

/// 從源可觀察序列發出元素,直到參考可觀察序列發出元素
    /// 這個要重點,應用很是頻繁 好比我頁面銷燬了,就不能獲取值了(cell重用運用)
    func test_takeUntil() {
        // ***** takeUntil: 從源可觀察序列發出元素,直到參考可觀察序列發出元素
        print("*****takeUntil*****")
        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        sourceSequence
            .takeUntil(referenceSequence)
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("kongyulu")
        sourceSequence.onNext("yifeng")
        sourceSequence.onNext("yisheng")
        //referenceSequence.onNext("yuhairong") // 條件一出來,下面就走不了
        sourceSequence.onNext("test1")
        sourceSequence.onNext("test2")
        sourceSequence.onNext("test3")
    }
複製代碼

運行結果:

image

  • 將上面的實例 80的註釋行:referenceSequence.onNext("yuhairong") 放開後,控制檯打印的結果爲:

image

skip

  • 從源可觀察序列發出元素,直到參考可觀察序列發出元素。(這個很重要,應用頻繁)

實例 85: 代碼:

///從源可觀察序列發出元素,直到參考可觀察序列發出元素
    /// 這個要重點,應用很是頻繁 textfiled 都會有默認序列產生
    func test_skip() {
        // ***** skip: 從源可觀察序列發出元素,直到參考可觀察序列發出元素
        print("*****skip*****")
        Observable.of(1,2,3,4,5,6)
        .skip(2) //直接跳過前面兩個元素,即從3開始
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

skipWhile

  • 抑制知足指定條件的元素,直到參考可觀察序列產生。

實例 90: 代碼:

/// 直接跳過知足條件的元素,至關於過濾做用
    func test_skipWhile() {
        print("*****skipWhile*****")
        //skipWhile剛剛和takeWhile的做用相反
        Observable.of(1, 2, 3, 4, 5, 6)
            .skipWhile { $0 < 4 } //直接跳過知足條件的元素,至關於過濾做用(知足小於4的都跳過,即只有4,5,6)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

skipUntil

  • 抑制從源可觀察序列發出元素,直到參考可觀察序列產生。

代碼:

/// 抑制從源可觀察序列發出元素,直到參考可觀察序列發出元素
    func test_skipUntil() {
        // *** skipUntil: 抑制從源可觀察序列發出元素,直到參考可觀察序列發出元素
        // skipUntil 做用剛剛和 takeUntil 相反
        print("*****skipUntil*****")
        let sourceSeq = PublishSubject<String>()
        let referenceSeq = PublishSubject<String>()
        
        sourceSeq
            .skipUntil(referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // 沒有條件命令 下面走不了
        sourceSeq.onNext("kongyulu")
        sourceSeq.onNext("yifeng")
        sourceSeq.onNext("yisheng")
        
        //referenceSeq.onNext("yuhairong") // 條件一出來,下面就能夠走了
        
        sourceSeq.onNext("test1")
        sourceSeq.onNext("test2")
        sourceSeq.onNext("test3")
    }
複製代碼

運行結果:

image
這裏因爲註釋了條件代碼,什麼都沒有輸出。下面咱們把註釋的那行代碼放開 referenceSeq.onNext("yuhairong") 再來看運行結果。

運行結果:

image

2.5 集合控制操做符

toArray

  • 將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止。

實例 95: 代碼:

/// 將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止
    func test_toArray() {
        // *** toArray: 將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止
        print("*****toArray*****")
        Observable.range(start: 1, count: 10)
            .toArray() //這裏生成一個從1到10的數組
            .subscribe { print($0) }
            .disposed(by: disposeBag)
    }
複製代碼

運行結果:

image

reduce

  • 從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累計器閉包,並以單個元素可觀察序列的形式返回聚合結果。(相似於scan)

實例 100: 代碼:

/// 從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累加器閉包,並以單個元素可觀察序列的形式返回聚合結果 - 相似scan
    func test_reduce() {
        // *** reduce: 從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累加器閉包,並以單個元素可觀察序列的形式返回聚合結果 - 相似scan
        print("*****reduce*****")
        Observable.of(10, 100, 1000)
            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

結果:

image

concat

  • 以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功 終止。(用來控制順序)

實例 105: 代碼:

/// 以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功終止
    /// 用來控制順序
    func test_concat() {
        // *** concat: 以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功終止
        // 用來控制順序
        print("*****concat*****")
        let subject1 = BehaviorSubject(value: "kongyulu")
        let subject2 = BehaviorSubject(value: "1")
        
        let subjectsSubject = BehaviorSubject(value: subject1)
        
        subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        subject1.onNext("yifeng")
        subject1.onNext("yisheng")
        
        subjectsSubject.onNext(subject2)
        
        subject2.onNext("打印不出來")
        subject2.onNext("2")
        
        //subject1.onCompleted() // 必需要等subject1 完成了才能訂閱到! 用來控制順序 網絡數據的異步
        subject2.onNext("3")
    }
複製代碼

結果1:

image
因爲咱們註釋代碼 subject1.onCompleted(), 而使用了concat後subject2的訂閱須要等待subject1完成以後才能執行。因此纔有了咱們看到的上面打印結果,subject2的訂閱信息都沒有打印出來。

  • 咱們放開註釋的那行代碼,再從新運行代碼,能夠獲得下面的結果:
    image

2.6 從序列錯誤中恢復的操做符

catchErrorJustReturn

  • 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止。

實例 110: 代碼:

/// 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止
    func test_catchErrorJustReturn() {
        // **** catchErrorJustReturn
        // 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止
        print("*****catchErrorJustReturn*****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("kongyulu")
            .subscribe{print($0)}
            .disposed(by: disposeBag)

        sequenceThatFails.onNext("yifeng")
        sequenceThatFails.onNext("yisheng")// 正常序列發送成功的
        sequenceThatFails.onError(self.kylError) //發送失敗的序列,一旦訂閱到位 返回咱們以前設定的錯誤的預案
    }
    
複製代碼

輸出結果:

image

catchError

  • 經過切換到提供的恢復可觀察序列,從錯誤事件中恢復。

實例 115: 代碼:

/// 經過切換到提供的恢復可觀察序列,從錯誤事件中恢復
    func test_catchError() {
        // **** catchErrorJustReturn
        // 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止
        print("*****catchErrorJustReturn*****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("kongyulu")
            .subscribe{print($0)}
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("yifeng")
        sequenceThatFails.onNext("yisheng")// 正常序列發送成功的
        sequenceThatFails.onError(self.kylError) //發送失敗的序列,一旦訂閱到位 返回咱們以前設定的錯誤的預案
        
        // **** catchError
        // 經過切換到提供的恢復可觀察序列,從錯誤事件中恢復
        print("*****catchError*****")
        
        let recoverySequence = PublishSubject<String>()
        
        recoverySequence
            .catchError {
            print("Error:",$0)
            return recoverySequence // 獲取到了錯誤序列-咱們在中間的閉包操做處理完畢,返回給用戶須要的序列(showAlert)
            }
            .subscribe{print($0)}
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("test1")
        sequenceThatFails.onNext("test2")  // 正常序列發送成功的
        sequenceThatFails.onError(kylError) // 發送失敗的序列
        
        recoverySequence.onNext("yuhairong")
        
    }
複製代碼

輸出結果:

image

retry

  • 經過 無限地從新訂閱可觀察序列來恢復重複的錯誤事件。

實例 120: 代碼:

/// 經過無限地從新訂閱可觀察序列來恢復重複的錯誤事件
    func test_retry() {
        // *** retry: 經過無限地從新訂閱可觀察序列來恢復重複的錯誤事件
        print("*****retry*****")
        var count = 1 // 外界變量控制流程
        let sequenceRetryErrors = Observable<String>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count == 1 { // 流程進來以後就會過分-這裏的條件能夠做爲出口,失敗的次數
                observer.onError(self.kylError)
                print("錯誤序列來了")
                count += 1
            }
            
            observer.onNext("test1")
            observer.onNext("test2")
            observer.onNext("test3")
            observer.onCompleted()
            return Disposables.create()
        }
        
        sequenceRetryErrors
            //.retry() //調用這個retry後,上面的observer閉包會從新執行一次
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }
複製代碼

上面的代碼咱們註釋掉了//.retry() ,這樣咱們能夠更好的對比結果 輸出結果:

image
從結果中咱們看到了錯誤提示信息打印多來了,如今咱們把註釋的那行代碼打開,再從新運行,查看結果,以下:

image
咱們能夠看到,observer的閉包從新執行了一次,多打印了信息。

retry(_ : )

  • 經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數。

實例 125: 代碼:

/// retry(_:): 經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數
    func test_retry2() {
        // **** retry(_:): 經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數
        print("*****retry(_:)*****")
        var count = 1 // 外界變量控制流程
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count < 5 { // 這裏設置的錯誤出口是沒有太多意義的額,由於咱們設置重試次數
                observer.onError(self.kylError) //發送錯誤消息
                print("錯誤序列來了")
                count += 1
            }
            //發送錯誤後,下面的sender都不會打印了
            observer.onNext("sender 1")
            observer.onNext("sender 2")
            observer.onNext("sender 3")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3) //重複地從錯誤事件中恢復3次
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

結果:

image

2.7 debug Rx流程操做符

debug

  • 打印全部訂閱,事件和處理。 實例 130: 代碼:
func test_debug() {
        // **** debug
        // 打印全部訂閱、事件和處理。
        print("*****debug*****")
        var count = 1
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("Kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count < 5 {
                observer.onError(self.kylError)
                print("錯誤序列來了")
                count += 1
            }
            
            observer.onNext("yuhairong")
            observer.onNext("zhangsiyuan")
            observer.onNext("kongliyuan")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }
複製代碼

結果:

RxSwift.Resources.total

  • 操做符 提供全部Rx資源分配的計數,這對於在開發期間檢測內存泄漏很是有用。

實例 135: 代碼:

/// RxSwift.Resources.total 操做符
    func testResourcesTotal() {
        
        // ** RxSwift.Resources.total: 提供全部Rx資源分配的計數,這對於在開發期間檢測泄漏很是有用。
        print("*****RxSwift.Resources.total*****")

        print(RxSwift.Resources.total)

        let subject = BehaviorSubject(value: "Cooci")

        let subscription1 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        subscription1.dispose()

        print(RxSwift.Resources.total)

        subscription2.dispose()

        print(RxSwift.Resources.total)
    }
複製代碼

結果:

2.8 連接操做符

multicast

  • 將源可觀察序列轉換爲可鏈接序列,並經過指定的主題廣播它的發射。

實例 140: 代碼:

/// multicast
    func testMulticastConnectOperators(){
        
        // *** multicast : 將源可觀察序列轉換爲可鏈接序列,並經過指定的主題廣播其發射。
        print("*****multicast*****")
        let subject = PublishSubject<Any>()
        subject.subscribe{print("00:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模擬網絡延遲
            print("我開始請求網絡了")
            observer.onNext("請求到的網絡數據")
            observer.onNext("請求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("銷燬回調了")
            }
            }.publish()
        
        netOB.subscribe(onNext: { (anything) in
            print("訂閱1:",anything)
        })
            .disposed(by: disposeBag)
        
        // 咱們有時候不止一次網絡訂閱,由於有時候咱們的數據可能用在不一樣的額地方
        // 因此在訂閱一次 會出現什麼問題?
        netOB.subscribe(onNext: { (anything) in
            print("訂閱2:",anything)
        })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()
        
    }
複製代碼

結果:

image

replay

  • 將源可觀察序列轉換爲可鏈接的序列,並將向每一個新訂閱服務器重放一塊兒排放的緩存大小。(首先擁有和publish同樣的能力,共享Observable 和sequence,其次使用replay還須要咱們傳入一個參數:buffer size 來緩存已經發送的事件,當有了新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者)

實例 145: 代碼:

/// replay
    func testReplayConnectOperators(){
        
        // **** replay: 將源可觀察序列轉換爲可鏈接的序列,並將向每一個新訂閱服務器重放之前排放的緩衝大小
        // 首先擁有和publish同樣的能力,共享 Observable sequence, 其次使用replay還須要咱們傳入一個參數(buffer size)來緩存已發送的事件,當有新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者
        print("*****replay*****")
        
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
        
        interval.subscribe(onNext: { print(Date.time,"訂閱: 1, 事件: \($0)") })
            .disposed(by: self.disposeBag)
        
        delay(2) { _ = interval.connect() }
        
        delay(4) {
            interval.subscribe(onNext: { print(Date.time,"訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        
        delay(8) {
            interval.subscribe(onNext: { print(Date.time,"訂閱: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(20, closure: {
            self.disposeBag = DisposeBag()
        })
        
        /** 訂閱: 1, 事件: 4 訂閱: 1, 事件: 0 2019-05-28 21-32-42 訂閱: 2, 事件: 0 2019-05-28 21-32-42 訂閱: 1, 事件: 1 2019-05-28 21-32-42 訂閱: 2, 事件: 1 2019-05-28 21-32-45 訂閱: 2, 事件: 4 2019-05-28 21-32-46 訂閱: 3, 事件: 0 2019-05-28 21-32-46 訂閱: 3, 事件: 1 2019-05-28 21-32-46 訂閱: 3, 事件: 2 2019-05-28 21-32-46 訂閱: 3, 事件: 3 2019-05-28 21-32-46 訂閱: 3, 事件: 4 // 序列從 0開始 // 定時器也沒有斷層 sub2 sub3 和 sub1 是同步的 */
    }
複製代碼

結果:

image

push

  • 將源可觀察序列轉換爲可鏈接序列。(共享一個Observable的實際序列,避免建立多個Observable 和sequence.注意:須要調用conect以後纔會開始發送事件)

實例 150: 代碼:

/// push - connect 將源可觀察序列轉換爲可鏈接序列
    func testPushConnectOperators(){
        
        // **** push:將源可觀察序列轉換爲可鏈接序列
        // 共享一個Observable的事件序列,避免建立多個Observable sequence。
        // 注意:須要調用connect以後纔會開始發送事件
        print("*****testPushConnect*****")
        
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        delay(2) {
            _ = interval.connect()
        }
        delay(4) {
            interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(6) {
            interval.subscribe(onNext: { print("訂閱: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(10, closure: {
            self.disposeBag = DisposeBag()
        })
        /** 訂閱: 1, 事件: 1 訂閱: 2, 事件: 1 訂閱: 1, 事件: 2 訂閱: 2, 事件: 2 訂閱: 1, 事件: 3 訂閱: 2, 事件: 3 訂閱: 3, 事件: 3 訂閱: 2 從1開始 訂閱: 3 從3開始 */
        // 可是後面來的訂閱者,卻沒法獲得以前已發生的事件
    }
複製代碼

結果:

image

Rwswift高階函數源碼分析

相關文章
相關標籤/搜索