RxSwift之Subject

一、PublishSubject

PublishSubject將對觀察者發送訂閱後產生的元素,而在訂閱前發送的元素將不會發送給觀察者。 git

示例:swift

let subject = PublishSubject<String>()

subject.onNext("🐘")

subject.subscribe(onNext: { print("訂閱到了: \($0)") })
    .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
複製代碼
打印結果:
    訂閱到了: 🐶
    訂閱到了: 🐱
複製代碼

源碼分析: 數組

經過 PublishSubject 的繼承關係,能夠看出, PublishSubject 既是 可監聽序列,也是 觀察者

(1) 在第一次執行 onNext 函數 subject.onNext("🐘")閉包

會執行 PublishSubject.on 函數,並繼續調用 dispatch 函數和 self._synchronized_on(event) 函數。

  • self._synchronized_on(event) 函數最終會返回 self._observers
  • self._observers 的類型是 Bag<(Event<Element>) -> Void>
  • 再查看 dispatch 函數的源碼
    經分析能夠知道,其做用就是對 bag 中保存的 代碼塊進行執行回調。

由於上一步傳入的 bag 僅是一個初始化的 bag 中間沒有保存任何代碼塊,因此不會執行任何回調。到這裏,在第一次執行 subject.onNext("🐘") 已經結束。沒有任何打印結果。ide

(2) 調用 subscribe函數

  • PublishSubject 源碼中,實現了 subscribe
  • 最終會調用 let key = self._observers.insert(observer.on)
  • 查看 insert 函數源碼。
    由於是第首次執行,因此 _dictionary 值爲nil,且 _pairs.count 小於 arrayDictionaryMaxSize。因此,觀察者的 on函數 observer.on 被加入到 _pairs 數組中。

(3)第二次執行 onNext 函數 subject.onNext("🐶")。開始的步驟和 (1)中一致,只是在最後執行 dispatch 函數時,由於 bag._pairs 有保存一個觀察者的 on 函數代碼塊,因此會執行回調。源碼分析

最終會回調 subscribe.onNext 閉包,打印結果。(ps: 這一步不清楚的小夥伴請閱讀RxSwift核心邏輯簡介post

二、BehaviorSubject

當觀察者對BehaviorSubject進行訂閱時,它會將源 Observable 中最新的元素髮送出來。若是不存在最新的元素,就發送默認元素。而後將隨後產生的元素髮送出來。 ui

示例:spa

let subject = BehaviorSubject<Int>(value: 100)

subject.subscribe(onNext: { print("訂閱1:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(5)

subject.subscribe(onNext: { print("訂閱2:\($0)") })
    .disposed(by: disposeBag)
複製代碼
打印結果:
    訂閱1100
    訂閱13
    訂閱15
    訂閱25
複製代碼

源碼分析: BehaviorSubjectPublishSubject 源碼很類似,可是也有差別之處,咱們重點分析其中的差別,類似之處請參照上面的 PublishSubject 源碼分析。

(1)初始化方法

public init(value: Element) {
    self._element = value
    
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
}
複製代碼

BehaviorSubject 的初始化方法須要傳入一個初始值,做爲默認元素。

(2) subscribe 函數

BehaviorSubject_synchronized_subscribe 函數中比 PublishSubject 多一行代碼,執行了一次 observer.on 函數,並將 self._element 做爲參數傳遞。 self._element 中保存的是最新發送的元素,若是沒有最新元素,則爲 init 初始化時的默認元素。

(3)onNext 函數

BehaviorSubject 在調用 onNext 時,會將最新的元素保存在 self._element 中,在執行執行 subscribe 時,發送出去。

三、ReplaySubject

不管觀察者是什麼時候進行訂閱的,ReplaySubject都將對觀察者發送所有的元素。

示例:

let subject = ReplaySubject<Int>.create(bufferSize: 2)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

subject.subscribe(onNext: { print("訂閱到:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
複製代碼
打印結果:
    訂閱到:2
    訂閱到:3
    訂閱到:4
    訂閱到:5
    訂閱到:6
複製代碼

源碼分析: (1)初始化方法 create函數

由於示例中傳入的 bufferSize 爲 2,因此建立一個 ReplayMany 對象。
其做用是建立一個指定大小的隊列 queue 來保存須要發送的元素。

(2)onNext 函數 onNext 函數調用的是 ReplayBufferBaseon 函數。

  • 調用 addValueToBuffer 函數將發送的元素加入到 queue 中。
  • 調用 trim 函數刪除 queue 中大於 bufferSize 的多餘的元素。

(3)subscribe 函數

在將 observer 加入到 self._observers 以前,先調用了 self.replayBuffer(anyObserver)。會執行 ReplayManyBase.replayBuffer 函數。

override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
    for item in self._queue {
        observer.on(.next(item))
    }
}
複製代碼

將保存在隊列 queue 中的全部元素,發送給訂閱者。

四、AsyncSubject

AsyncSubject 將在源 Observable 產生完成時間以後,發出最後一個元素(有且僅有最後一個元素)。若是源 Observable 沒有發出任何元素,只有一個完成事件,則AsyncSubject也只有一個完成事件。若是源 Observable 產生了一個 error 事件而停止,那麼 AsyncSubject 就不會發出任何元素,而是將 error 事件發送出來。

示例:

let subject = AsyncSubject<Int>()

subject.onNext(1)
subject.onNext(2)

subject.subscribe({ print("訂閱到:\($0)")})
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
複製代碼
打印結果:
    訂閱到:next(4)
    訂閱到:completed
複製代碼

源碼分析:重點分析on 函數 最後會來到下面👇的代碼,獲取保存的 observer.on 閉包,再執行事件回調。

  • next 事件中,將最新的元素保存在 self._lastElement 中後,並無返回全部的 observer.on 和 發送 next 事件,反而是初始化了一個空的 observer.on 集合和返回 completed 事件。因此不會有觀察者響應事件收到信號。
  • error 事件中,會清空全部的 self._observers,並返回全部的 observer.onerror 事件,因此,全部的觀察者都只會收到 error 信號。
  • completed 事件中。 會判斷是否有發送過來的最新元素:若是有,就將最新元素髮送出去,並執行 next 事件。
    而且在執行 next 事件以後,會執行 completed 事件。若是沒有最新元素,則僅對全部 observer.on 發送 completed 事件。

五、Variable(已棄用)

示例:

let subject = Variable.init(1)

subject.value = 10
subject.value = 100

subject.asObservable().subscribe({ print("訂閱到:\($0)")})
    .disposed(by: disposeBag)

subject.value = 1000
複製代碼
打印結果:
    ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
    訂閱到:next(100)
    訂閱到:next(1000)
    訂閱到:completed
複製代碼

源碼分析:

從源碼能夠看出, Variable 雖然沒有繼承自 ObserverType 或者 Observable。可是其有一個 _subject: BehaviorSubject<Element> 屬性。因此, Variable 的行爲和 BehaviorSubject 是一致的。但由於不是繼承自 ObserverType,因此沒有 on 函數,不能直接調用 on 函數發送信號。

  • 在初始化時,使用初始化值,初始化 BehaviorSubject,並保存在 self._subject 中。
  • value 作了一層封裝,在 valueset 函數中,會調用 _subjecton 函數。完成信號的發送。

官方推薦使用 BehaviorRelayBehaviorSubject 做爲替換。

六、BehaviorRelay

BehaviorRelay 就是 BehaviorSubject 去掉終止事件 onErroronCompleted

示例:

let subject = BehaviorRelay(value: 1)

subject.accept(10)

subject.subscribe({ print("訂閱到:\($0)")})
    .disposed(by: disposeBag)

subject.accept(100)

subject.accept(1000)
複製代碼
打印結果:
    訂閱到:next(10)
    訂閱到:next(100)
    訂閱到:next(1000)
複製代碼

源碼分析:

查看源碼,請注意 BehaviorRelay 上方的註釋,註釋中說得很是清楚, BehaviorRelay 是對 BehaviorSubject 的封裝,可是和 BehaviorSubject 不同的地方在於, BehaviorRelay 不會被 errorcompleted 事件終止。

既然已經有了 BehaviorSubject,又爲什麼須要BehaviorRelay 來對其進行封裝呢? 通常來講,若是須要知道 BehaviorSubject 當前的發送的信號值,只能在 subscribe 中獲取,可是使用 BehaviorRelay 則能夠方便的使用 BehaviorRelay.value 獲取到當前的信號,很是之方便。

以上就是對常見的 Subject 的一些分析,如有不足之處,請評論指正。

相關文章
相關標籤/搜索