4. 集合控制操做符
4.1 toArray
- 將一個可觀察序列轉換爲一個數組,將該數組做爲一個新的單元素可觀察序列發出,而後終止
print("*****toArray*****")
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
複製代碼
4.2 reduce
- 從一個設置的初始化值開始,而後對一個可觀察序列發出的全部元素應用累加器閉包,並以單個元素可觀察序列的形式返回聚合結果 - 相似
scan
print("*****reduce*****")
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
複製代碼
4.3 concat
- 以順序方式鏈接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素以前,等待每一個序列成功終止, 用來控制順序
print("*****concat*****")
let subject1 = BehaviorSubject(value: "AAA")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("BBB")
subject1.onNext("CCC")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出來")
subject2.onNext("2")
subject1.onCompleted() // 必需要等subject1 完成了才能訂閱到! 用來控制順序 網絡數據的異步
subject2.onNext("3")
複製代碼
5. 從可觀察對象的錯誤通知中恢復的操做符
5.1 catchErrorJustReturn
- 從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,而後終止
print("*****catchErrorJustReturn*****")
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("Cooci")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // 正常序列發送成功的
sequenceThatFails.onError(self.lgError) //發送失敗的序列,一旦訂閱到位 返回咱們以前設定的錯誤的預案
複製代碼
5.2 catchError
print("*****catchError*****")
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence // 獲取到了錯誤序列-咱們在中間的閉包操做處理完畢,返回給用戶須要的序列(showAlert)
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // 正常序列發送成功的
sequenceThatFails.onError(lgError) // 發送失敗的序列
recoverySequence.onNext("CCC")
複製代碼
5.3 retry
var count = 1 // 外界變量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count == 1 { // 流程進來以後就會過分-這裏的條件能夠做爲出口,失敗的次數
observer.onError(self.lgError) // 接收到了錯誤序列,重試序列發生
print("錯誤序列來了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
複製代碼
5.4 retry(_:)
- 經過從新訂閱可觀察到的序列,重複地從錯誤事件中恢復,直到重試次數達到max未遂計數
print("*****retry(_:)*****")
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count < 5 { // 這裏設置的錯誤出口是沒有太多意義的額,由於咱們設置重試次數
observer.onError(self.lgError)
print("錯誤序列來了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
複製代碼
6. debug Rx流程操做符
6.1 debug
print("*****debug*****")
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count < 5 {
observer.onError(self.lgError)
print("錯誤序列來了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
複製代碼
7. RxSwift.Resources.total 操做符
7. 1 RxSwift.Resources.total
- 提供全部Rx資源分配的計數,這對於在開發期間檢測泄漏很是有用。
print("*****RxSwift.Resources.total*****")
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "Cooci")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
複製代碼
8. 連接操做符
8.1 multicast
- 將源可觀察序列轉換爲可鏈接序列,並經過指定的主題廣播其發射。
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
複製代碼
print("*****multicast*****")
let subject = PublishSubject<Any>()
subject.subscribe{print("00:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模擬網絡延遲
print("我開始請求網絡了")
observer.onNext("請求到的網絡數據")
observer.onNext("請求到的本地")
observer.onCompleted()
return Disposables.create {
print("銷燬回調了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("訂閱1:",anything)
})
.disposed(by: disposeBag)
// 咱們有時候不止一次網絡訂閱,由於有時候咱們的數據可能用在不一樣的額地方
// 因此在訂閱一次 會出現什麼問題?
netOB.subscribe(onNext: { (anything) in
print("訂閱2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
複製代碼
8.2 replay
- 將源可觀察序列轉換爲可鏈接的序列,並將向每一個新訂閱服務器重放之前排放的緩衝大小
- 首先擁有和
publish
同樣的能力,共享 Observable sequence
, 其次使用replay
還須要咱們傳入一個參數(buffer size)
來緩存已發送的事件,當有新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者
print("*****replay*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
interval.subscribe(onNext: { print(Date.time,"訂閱: 1, 事件: \($0)") })
.disposed(by: self.disposeBag)
delay(2) { _ = interval.connect() }
delay(4) {
interval.subscribe(onNext: { print(Date.time,"訂閱: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(8) {
interval.subscribe(onNext: { print(Date.time,"訂閱: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(20, closure: {
self.disposeBag = DisposeBag()
})
/**
訂閱: 1, 事件: 4
訂閱: 1, 事件: 0
2019-05-28 21-32-42 訂閱: 2, 事件: 0
2019-05-28 21-32-42 訂閱: 1, 事件: 1
2019-05-28 21-32-42 訂閱: 2, 事件: 1
2019-05-28 21-32-45 訂閱: 2, 事件: 4
2019-05-28 21-32-46 訂閱: 3, 事件: 0
2019-05-28 21-32-46 訂閱: 3, 事件: 1
2019-05-28 21-32-46 訂閱: 3, 事件: 2
2019-05-28 21-32-46 訂閱: 3, 事件: 3
2019-05-28 21-32-46 訂閱: 3, 事件: 4
// 序列從 0開始
// 定時器也沒有斷層 sub2 sub3 和 sub1 是同步的
*/
複製代碼
8.3 push
- 將源可觀察序列轉換爲可鏈接序列,共享一個Observable的事件序列,避免建立多個
Observable sequence
注意:須要調用connect以後纔會開始發送事件
print("*****testPushConnect*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
.disposed(by: disposeBag)
delay(2) {
_ = interval.connect()
}
delay(4) {
interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(6) {
interval.subscribe(onNext: { print("訂閱: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(10, closure: {
self.disposeBag = DisposeBag()
})
/**
訂閱: 1, 事件: 1
訂閱: 2, 事件: 1
訂閱: 1, 事件: 2
訂閱: 2, 事件: 2
訂閱: 1, 事件: 3
訂閱: 2, 事件: 3
訂閱: 3, 事件: 3
訂閱: 2 從1開始
訂閱: 3 從3開始
*/
// 可是後面來的訂閱者,卻沒法獲得以前已發生的事件
複製代碼
8.4 沒有共享序列
print("*****testWithoutConnect*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
.disposed(by: disposeBag)
delay(3) {
interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(10, closure: {
self.disposeBag = DisposeBag()
})
// 發現有一個問題:在延時3s以後訂閱的Subscription: 2的計數並無和Subscription: 1一致,而是又從0開始了,若是想共享,怎麼辦?
複製代碼