RxSwift源碼分析(五)-調度器Scheduler

在RxSwift中主要有以下四個成員:api

  • 可觀察序列 - Observable
  • 觀察者 - Observer
  • 調度者 - Scheduler
  • 銷燬者 - Dispose

若是這四個都弄明白了,那麼能夠說整個RxSwift也就弄明白了。這篇文章來具體分析調度者 - Scheduler安全

什麼是調度者

SchedulersRx 實現多線程的核心模塊,它主要用於控制任務在哪一個線程或隊列運行,它內部的實現是對 GCDOperationQueue進行了封裝。 若是你曾經使用過 GCD, 那你對如下代碼應該不會陌生,功能都是從多線程獲取數據而後到主線程刷新UI:

// 後臺取得數據,主線程處理結果
DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        self.data = data
    }
}
複製代碼

若是用 RxSwift 來實現,大體是這樣的:bash

let rxData: Observable<Data> = ...

rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)
複製代碼

感覺了Scheduler的使用以後,來看看裏面具體是如何實現的。多線程

RxSwift中的幾種Scheduler的介紹

CurrentThreadScheduler閉包

  • 表示當前線程,默認就在當前線程上。
public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer { key.deallocate() }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

    private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
        return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
    }()

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }
    ......
}
複製代碼
  • isScheduleRequired 用來表示是否必須調用schedule方法,利用對 queue的set,get方法的觀察,綁定咱們的當前隊列與靜態字符串,實現同一線程數據共享。

SerialDispatchQueueScheduler併發

  • SerialDispatchQueueScheduler 抽象了串行 DispatchQueue。若是你須要執行一些串行任務,能夠切換到這個 Scheduler 運行。

ConcurrentDispatchQueueScheduler異步

  • ConcurrentDispatchQueueScheduler 抽象了並行 DispatchQueue。若是你須要執行一些併發任務,能夠切換到這個 Scheduler 運行。

OperationQueueSchedulerasync

  • OperationQueueScheduler 抽象了 NSOperationQueue。它具有 NSOperationQueue 的一些特色,例如,你能夠經過設置 maxConcurrentOperationCount,來控制同時執行併發任務的最大數量。
public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority
    
    /// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
    ///
    /// - parameter operationQueue: Operation queue targeted to perform work on.
    /// - parameter queuePriority: Queue priority which will be assigned to new operations.
    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
    ......
}
複製代碼
  • 在初始化 OperationQueueScheduler 對象時,須要傳入 OperationQueue 和 優先級queuePriority,做爲初始化參數。

MainScheduleride

  • MainScheduler 表明主線程。若是你須要執行一些和 UI 相關的任務,就須要切換到該 Scheduler 運行。
public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    let numberEnqueued = AtomicInt(0)
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
    public static let instance = MainScheduler()
}
複製代碼
  • 經過源碼能夠看出MainScheduler繼承了SerialDispatchQueueScheduler串行隊列,由於主隊列原本就是一個特殊的串行隊列。而後在初始化對象時,肯定了隊列類型爲主隊列self._mainQueue = DispatchQueue.main

使用

根據前面的示例來分析下subscribeOnobserveOn的具體實現 使用 subscribeOn函數

  • 咱們用 subscribeOn 來決定數據序列的構建函數在哪一個 Scheduler 上運行。以上例子中,因爲獲取 Data 須要花很長的時間,因此用 subscribeOn 切換到 後臺 Scheduler 來獲取 Data。這樣能夠避免主線程被阻塞。
  • 默認狀況下,Observable 建立,應用操做符以及發出通知都會在 Subscribe 方法調用的 Scheduler 執行。subscribeOn 操做符將改變這種行爲,它會指定一個不一樣的 Scheduler 來讓 Observable 執行。
  • 源碼分析
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
複製代碼
  • 看到返回值類型是Observable,咱們就知道原來subscribeOn是把源序列封裝成了一箇中間層序列SubscribeOn
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
複製代碼
  • 在初始化的時候保存了源序列和調度者。
  • 經過以前對RxSwift核心邏輯的講解的文章,就可以知道,當序列被訂閱的時候,代碼必定會執行到run方法來。(還不太瞭解的朋友能夠查看我前面的關於RxSwift核心邏輯的文章) 進入到SubscribeOnSink.run方法
func run() -> Disposable {
    let disposeEverything = SerialDisposable()
    let cancelSchedule = SingleAssignmentDisposable()
    
    disposeEverything.disposable = cancelSchedule
    
    let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
        return Disposables.create()
    }

    cancelSchedule.setDisposable(disposeSchedule)

    return disposeEverything
}
複製代碼
  • 看到有一句關於調度相關的代碼self.parent.scheduler.schedule()self.parent.scheduler就是調用SubscribeOn方法做爲參數傳進來的隊列,而後執行schedule方法。
  • 調用self.scheduleInternal(state, action: action)
  • 而後執行到self.configuration.schedule(state, action: action)
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }
        cancel.setDisposable(action(state))
    }

    return cancel
}
複製代碼
  • 看到這裏終於明白了,原來就是把任務放在了設置的隊列裏面異步執行
  • 這個action(state)就是從外面傳進來的尾隨閉包,因此代碼會開始執行閉包,就會執行let subscription = self.parent.source.subscribe(self),對源序列進行訂閱,因此必然會來到Producersubscribe方法。
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }
    else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
    }
}
複製代碼
  • 這裏會根據當前的調度環境來判斷具體執行哪塊代碼,這裏首先會走else裏面。來到schedule方法裏面
  • defer是延遲調用,保證在return以前調用
  • 首先會執行action(state),在閉包裏面會執行ObservableSequenceSink.run方法,最後代碼又會來到這個schedule方法,因爲上一次進來時把isScheduleRequired設置成了false,因此代碼會執行代碼塊3(如圖)
  • 代碼塊3主要是將任務封裝成一個ScheduledItem對象,而後加入到隊列中
  • 而後執行代碼塊2,代碼塊2的做用是把前面存進隊列的任務一個個拿出來按順序執行,即開始執行下面的閉包
func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial

    let d = self._scheduler.schedule(state) { state -> Disposable in
        // best effort
        if self._group.isDisposed {
            return Disposables.create()
        }
        // 這裏由於在遞歸環境,加了一把鎖遞歸鎖,保障安全 
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
        
        if let action = action {
            action(state, self.schedule)
        }
        
        return Disposables.create()
    }
    
    ......
}
複製代碼
  • 首先爲了保證線程安全,保證FIFO,加了一把鎖。這也就是爲何RxSwift的信號執行是有順序的
  • 而後執行action,也就是外界傳給遞歸調度者的閉包,後面就是發送信號的常規流程self.forwardOn(.next(next))
  • ps:RxSwift的代碼調用很是的繁瑣,嵌套很深,各類閉包,因此須要慢慢地一遍一遍的打斷點去仔細斟酌

使用 observeOn

  • 咱們用 observeOn 來決定在哪一個 Scheduler 監聽這個數據序列。以上例子中,經過使用 observeOn 方法切換到主線程來監聽而且處理結果。
  • observeOn 操做符將指定一個不一樣的 Scheduler 來讓 Observable 通知觀察者。
  • observeOn大致流程和思想跟subscribeOn差很少,因此這裏就不一一分析了。

總結

  • 調度器 Scheduler 的繼承關係圖:
  • SchedulersRx 實現多線程的核心模塊,它主要用於控制任務在哪一個線程或隊列運行
  • subscribeOn 來決定數據序列的構建函數在哪一個 Scheduler 上運行
  • observeOn 來決定在哪一個 Scheduler 監聽這個數據序列
  • subscribeOnobserveOn都會建立一箇中間層序列,因此內部也有一個訂閱響應序列的流程,中間層的 sink 就是源序列的觀察者

有問題或者建議和意見,歡迎你們評論或者私信。 喜歡的朋友能夠點下關注和喜歡,後續會持續更新文章。

相關文章
相關標籤/搜索