PublishSubject將對觀察者發送訂閱後產生的元素,而在訂閱前發送的元素將不會發送給觀察者。 git
示例:swift
let subject = PublishSubject<String>()
subject.onNext("🐘")
subject.subscribe(onNext: { print("訂閱到了: \($0)") })
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
複製代碼
打印結果:
訂閱到了: 🐶
訂閱到了: 🐱
複製代碼
源碼分析: 數組
經過PublishSubject
的繼承關係,能夠看出,
PublishSubject
既是
可監聽序列,也是
觀察者。
(1) 在第一次執行 onNext
函數 subject.onNext("🐘")
時 閉包
PublishSubject.on
函數,並繼續調用
dispatch
函數和
self._synchronized_on(event)
函數。
self._synchronized_on(event)
函數最終會返回 self._observers
。self._observers
的類型是 Bag<(Event<Element>) -> Void>
。
dispatch
函數的源碼
經分析能夠知道,其做用就是對 bag
中保存的 代碼塊進行執行回調。由於上一步傳入的 bag
僅是一個初始化的 bag
中間沒有保存任何代碼塊,因此不會執行任何回調。到這裏,在第一次執行 subject.onNext("🐘")
已經結束。沒有任何打印結果。ide
(2) 調用 subscribe
。函數
PublishSubject
源碼中,實現了 subscribe
。
let key = self._observers.insert(observer.on)
insert
函數源碼。
由於是第首次執行,因此 _dictionary
值爲nil,且 _pairs.count
小於 arrayDictionaryMaxSize
。因此,觀察者的 on
函數 observer.on
被加入到 _pairs
數組中。(3)第二次執行 onNext
函數 subject.onNext("🐶")
。開始的步驟和 (1)中一致,只是在最後執行 dispatch
函數時,由於 bag._pairs
有保存一個觀察者的 on
函數代碼塊,因此會執行回調。源碼分析
最終會回調 subscribe.onNext
閉包,打印結果。(ps: 這一步不清楚的小夥伴請閱讀RxSwift核心邏輯簡介)post
當觀察者對BehaviorSubject進行訂閱時,它會將源
Observable
中最新的元素髮送出來。若是不存在最新的元素,就發送默認元素。而後將隨後產生的元素髮送出來。 ui
示例:spa
let subject = BehaviorSubject<Int>(value: 100)
subject.subscribe(onNext: { print("訂閱1:\($0)") })
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(5)
subject.subscribe(onNext: { print("訂閱2:\($0)") })
.disposed(by: disposeBag)
複製代碼
打印結果:
訂閱1:100
訂閱1:3
訂閱1:5
訂閱2:5
複製代碼
源碼分析: BehaviorSubject
和 PublishSubject
源碼很類似,可是也有差別之處,咱們重點分析其中的差別,類似之處請參照上面的 PublishSubject
源碼分析。
(1)初始化方法
public init(value: Element) {
self._element = value
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
複製代碼
BehaviorSubject
的初始化方法須要傳入一個初始值,做爲默認元素。
(2) subscribe
函數
BehaviorSubject
在
_synchronized_subscribe
函數中比
PublishSubject
多一行代碼,執行了一次
observer.on
函數,並將
self._element
做爲參數傳遞。
self._element
中保存的是最新發送的元素,若是沒有最新元素,則爲
init
初始化時的默認元素。
(3)onNext
函數
BehaviorSubject
在調用
onNext
時,會將最新的元素保存在
self._element
中,在執行執行
subscribe
時,發送出去。
不管觀察者是什麼時候進行訂閱的,ReplaySubject都將對觀察者發送所有的元素。
示例:
let subject = ReplaySubject<Int>.create(bufferSize: 2)
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe(onNext: { print("訂閱到:\($0)") })
.disposed(by: disposeBag)
subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
複製代碼
打印結果:
訂閱到:2
訂閱到:3
訂閱到:4
訂閱到:5
訂閱到:6
複製代碼
源碼分析: (1)初始化方法 create
函數
bufferSize
爲 2,因此建立一個
ReplayMany
對象。
其做用是建立一個指定大小的隊列
queue
來保存須要發送的元素。
(2)onNext
函數 onNext
函數調用的是 ReplayBufferBase
的 on
函數。
addValueToBuffer
函數將發送的元素加入到 queue
中。trim
函數刪除 queue
中大於 bufferSize
的多餘的元素。(3)subscribe
函數
observer
加入到
self._observers
以前,先調用了
self.replayBuffer(anyObserver)
。會執行
ReplayManyBase.replayBuffer
函數。
override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
for item in self._queue {
observer.on(.next(item))
}
}
複製代碼
將保存在隊列 queue
中的全部元素,發送給訂閱者。
AsyncSubject 將在源
Observable
產生完成時間以後,發出最後一個元素(有且僅有最後一個元素)。若是源Observable
沒有發出任何元素,只有一個完成事件,則AsyncSubject也只有一個完成事件。若是源Observable
產生了一個error
事件而停止,那麼 AsyncSubject 就不會發出任何元素,而是將error
事件發送出來。
示例:
let subject = AsyncSubject<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe({ print("訂閱到:\($0)")})
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
複製代碼
打印結果:
訂閱到:next(4)
訂閱到:completed
複製代碼
源碼分析:重點分析on
函數 最後會來到下面👇的代碼,獲取保存的 observer.on
閉包,再執行事件回調。
next
事件中,將最新的元素保存在 self._lastElement
中後,並無返回全部的 observer.on
和 發送 next
事件,反而是初始化了一個空的 observer.on
集合和返回 completed
事件。因此不會有觀察者響應事件收到信號。error
事件中,會清空全部的 self._observers
,並返回全部的 observer.on
和 error
事件,因此,全部的觀察者都只會收到 error
信號。completed
事件中。 會判斷是否有發送過來的最新元素:若是有,就將最新元素髮送出去,並執行 next
事件。
而且在執行 next
事件以後,會執行 completed
事件。若是沒有最新元素,則僅對全部 observer.on
發送 completed
事件。示例:
let subject = Variable.init(1)
subject.value = 10
subject.value = 100
subject.asObservable().subscribe({ print("訂閱到:\($0)")})
.disposed(by: disposeBag)
subject.value = 1000
複製代碼
打印結果:
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
訂閱到:next(100)
訂閱到:next(1000)
訂閱到:completed
複製代碼
源碼分析:
從源碼能夠看出,Variable
雖然沒有繼承自
ObserverType
或者
Observable
。可是其有一個
_subject: BehaviorSubject<Element>
屬性。因此,
Variable
的行爲和
BehaviorSubject
是一致的。但由於不是繼承自
ObserverType
,因此沒有
on
函數,不能直接調用
on
函數發送信號。
BehaviorSubject
,並保存在 self._subject
中。value
作了一層封裝,在 value
的 set
函數中,會調用 _subject
的 on
函數。完成信號的發送。官方推薦使用 BehaviorRelay 和 BehaviorSubject 做爲替換。
BehaviorRelay 就是 BehaviorSubject 去掉終止事件
onError
和onCompleted
。
示例:
let subject = BehaviorRelay(value: 1)
subject.accept(10)
subject.subscribe({ print("訂閱到:\($0)")})
.disposed(by: disposeBag)
subject.accept(100)
subject.accept(1000)
複製代碼
打印結果:
訂閱到:next(10)
訂閱到:next(100)
訂閱到:next(1000)
複製代碼
源碼分析:
查看源碼,請注意BehaviorRelay
上方的註釋,註釋中說得很是清楚,
BehaviorRelay
是對
BehaviorSubject
的封裝,可是和
BehaviorSubject
不同的地方在於,
BehaviorRelay
不會被
error
和
completed
事件終止。
既然已經有了 BehaviorSubject,又爲什麼須要BehaviorRelay 來對其進行封裝呢? 通常來講,若是須要知道 BehaviorSubject 當前的發送的信號值,只能在
subscribe
中獲取,可是使用 BehaviorRelay 則能夠方便的使用BehaviorRelay.value
獲取到當前的信號,很是之方便。
以上就是對常見的 Subject 的一些分析,如有不足之處,請評論指正。