在RxSwift中主要有以下四個成員:api
Observable
Observer
Scheduler
Dispose
若是這四個都弄明白了,那麼能夠說整個RxSwift也就弄明白了。這篇文章來具體分析調度者 - Scheduler
安全
Schedulers
是
Rx
實現多線程的核心模塊,它主要用於控制任務在哪一個線程或隊列運行,它內部的實現是對
GCD
和
OperationQueue
進行了封裝。 若是你曾經使用過 GCD, 那你對如下代碼應該不會陌生,功能都是從多線程獲取數據而後到主線程刷新UI:
// 後臺取得數據,主線程處理結果
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
self.data = data
}
}
複製代碼
若是用 RxSwift
來實現,大體是這樣的:bash
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
self?.data = data
})
.disposed(by: disposeBag)
複製代碼
感覺了Scheduler
的使用以後,來看看裏面具體是如何實現的。多線程
CurrentThreadScheduler閉包
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/// The singleton instance of the current thread scheduler.
public static let instance = CurrentThreadScheduler()
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
......
}
複製代碼
isScheduleRequired
用來表示是否必須調用schedule
方法,利用對 queue
的set,get方法的觀察,綁定咱們的當前隊列與靜態字符串,實現同一線程數據共享。SerialDispatchQueueScheduler併發
SerialDispatchQueueScheduler
抽象了串行 DispatchQueue
。若是你須要執行一些串行任務,能夠切換到這個 Scheduler
運行。ConcurrentDispatchQueueScheduler異步
ConcurrentDispatchQueueScheduler
抽象了並行 DispatchQueue
。若是你須要執行一些併發任務,能夠切換到這個 Scheduler
運行。OperationQueueSchedulerasync
OperationQueueScheduler
抽象了 NSOperationQueue
。它具有 NSOperationQueue
的一些特色,例如,你能夠經過設置 maxConcurrentOperationCount
,來控制同時執行併發任務的最大數量。public class OperationQueueScheduler: ImmediateSchedulerType {
public let operationQueue: OperationQueue
public let queuePriority: Operation.QueuePriority
/// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
///
/// - parameter operationQueue: Operation queue targeted to perform work on.
/// - parameter queuePriority: Queue priority which will be assigned to new operations.
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
......
}
複製代碼
OperationQueueScheduler
對象時,須要傳入 OperationQueue
和 優先級queuePriority
,做爲初始化參數。MainScheduleride
MainScheduler
表明主線程。若是你須要執行一些和 UI 相關的任務,就須要切換到該 Scheduler
運行。public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
let numberEnqueued = AtomicInt(0)
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
}
複製代碼
MainScheduler
繼承了SerialDispatchQueueScheduler
串行隊列,由於主隊列原本就是一個特殊的串行隊列。而後在初始化對象時,肯定了隊列類型爲主隊列self._mainQueue = DispatchQueue.main
。根據前面的示例來分析下subscribeOn
和observeOn
的具體實現 使用 subscribeOn函數
subscribeOn
來決定數據序列的構建函數在哪一個 Scheduler
上運行。以上例子中,因爲獲取 Data 須要花很長的時間,因此用 subscribeOn
切換到 後臺 Scheduler
來獲取 Data。這樣能夠避免主線程被阻塞。Observable
建立,應用操做符以及發出通知都會在 Subscribe
方法調用的 Scheduler
執行。subscribeOn
操做符將改變這種行爲,它會指定一個不一樣的 Scheduler
來讓 Observable
執行。public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
複製代碼
Observable
,咱們就知道原來subscribeOn
是把源序列封裝成了一箇中間層序列SubscribeOn
。final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
複製代碼
RxSwift
核心邏輯的講解的文章,就可以知道,當序列被訂閱的時候,代碼必定會執行到run
方法來。(還不太瞭解的朋友能夠查看我前面的關於RxSwift核心邏輯的文章) 進入到SubscribeOnSink.run
方法func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
複製代碼
self.parent.scheduler.schedule()
,self.parent.scheduler
就是調用SubscribeOn
方法做爲參數傳進來的隊列,而後執行schedule
方法。self.scheduleInternal(state, action: action)
self.configuration.schedule(state, action: action)
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
複製代碼
action(state)
就是從外面傳進來的尾隨閉包,因此代碼會開始執行閉包,就會執行let subscription = self.parent.source.subscribe(self)
,對源序列進行訂閱,因此必然會來到Producer
的subscribe
方法。override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
複製代碼
schedule
方法裏面
defer
是延遲調用,保證在return
以前調用action(state)
,在閉包裏面會執行ObservableSequenceSink.run
方法,最後代碼又會來到這個schedule
方法,因爲上一次進來時把isScheduleRequired
設置成了false,因此代碼會執行代碼塊3(如圖)ScheduledItem
對象,而後加入到隊列中func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
// 這裏由於在遞歸環境,加了一把鎖遞歸鎖,保障安全
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
......
}
複製代碼
action
,也就是外界傳給遞歸調度者的閉包,後面就是發送信號的常規流程self.forwardOn(.next(next))
。RxSwift
的代碼調用很是的繁瑣,嵌套很深,各類閉包,因此須要慢慢地一遍一遍的打斷點去仔細斟酌使用 observeOn
observeOn
來決定在哪一個 Scheduler
監聽這個數據序列。以上例子中,經過使用 observeOn
方法切換到主線程來監聽而且處理結果。observeOn
操做符將指定一個不一樣的 Scheduler
來讓 Observable
通知觀察者。observeOn
大致流程和思想跟subscribeOn
差很少,因此這裏就不一一分析了。Scheduler
的繼承關係圖:
Schedulers
是 Rx
實現多線程的核心模塊,它主要用於控制任務在哪一個線程或隊列運行subscribeOn
來決定數據序列的構建函數在哪一個 Scheduler
上運行observeOn
來決定在哪一個 Scheduler
監聽這個數據序列subscribeOn
和observeOn
都會建立一箇中間層序列,因此內部也有一個訂閱響應序列的流程,中間層的 sink
就是源序列的觀察者有問題或者建議和意見,歡迎你們評論或者私信。 喜歡的朋友能夠點下關注和喜歡,後續會持續更新文章。