09.RxSwift 高階函數(三)

4. 集合控制操做符
4.1 toArray
  • 將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止
print("*****toArray*****")
        Observable.range(start: 1, count: 10)
            .toArray()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
複製代碼
4.2 reduce
  • 從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累加器閉包,並以單個元素可觀察序列的形式返回聚合結果 - 相似scan
print("*****reduce*****")
        Observable.of(10, 100, 1000)
            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
複製代碼
4.3 concat
  • 以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功終止, 用來控制順序
print("*****concat*****")
        let subject1 = BehaviorSubject(value: "AAA")
        let subject2 = BehaviorSubject(value: "1")
        
        let subjectsSubject = BehaviorSubject(value: subject1)
        
        subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        subject1.onNext("BBB")
        subject1.onNext("CCC")
        
        subjectsSubject.onNext(subject2)
        
        subject2.onNext("打印不出來")
        subject2.onNext("2")
        
        subject1.onCompleted() // 必需要等subject1 完成了才能訂閱到! 用來控制順序 網絡數據的異步
        subject2.onNext("3")
複製代碼
5. 從可觀察對象的錯誤通知中恢復的操做符
5.1 catchErrorJustReturn
  • 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止
print("*****catchErrorJustReturn*****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("Cooci")
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("AAA")
        sequenceThatFails.onNext("BBB") // 正常序列發送成功的
        sequenceThatFails.onError(self.lgError) //發送失敗的序列,一旦訂閱到位 返回咱們以前設定的錯誤的預案
複製代碼
5.2 catchError
  • 經過切換到提供的恢復可觀察序列,從錯誤事件中恢復
print("*****catchError*****")
        let recoverySequence = PublishSubject<String>()
        
        sequenceThatFails
            .catchError {
                print("Error:", $0)
                return recoverySequence  // 獲取到了錯誤序列-咱們在中間的閉包操做處理完畢,返回給用戶須要的序列(showAlert)
            }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("AAA")
        sequenceThatFails.onNext("BBB") // 正常序列發送成功的
        sequenceThatFails.onError(lgError) // 發送失敗的序列
        
        recoverySequence.onNext("CCC")
複製代碼
5.3 retry
  • 經過無限地從新訂閱可觀察序列來恢復重複的錯誤事件
var count = 1 // 外界變量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("AAA")
            observer.onNext("BBB")
            observer.onNext("CCC")
            
            if count == 1 { // 流程進來以後就會過分-這裏的條件能夠做爲出口,失敗的次數
                observer.onError(self.lgError)  // 接收到了錯誤序列,重試序列發生
                print("錯誤序列來了")
                count += 1
            }
            observer.onNext("DDD")
            observer.onNext("EEE")
            observer.onNext("FFF")
            observer.onCompleted()
            
            return Disposables.create()
        }
        sequenceRetryErrors
            .retry()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
複製代碼
5.4 retry(_:)
  • 經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數
print("*****retry(_:)*****")
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("AAA")
            observer.onNext("BBB")
            observer.onNext("CCC")
            
            if count < 5 { // 這裏設置的錯誤出口是沒有太多意義的額,由於咱們設置重試次數
                observer.onError(self.lgError)
                print("錯誤序列來了")
                count += 1
            }
            
            observer.onNext("DDD")
            observer.onNext("EEE")
            observer.onNext("FFF")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
複製代碼
6. debug Rx流程操做符
6.1 debug
  • 打印全部訂閱、事件和處理。
print("*****debug*****")
        var count = 1
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("AAA")
            observer.onNext("BBB")
            observer.onNext("CCC")
            
            if count < 5 {
                observer.onError(self.lgError)
                print("錯誤序列來了")
                count += 1
            }
            
            observer.onNext("DDD")
            observer.onNext("EEE")
            observer.onNext("FFF")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
複製代碼
7. RxSwift.Resources.total 操做符
7. 1 RxSwift.Resources.total
  • 提供全部Rx資源分配的計數,這對於在開發期間檢測泄漏很是有用。
print("*****RxSwift.Resources.total*****")

        print(RxSwift.Resources.total)

        let subject = BehaviorSubject(value: "Cooci")

        let subscription1 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        subscription1.dispose()

        print(RxSwift.Resources.total)

        subscription2.dispose()

        print(RxSwift.Resources.total)
複製代碼
8. 連接操做符
8.1 multicast
  • 將源可觀察序列轉換爲可鏈接序列,並經過指定的主題廣播其發射。
public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }
複製代碼
print("*****multicast*****")
        let subject = PublishSubject<Any>()
        subject.subscribe{print("00:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模擬網絡延遲
                print("我開始請求網絡了")
                observer.onNext("請求到的網絡數據")
                observer.onNext("請求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("銷燬回調了")
                }
            }.publish()
        
        netOB.subscribe(onNext: { (anything) in
                print("訂閱1:",anything)
            })
            .disposed(by: disposeBag)

        // 咱們有時候不止一次網絡訂閱,由於有時候咱們的數據可能用在不一樣的額地方
        // 因此在訂閱一次 會出現什麼問題?
        netOB.subscribe(onNext: { (anything) in
                print("訂閱2:",anything)
            })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()
複製代碼
8.2 replay
  • 將源可觀察序列轉換爲可鏈接的序列,並將向每一個新訂閱服務器重放之前排放的緩衝大小
  • 首先擁有和publish同樣的能力,共享 Observable sequence, 其次使用replay還須要咱們傳入一個參數(buffer size)來緩存已發送的事件,當有新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者
print("*****replay*****")

        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
        
        interval.subscribe(onNext: { print(Date.time,"訂閱: 1, 事件: \($0)") })
            .disposed(by: self.disposeBag)
        
        delay(2) { _ = interval.connect() }
        
        delay(4) {
            interval.subscribe(onNext: { print(Date.time,"訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        
        delay(8) {
            interval.subscribe(onNext: { print(Date.time,"訂閱: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(20, closure: {
            self.disposeBag = DisposeBag()
        })
     /**
         訂閱: 1, 事件: 4
         訂閱: 1, 事件: 0
         2019-05-28 21-32-42 訂閱: 2, 事件: 0
         2019-05-28 21-32-42 訂閱: 1, 事件: 1
         2019-05-28 21-32-42 訂閱: 2, 事件: 1
         2019-05-28 21-32-45 訂閱: 2, 事件: 4
         2019-05-28 21-32-46 訂閱: 3, 事件: 0
         2019-05-28 21-32-46 訂閱: 3, 事件: 1
         2019-05-28 21-32-46 訂閱: 3, 事件: 2
         2019-05-28 21-32-46 訂閱: 3, 事件: 3
         2019-05-28 21-32-46 訂閱: 3, 事件: 4
         
         // 序列從 0開始
         // 定時器也沒有斷層  sub2 sub3 和 sub1 是同步的
         */
複製代碼
8.3 push
  • 將源可觀察序列轉換爲可鏈接序列,共享一個Observable的事件序列,避免建立多個Observable sequence
  • 注意:須要調用connect以後纔會開始發送事件
print("*****testPushConnect*****")

        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        delay(2) {
            _ = interval.connect()
        }
        delay(4) {
            interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(6) {
            interval.subscribe(onNext: { print("訂閱: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(10, closure: {
            self.disposeBag = DisposeBag()
        })
      /**
            訂閱: 1, 事件: 1
            訂閱: 2, 事件: 1
            訂閱: 1, 事件: 2
            訂閱: 2, 事件: 2
            訂閱: 1, 事件: 3
            訂閱: 2, 事件: 3
            訂閱: 3, 事件: 3
         
            訂閱: 2 從1開始
            訂閱: 3 從3開始
        */
        // 可是後面來的訂閱者,卻沒法獲得以前已發生的事件
複製代碼
8.4 沒有共享序列
print("*****testWithoutConnect*****")

        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        
        interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        delay(3) {
            interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(10, closure: {
            self.disposeBag = DisposeBag()
        })

        // 發現有一個問題:在延時3s以後訂閱的Subscription: 2的計數並無和Subscription: 1一致,而是又從0開始了,若是想共享,怎麼辦?
複製代碼
相關文章
相關標籤/搜索