RxSwift(7)— 調度者-scheduler源碼解析(下)

就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!swift


RxSwift 目錄直通車 --- 和諧學習,不急不躁!api


這一篇關於調度的具體流程分析,若有對RxSwift調度者不太瞭解的觀客,請移步RxSwift調度者-scheduler源碼解析(上)數組

observeOn&subscribeOn

上面兩個函數是 RxSwift 切換調度環境很是重要的,能夠說掌握了這兩個函數對 RxSwift調度者 的理解可謂是:指日可待安全

首先我給你們帶來一個測試:閉包

DispatchQueue.global().async {
    self.actionBtn.rx.tap
        .subscribe(onNext: { () in
            print("點擊了按鈕 --- \(Thread.current)")
        })
        .disposed(by: self.bag)
}
複製代碼
  • 問:當前按鈕點擊,打印的線程狀況是什麼: 點擊了按鈕 --- <NSThread: 0x600000c2d980>{number = 1, name = main}
  • 你是否回答正確?這裏切換到了主線程,那麼我明明規定了當前線程就是全局併發隊列(子線程),可是爲何切換過來的?

image

public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
    let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
            // 調度主線程判斷
            MainScheduler.ensureRunningOnMainThread()
        }
    return ControlEvent(events: source)
}
複製代碼
  • 咱們通常思惟跟進:controlEvent,發現咱們調度執行必需要在主隊列
  • NO 如今我不要知道這個,我想知道它是怎麼切過去的!
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
    self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}

public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
    return self._events.subscribe(observer)
}
複製代碼
  • OK 很明顯咱們的 ControlEvent 的序列 subscribe 是調用了一個函數就是:subscribeOn.
  • 其中ConcurrentMainScheduler.instance 內部封裝了 主隊列!那麼全部事情也就清晰了!
  • 哈哈,其實還有一個地方仍是不明確的就是:subscribeOn 到底內部的邏輯又是什麼?請看下面的分析
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
複製代碼
  • 看到返回值的類型就知道,原來的序列是被subscribeOn進行處理了,封裝了中間層:SubscribeOn 的序列
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run(_ observer: cancel:) -> (sink:subscription:) {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
複製代碼
  • 看到 SubscribeOn 的繼承關係(Producer),咱們對他也就放心了。
  • 序列訂閱的時候,會建立一個observer的觀察者
  • 通過Producer 流回SubscribeOn的run
  • 在通過 SubscribeOnSink.run 到觀察者的回調(或者內部源序列的訂閱,傳sink做爲觀察者回調,後面的流程只是重複走了一次)
  • 由觀察者的發送響應,回到 sinkon
  • sink的屬性觀察者(也就是中間封裝保存的)響應event事件
  • 最後調用外界的subscriber的閉包
  • 下面咱們直接跳過上面的流程,進入關於調度相關代碼(這裏由於篇幅緣由,我也簡寫流程,看核心源碼)
func run() -> Disposable {
    let disposeEverything = SerialDisposable()
    let cancelSchedule = SingleAssignmentDisposable()
    
    disposeEverything.disposable = cancelSchedule
    
    let disposeSchedule = self.parent.scheduler.schedule(()) {

        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: disposable:)
        return Disposables.create()
    }
    cancelSchedule.setDisposable(disposeSchedule)
    return disposeEverything
}
複製代碼
  • 這裏就有一個很是重要的方法:self.parent.scheduler.schedule(),背後的邏輯咱們在前面的代碼也解析過了!
  • 調用self.scheduleInternal(state, action: action)
func schedule<StateType>(_ state: action: ) -> Disposable {
    let cancel = SingleAssignmentDisposable()
    self.queue.async {
        if cancel.isDisposed {
            return
        }
        cancel.setDisposable(action(state))
    }
    return cancel
}
複製代碼
  • 看到這裏你也就明白了咱們調度的是把任務 action封裝,利用 scheduler 統一管理
  • 不少同窗若是不求甚解,那麼到這裏也差很少了!不怕死的同窗能夠繼續跟着我往下面走
  • 其實這裏的action就是一個 schduler 調用時候的閉包,就會執行:let subscription = self.parent.source.subscribe(self), 這裏距離的咱們的目標還有一點距離,哈哈哈哈
  • 源序列的subscribe,必然會來到Producer
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }
    else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
    }
}
複製代碼
  • 這裏會根據當前的調度環境來判斷,其實內部的流程差很少,只是多了一個方法調用
public func schedule<StateType>(_ state: action: ) -> Disposable {

    if CurrentThreadScheduler.isScheduleRequired {
      // 已經標記,就置false
        CurrentThreadScheduler.isScheduleRequired = false
     // 外界閉包調用執行
        let disposable = action(state)
      // 延遲銷燬 
        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }
      // 先省略。。。
        return disposable
    }
     // 先省略。。。
    return scheduledItem
}
複製代碼
  • 由這裏就能夠看出直接調度執行,可是問題在哪呢?
    • 若是你當前調度環境不變,那就沒有問題!一直都在我貼出的代碼流程來回執行
    • 若是我這裏調度的是子線程,那麼就徹底不同,針對當前隊列,還有線程安全都是須要處理的!
public func scheduleRecursive<State>(_ state: action: ) -> Disposable {
    // 遞歸調度者
    let recursiveScheduler = RecursiveImmediateScheduler(action: scheduler:)
    // 調度狀態執行
    recursiveScheduler.schedule(state)
    return Disposables.create(with: recursiveScheduler.dispose)
}
複製代碼
  • 在這個遞歸調度者裏面的邏輯就相對來講,比較複雜!請認真跟我分析
func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial
    let d = self._scheduler.schedule(state) { state -> Disposable in     
        // 這裏由於在遞歸環境,加了一把鎖遞歸鎖,保障安全 
        let action = self._lock.calculateLocked { () -> Action? in
                 return self._action
        }
        
        if let action = action {
            action(state, self.schedule)
        }
        
        return Disposables.create()
    }
  // 篇幅,先省略,你們自行查閱 
}
複製代碼
  • 這裏由於在遞歸環境,加了一把鎖遞歸鎖,保障安全
  • 經過保護,獲取action執行,也就是外界傳給遞歸調度者的閉包任務
  • 看到這裏,是否是你已經get 到了爲何RxSwift 的數組調度出來是有順序的,由於在遞歸調度,已經加鎖了,保障線程資源安全
  • 執行完源序列的響應,會吧任務保存進隊列,這裏仍是須要講解的!
public func schedule<StateType>(_ state: StateType, action: ) -> Disposable {
    // 上面的流程就省略了
    let existingQueue = CurrentThreadScheduler.queue

    let queue: RxMutableBox<Queue<ScheduledItemType>>
    if let existingQueue = existingQueue {
        queue = existingQueue
    }
    else {
        queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
        CurrentThreadScheduler.queue = queue
    }

    let scheduledItem = ScheduledItem(action: action, state: state)
    queue.value.enqueue(scheduledItem)

    return scheduledItem
}
複製代碼
  • 把任務和狀態封裝成了ScheduledItem,面向對象,更容易傳輸&執行
  • 把這個事務queue.value.enqueue(scheduledItem),排隊進隊列
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    if CurrentThreadScheduler.isScheduleRequired {

        CurrentThreadScheduler.isScheduleRequired = false
        let disposable = action(state)  

         // 判斷當前隊列狀況,是否存在
        guard let queue = CurrentThreadScheduler.queue else {
            return disposable
        }
        // 從隊列去除任務
        while let latest = queue.value.dequeue() {
            if latest.isDisposed {
                continue
            }
            latest.invoke()
        }

        return disposable
    }
    /// 省略
}
複製代碼
  • 流程任務執行action(state) 完畢以後,又會執行下面的流程
  • 判斷當前隊列狀況,是否存在
  • 從隊列去除任務 : queue.value.dequeue()
  • 重點:latest.invoke()
func invoke() {
     self._disposable.setDisposable(self._action(self._state))
}
複製代碼
  • 就是原來響應回來時候保存的 action執行
  • 只不過加了銷燬的機制
  • 這個時候咱們的流程就會由原來的 源序列 流進 ObserveOnSink
  • 保障了在 ObserveOnSink 的調度環境是有序的進隊的:self._queue.enqueue(event)
  • 執行self._scheduler.scheduleRecursive((), action: self.run)
override func onCore(_ event: Event<Element>) {
    let shouldStart = self._lock.calculateLocked { () -> Bool in
        self._queue.enqueue(event)
    }
    if shouldStart {
        self._scheduleDisposable.disposable = 
        self._scheduler.scheduleRecursive((), action: self.run)
    }
}
複製代碼
  • 這裏的手法是很是重要的:畢竟併發隊列極可能存在!咱們模擬的也是併發
func run(_ state: (), _ recurse: (()) -> Void) {
    // 加鎖獲取觀察者,很隊列任務
    let (nextEvent, observer) = self._lock.calculateLocked { 
        if !self._queue.isEmpty {
            return (self._queue.dequeue(), self._observer)
        }
    }
   
    // 觀察者發送響應
    if let nextEvent = nextEvent, !self._cancel.isDisposed {
        observer.on(nextEvent)
        if nextEvent.isStopEvent {
            self.dispose()
        }
    }
}
複製代碼
  • 加鎖獲取觀察者,很隊列任務 : (self._queue.dequeue(), self._observer)
  • 觀察者發送響應: observer.on(nextEvent)
  • 這個時候咱們外界的訂閱閉包就能夠如期回調咯!

完美!解析到這裏,整個流程解析完畢,就問此時此刻還有誰? 45度仰望天空,該死!我這無處安放的魅力!併發

##總結async

整個流程是比較複雜,其實若是你這個時候,總體看源碼(上帝的視角)!不可貴出:ide

  • 源序列包裝
  • 內部序列建立
  • 調度環境&觀察者傳遞準備
  • 源序列訂閱 - 根據調度環境調度 - 流程流到觀察者就是咱們中間內部序列的Sink
  • Sink 調度執行 響應發給觀察者
  • 由觀察者響應 訂閱事件event

就是兩層序列訂閱響應,個人第二層的 sink 就是源序列的觀察者函數

RxSwift的源碼其實你只要掌握我前面的分享過的核心邏輯,後面的內容都是同樣的,只不過嵌套,膠水代碼依賴創建。其中依賴關係,中間層封裝,流程走位是各位在接下來的iOS進階很是重要的思惟,告別垃圾代碼,迎接將來!咱們一塊兒加油:和諧學習,不急不躁源碼分析

就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!

相關文章
相關標籤/搜索