RxSwift使用心得(一):using操做符與ActivityIndicator

using操做符

簡介

using操做符的官方描述:html

建立一個可被清除的資源,它和 Observable 具備相同的壽命
經過使用 using 操做符建立 Observable 時,同時建立一個可被清除的資源,一旦 Observable 終止了,那麼這個資源就會被清除掉了。react

beeth0ven.github.io/RxSwift-Chi…git

image.png

using方法是個靜態方法,有兩個實現:github

  1. 在ObservableType協議擴展中實現

供普通訊號源調用(Observable,Relay,ControlProperty等),方法建立了一個私有類型Using,具體內部實現比較複雜,還沒開始看Rx,因此還沒弄明白……編程

extension ObservableType {
    /** Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime. - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html) - parameter resourceFactory: Factory function to obtain a resource object. - parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource. - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object. */
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
        return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }
}
複製代碼
  1. 在PrimitiveSequence結構體擴展中實現

供序列信號源調用(Single,Maybe等),方法實現仍是調用了ObservableType擴展中的實現swift

extension PrimitiveSequence {
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, primitiveSequenceFactory: @escaping (Resource) throws -> PrimitiveSequence<Trait, Element>)
        -> PrimitiveSequence<Trait, Element> {
            return PrimitiveSequence(raw: Observable.using(resourceFactory, observableFactory: { (resource: Resource) throws -> Observable<Element> in
                return try primitiveSequenceFactory(resource).asObservable()
            }))
    }
}
複製代碼

方法做用

方法接受兩個參數:api

  1. resourceFactory閉包

這個閉包沒有入參,返回類型爲Disposable協議對象,該對象就是官方介紹中的可被清除的資源,當第二個參數閉包返回的Observable訂閱釋放的時候,該對象就會同步調用dispose方法來釋放。markdown

  1. observableFactory閉包

這個閉包的入參爲resourceFactory閉包的返回的那個Disposable協議對象,閉包的返回值爲Observable信號源,該信號源也是using方法返回的信號源,用來給調用方進行訂閱處理網絡

由於在resourceFactory閉包中建立了一個能夠dispose的對象,並且這個對象會做爲入參交給observableFactory閉包來處理並最終返回一個信號源來給調用方訂閱,那就有不少種使用方法了閉包

簡單的示例

咱們先建立一個最簡單的demo,使用using來建立一個信號源,該信號源只是簡單的發送幾個數字,可被清除的資源也只是在dispose的時候,打個log

  1. Disposable協議表示某個能夠被釋放的資源,只有一個dispose方法,RxSwift中並無供外部使用的默認實現,所以咱們須要本身定義一個TestDisposable:
class TestDisposable: NSObject, Disposable {
    func dispose() {
        //簡單打印下就行
        print(String.init(format: "dp: %p dispose", self))
    }
    
    deinit {
        print("dp 釋放")
    }
}
複製代碼
  1. 而後使用using建立信號源,該信號源將持有TestDisposable並在訂閱取消的時候同時釋放Testdisposable:
_ = Observable<Int>.using({
    () -> TestDisposable in
    let dp = TestDisposable()
    print(String.init(format: "建立source: %p", dp))
    return dp
}, observableFactory: {
    dp in
    //並無對dp作處理,只是一樣打印下dp信息
    print(String.init(format: "建立factory, dp: %p", dp))
    // 返回一個直接簡單的輸出數字的信號源
    return Observable.from([1,2,3,4,5]).debug("factory", trimOutput: false)
}).debug("using", trimOutput: false).subscribe()
複製代碼

在using方法的resourceFactory閉包中,建立了TestDisposable,並打印了下地址,而後在observableFactory閉包中,一樣打印了做爲入參傳進來的TestDisposable的地址,而後返回了一個發出5個數字的Observable信號源,該信號源會在發送1,2,3,4,5以後發送onComplete,而後出發訂閱者的dispose,這裏我用debug分別在observableFactory閉包內對建立的Observable進行debug信息打印以及在using返回的信號也進行了debug打印,而後跑起來~

運行結果

image.png

能夠看到,先建立了source,而後建立了factory,在factory的訂閱取消以後,source也跟着dispose,而後被釋放掉了。

進階使用

上面的簡單使用中,咱們使用resourceFactory只是建立了一個很簡單的可取消訂閱的對象,而且在observableFactory閉包中沒有對該對象進行任何處理,那若是咱們返回的是一個複雜的可取消訂閱對象,該對象甚至攜帶有信號源,那麼就能夠在observableFactory閉包中對信號源進行處理,並返回新的信號源,就能夠實現一些高級的操做了,官方使用using的例子就在官方demo:RxExample中:ActivityIndicator,很是牛逼的設計

ActivityIndicator

簡介

這是一個信號指示器,發出的信號值爲Bool類型,能夠給每一個須要監聽的信號源綁定一個Token,當信號源訂閱完成時,Token也跟着取消訂閱,ActivityIndicator中持有一個relay來標記持有的Token個數,當Token個數爲0時,ActivityIndicator會發出false,標記沒有信號在發送了,當至少有一個Token時,會發出true,能夠用作下載的指示器

使用

建立一個延遲的Single信號源來模擬網絡請求,而後模擬發送4個不一樣時長的請求,使用debug打印這些請求的完成時間,同時使用debug打印ActivityIndicator的狀態

// 用來建立模擬請求的閉包, 入參爲模擬請求秒數
let testRequestBlock: (Int) -> Single<String> = {
    seconds in
    return .create(subscribe: {
        singler in
        print("模擬開始請求\(seconds)s")
        // 延遲後發送模擬請求完成
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(seconds), execute: {
            singler(.success("模擬\(seconds)請求完成"))
        })
        return Disposables.create()
    })
}

// ActivityIndicator指示器,用來標記請求是否所有完成
let indicator = ActivityIndicator()

// 模擬四個請求
testRequestBlock(1).trackActivity(indicator).debug("req1", trimOutput: false).subscribe()
testRequestBlock(7).trackActivity(indicator).debug("req7", trimOutput: false).subscribe()
testRequestBlock(3).trackActivity(indicator).debug("req3", trimOutput: false).subscribe()
testRequestBlock(5).trackActivity(indicator).debug("req5", trimOutput: false).subscribe()

// 訂閱ActivityIndicator,打印狀態
indicator.asObservable().subscribe(onNext: {
    isRequesting in
    print(isRequesting ? "正在請求" : "請求結束")
})
複製代碼

執行結果:

image.png

能夠看到當開始請求時,ActivityIndicator發送信號true,四個請求所有完成時,發送信號false

配合信號流使用

通常狀況下,業務邏輯會是:點擊按鈕,進行N多信號變換處理,發送請求。因此中間會用到各類操做符,當出現flatMap或者concatMap這樣把舊信號源變成新信號源時,trackActivity()方法就要注意調用方法了,若是使用錯誤的話會監測到錯誤信號致使狀態異常。

簡單點信號流:點擊按鈕,flatMap變換成網絡請求,監聽這個請求的狀態。

let req1 = testRequestBlock(1).debug("req1", trimOutput: false)

// 正確寫法
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("按鈕點擊")
    return req1.trackActivity(indicator)
}).debug("btn1", trimOutput: false).subscribe()

// 錯誤寫法
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("按鈕點擊")
    return req1.asObservable()
})
.trackActivity(indicator)
.debug("btn1", trimOutput: false).subscribe()
複製代碼

正確運行結果:

image.png

能夠發現,按鈕事件的訂閱,沒有釋放,由於按鈕事件信號類型是ControlEvent,這種類型的信號特色是:運行在主線程且永遠不會完成/錯誤,所以信號訂閱永遠不會被釋放,flatMap的做用是在按鈕點擊事件產生後,把信號量換成了新的信號量來給訂閱者處理,因此trackActivity()方法須要寫在req1以後。

錯誤運行結果:

image.png

對比上面正確結果發現,還沒點擊按鈕,indicator的狀態就已經變成正在請求了,是由於錯誤寫法下,indicator監測的是按鈕時間信號的狀態,所以在按鈕subscribe以後就變成了true,由於按鈕的信號源訂閱永遠不會釋放,因此就不會打印請求結束

PS: 由於Driver,ControlEvent,Relay這樣的信號源,都不會發送complete與error事件,因此他們的訂閱者,永遠不會自動釋放訂閱,必須由外部條件來觸發dispose,不然會致使內存泄露,因此RxSwift中有DisposeBag對象,用來持有訂閱返回的Disposable對象,而後bag被某個對象持有(通常是vc,或者vm)當bag釋放時,會對持有的所有Disposable執行一遍dispose就能夠避免內存泄露
另一種避免內存泄露的方法就是使用take(count),或者take(until)明確的標明,對於這種信號源我只取我要的幾個信號,達到目標後訂閱會被自動釋放。可是這種方式並不保險,建議仍是使用bag或本身持有Disposable對象來管理訂閱釋放。

原理

ActivityIndicator的原理就是使用了using方法,爲須要監測狀態的信號源建立了一個ActivityToken,在建立token時,計數+1,token.dispose時,計數-1,當計數等於0時發送false,大於0時發送true,使用distinctUntilChanged來過濾重複的信號。

// 聲明週期跟隨source的可釋放資源, 會持有source, 持有一個釋放方法, 在source取消訂閱的時候, 會調用dispose方法
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> Void) {
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    }

    func dispose() {
        _dispose.dispose()
    }

    func asObservable() -> Observable<E> {
        return _source
    }
}

// 類型爲共享的序列信號源
public class ActivityIndicator : SharedSequenceConvertibleType {
    // 信號元素類型爲Bool
    public typealias Element = Bool
    // 信號序列策略爲Driver(永不失敗,必定在主線程訂閱,每次新的訂閱,都會把最後一個信號發送一次)
    public typealias SharingStrategy = DriverSharingStrategy

    // 遞歸所
    private let _lock = NSRecursiveLock()
    // 記錄監測的還沒有完成的信號源個數
    private let _relay = BehaviorRelay(value: 0)
    // 用來實現SharedSequenceConvertibleType用
    private let _loading: SharedSequence<SharingStrategy, Bool>

    public init() {
        // 建立_loading:relay變換成Driver, 信號量變成Bool值, 過濾重複值
        _loading = _relay.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()
    }

    // 監測信號源
    fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
        // 使用using建立新的信號源
        return Observable.using({ () -> ActivityToken<Source.Element> in
            // 先個數+1
            self.increment()
            // 建立token並返回token給後面的observableFactory使用
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        }) { t in
            // 從token中取出source返回
            return t.asObservable()
        }
    }

    // 個數+1並讓relay發送信號
    private func increment() {
        _lock.lock()
        _relay.accept(_relay.value + 1)
        _lock.unlock()
    }

    // 個數-1並讓relay發送信號
    private func decrement() {
        _lock.lock()
        _relay.accept(_relay.value - 1)
        _lock.unlock()
    }

    // 實現SharedSequenceConvertibleType協議
    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
        return _loading
    }
}

extension ObservableConvertibleType {
    // 給信號源類型擴展, 添加監測方法
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
        // 用indicator監測self,
        return activityIndicator.trackActivityOfObservable(self)
    }
}

複製代碼

生命週期

  1. ActivityIndicator初始化,_relay個數爲0,狀態爲false
  2. 某個信號源A調用trackActivity方法,準備監測
  3. ActivityIndicator調用trackActivityOfObservable方法,使用using方法,先對_relay+1,而後建立ActivityToken,ActivityToken建立時,持有了A,而且使用ActivityIndicator的-1方法做爲閉包參數建立Disposables對象。
  4. 建立ActivityToken完成,接着using調用observableFactory方法,把token持有的resource(信號源A)做爲結果返回,所以信號源A經過調用trackActivity方法以後,返回的信號源的信號量與A一致,只不過通過了層層封裝
  5. 此時ActivityIndicator的_relay個數爲1,狀態爲true
  6. A完成,訂閱釋放
  7. ActivityToken觸發dispose方法,調用ActivityIndicator的-1方法,而後ActivityToken釋放
  8. 此時ActivityIndicator的_relay個數爲0,狀態爲false

總結

using方法的核心就是建立了一個能夠dispose的對象,綁定給信號源,在該信號源訂閱完成時,把dispose對象一塊兒給dispose掉。

而ActivityIndicator則是巧妙的使用須要檢測的信號源A來建立token,使用token封裝A,而後再從token中取出A做爲using方法的返回,信號源在調用方法先後,內部信號量未改變,只是對整個信號進行了封裝,而RxSwift整個框架都是使用block來對信號源進行各類封裝,因此每次調用操做符(filter,map等)都是會對當前信號源進行封裝,返回一個新的信號源對象,整個對象是新的,可是內部的信號量卻使用了block來過濾處理。

就像是水管: 初始的信號源時水龍頭,每個操做符,都至關於爲這個水龍頭接上了一根根管道,這些管道有的能夠過濾,有的能夠變顏色,有的甚至把水接到以後,喝掉並放出了新的水(๑´ㅂ`๑)。種種變換以後最終獲得了咱們想要的結果。

最終的訂閱者徹底不用關注最初的信號源是什麼類型,也不關注中間如何變換,只關心本身接收的數據類型正確便可。中間邏輯調整時,只要拆掉就的管道,換上新的管道,便可輕鬆改變邏輯。這就是函數鏈式編程的好處。

相關文章
相關標籤/搜索