就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!swift
- RxSwift(1)— 初探
- RxSwift(2)— 核心邏輯源碼分析
- RxSwift(3)— Observable序列的建立方式
- RxSwift(4)— 高階函數(上)
- RxSwift(5)— 高階函數(下)
- RxSwift(6)— 調度者-scheduler源碼解析(上)
- RxSwift(7)— 調度者-scheduler源碼解析(下)
- RxSwift(8)— KVO底層探索(上)
- RxSwift(9)— KVO底層探索(下)
- RxSwift(10)— 場景序列總結
- RxSwift(11)— dispose源碼解析
- RxSwift(12)— Subject即攻也守
- RxSwift(13)— 爬過的坑
- RxSwift(14)— MVVM雙向綁定
RxSwift 目錄直通車 --- 和諧學習,不急不躁!api
這一篇關於調度的具體流程分析,若有對RxSwift調度者不太瞭解的觀客,請移步RxSwift調度者-scheduler源碼解析(上)數組
上面兩個函數是 RxSwift
切換調度環境很是重要的,能夠說掌握了這兩個函數對 RxSwift調度者
的理解可謂是:指日可待安全
首先我給你們帶來一個測試:閉包
DispatchQueue.global().async {
self.actionBtn.rx.tap
.subscribe(onNext: { () in
print("點擊了按鈕 --- \(Thread.current)")
})
.disposed(by: self.bag)
}
複製代碼
點擊了按鈕 --- <NSThread: 0x600000c2d980>{number = 1, name = main}
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
// 調度主線程判斷
MainScheduler.ensureRunningOnMainThread()
}
return ControlEvent(events: source)
}
複製代碼
controlEvent
,發現咱們調度執行必需要在主隊列
!public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
return self._events.subscribe(observer)
}
複製代碼
ControlEvent
的序列 subscribe
是調用了一個函數就是:subscribeOn
.ConcurrentMainScheduler.instance
內部封裝了 主隊列
!那麼全部事情也就清晰了!subscribeOn
到底內部的邏輯又是什麼?請看下面的分析public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
複製代碼
subscribeOn
進行處理了,封裝了中間層:SubscribeOn
的序列final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run(_ observer: cancel:) -> (sink:subscription:) {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
複製代碼
SubscribeOn
的繼承關係(Producer
),咱們對他也就放心了。observer的觀察者
Producer
流回SubscribeOn的run
SubscribeOnSink.run
到觀察者的回調(或者內部源序列的訂閱,傳sink
做爲觀察者回調,後面的流程只是重複走了一次)sink
的 on
sink
的屬性觀察者(也就是中間封裝保存的)響應event事件
subscriber的閉包
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) {
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: disposable:)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
複製代碼
self.parent.scheduler.schedule()
,背後的邏輯咱們在前面的代碼也解析過了!self.scheduleInternal(state, action: action)
func schedule<StateType>(_ state: action: ) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
複製代碼
action
封裝,利用 scheduler
統一管理action
就是一個 schduler
調用時候的閉包,就會執行:let subscription = self.parent.source.subscribe(self)
, 這裏距離的咱們的目標還有一點距離,哈哈哈哈subscribe
,必然會來到Producer
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
複製代碼
public func schedule<StateType>(_ state: action: ) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
// 已經標記,就置false
CurrentThreadScheduler.isScheduleRequired = false
// 外界閉包調用執行
let disposable = action(state)
// 延遲銷燬
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
// 先省略。。。
return disposable
}
// 先省略。。。
return scheduledItem
}
複製代碼
public func scheduleRecursive<State>(_ state: action: ) -> Disposable {
// 遞歸調度者
let recursiveScheduler = RecursiveImmediateScheduler(action: scheduler:)
// 調度狀態執行
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
複製代碼
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// 這裏由於在遞歸環境,加了一把鎖遞歸鎖,保障安全
let action = self._lock.calculateLocked { () -> Action? in
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
// 篇幅,先省略,你們自行查閱
}
複製代碼
action
執行,也就是外界傳給遞歸調度者的閉包任務get
到了爲何RxSwift
的數組調度出來是有順序的,由於在遞歸調度,已經加鎖了,保障線程資源安全public func schedule<StateType>(_ state: StateType, action: ) -> Disposable {
// 上面的流程就省略了
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
複製代碼
ScheduledItem
,面向對象,更容易傳輸&執行queue.value.enqueue(scheduledItem)
,排隊進隊列public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
// 判斷當前隊列狀況,是否存在
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
// 從隊列去除任務
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
/// 省略
}
複製代碼
action(state)
完畢以後,又會執行下面的流程queue.value.dequeue()
latest.invoke()
func invoke() {
self._disposable.setDisposable(self._action(self._state))
}
複製代碼
action
執行源序列
流進 ObserveOnSink
ObserveOnSink
的調度環境是有序的進隊的:self._queue.enqueue(event)
self._scheduler.scheduleRecursive((), action: self.run)
override func onCore(_ event: Event<Element>) {
let shouldStart = self._lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
}
if shouldStart {
self._scheduleDisposable.disposable =
self._scheduler.scheduleRecursive((), action: self.run)
}
}
複製代碼
func run(_ state: (), _ recurse: (()) -> Void) {
// 加鎖獲取觀察者,很隊列任務
let (nextEvent, observer) = self._lock.calculateLocked {
if !self._queue.isEmpty {
return (self._queue.dequeue(), self._observer)
}
}
// 觀察者發送響應
if let nextEvent = nextEvent, !self._cancel.isDisposed {
observer.on(nextEvent)
if nextEvent.isStopEvent {
self.dispose()
}
}
}
複製代碼
(self._queue.dequeue(), self._observer)
observer.on(nextEvent)
完美!解析到這裏,整個流程解析完畢,就問此時此刻還有誰? 45度仰望天空,該死!我這無處安放的魅力!併發
##總結async
整個流程是比較複雜,其實若是你這個時候,總體看源碼(上帝的視角)!不可貴出:ide
Sink
Sink
調度執行 響應發給觀察者訂閱事件event
就是兩層序列訂閱響應,個人第二層的 sink
就是源序列的觀察者函數
RxSwift
的源碼其實你只要掌握我前面的分享過的核心邏輯,後面的內容都是同樣的,只不過嵌套,膠水代碼依賴創建。其中依賴關係,中間層封裝,流程走位是各位在接下來的iOS進階
很是重要的思惟,告別垃圾代碼,迎接將來!咱們一塊兒加油:和諧學習,不急不躁源碼分析就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!