最近老大給了個新項目,我打算用Swift寫.原來OC用的RAC,換到Swift天然框架也想試試新的,就用了RXSwift,對於這兩個框架,我都是會用,但不解其中的原理,正好最近需求沒下來,就研究了研究RXSwif,把本身的收穫分享一下,文中要有不許確的地方還望你們多多指正~swift
關於RXSwift是什麼和怎麼用我就不廢話了,網上資源不少,本文先從Observable實現原理入手,旨在以小見大,後面的Single
什麼的天然觸類旁通~設計模式
下面是一段簡單使用Observable的代碼api
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onCompleted()
return Disposables.create {
}
}
numbers.subscribe{
print($0)
}
複製代碼
demo實現的效果其實就是 將上一段閉包中輸入的 產生的事件(0,1,Completed),在下一段閉包中提取出來. 這樣就將 事件的產生 和 事件的處理 分開. 本文也就是分析這個效果怎麼實現的安全
匿名可觀察者,存儲產生事件的閉包 和激活處理事件閉包的入口bash
任意觀察者,用於存儲事件 和 輸出事件多線程
匿名觀察者,用於存儲 處理事件的閉包閉包
將可觀察者 和 觀察者 連接,實現事件的傳遞框架
協議,將上面全部內容都包裹起來,將它們加以限制,便於有效的溝通~ide
事件自己,是枚舉,有 Error,Complete,Element(元素)函數
首先要說的是 ObserverType 定義的一些內容
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
複製代碼
E:爲本次事件流中定義一個肯定的類型,保證 產生的和處理的元素類型相同,不然沒法傳遞
Observable<Int>.create { observer -> Disposable in ....}
對於Observable
,它是一個抽象類,咱們在實際使用中並不能使用它,在協議中有默認的實現
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
複製代碼
因此此處建立的是 AnonymousObservable
對象,我先稱其爲A1,A1將事件產生的閉包持有, 閉包中產生的事件 輸入到AnyObserver
結構體中.閉包咱們成爲A2 這樣 存儲部分就行了~~
激活 咱們經過調用A1的訂閱方法subscribe
(也是協議中限定的方法),接下來看方法中的實現~ 由於Observable
是抽象類,因此這裏也是協議默認的實現
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
複製代碼
在這裏就分兩步了,一是觀察者的實現,而是事件的傳遞
在這裏很簡單,也就是建立AnonymousObserver
匿名觀察者對象B1,B1將事件處理閉包持有,閉包咱們成爲B2
首先是asObservable()
方法,由於 B1間接繼承自Observable
,因此也就是return self
,應該是在處理其餘類型的可觀察物用到,在後續 若是碰到我會補充~
而後就是對A1的 另外一個訂閱方法(重載),將B1做爲參數傳入 細枝末節先不說,先把握主幹~
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
//第一步
let disposer = SinkDisposer()
//第二步
let sinkAndSubscription = run(observer, cancel: disposer)
//第三步
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
//else先不說~
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
複製代碼
SinkDisposer
對象是關於 傳遞結束後,處理資源回收的對象,叫它C1,用來處理 A1create閉包返回的disposer閉包的~
調用了run
方法,將B1對象傳入
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//2.1
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
//2.2
let subscription = sink.run(self)
//2.3
return (sink: sink, subscription: subscription)
}
複製代碼
建立AnonymousObservableSink對象,我稱它D1,它也是將B1對象和C1對象持有
調用D1對象的run
方法,將A1自身傳入
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
複製代碼
在該方法中,就是將A1對象的A2閉包 調用,將D1對象化爲AnyObserver
結構體做爲A2參數傳入~
而後咱們看 D1對象 若何轉換的
//結構體方法
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
複製代碼
在這裏結構體 將 D1持有的B1對象的on方法 做爲屬性持有~,將結構體成爲E1
再來看E1的onNext....
方法
extension ObserverType {
//YSD
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
on(.error(error))
}
}
複製代碼
對應的實際上是調用 B1的on
方法~~
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
onCore(event)
}
}
}
複製代碼
對應的B1的onCore
方法
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
複製代碼
也就是將 E1從A2接收的事件 傳入B2中,最終實現內容的傳遞~~ 而後再將A1中釋放資源的閉包返回~
將D1和disposable閉包 做爲元組返回~
C1接收元組參數,調用setSinkAndSubscription
方法~,而後將SinkDisposer對象返回,讓用戶選擇是否釋放~
文字太抽象,畫個圖吧~ 畫的有點醜(๑•ᴗ•๑)~
能夠看到 A1 在這個過程當中只持有了A2, 不會致使內存泄露~ 固然若是你dispose 使用不當 確定有泄漏的~ 親測(๑•ᴗ•๑)~
訂閱2中的if !CurrentThreadScheduler.isScheduleRequired
內容是這樣的~
public static fileprivate(set) var isScheduleRequired: Bool {
get {
//獲取該指示值
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
// 成功返回0 true設置no no設置爲 true
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
//YSD
//https://onevcat.com/2015/01/swift-pointer/
//可變指針 pthread_key_t類型 分配空間
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer {
key.deallocate(capacity: 1)
}
//建立線程安全的變量
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
複製代碼
這裏應該是爲了保護,RXSwift在多線程操做下的數據安全~ 在本次事件流中只使用了get方法,並沒使用set~,因此具體效果我不清楚~,之後碰到了 我在補充上吧~
就是釋放資源部分~
fileprivate enum DisposeState: UInt32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
複製代碼
從輸出崩潰提示哪裏就能夠得知~ 這裏是爲了防止dispose的屢次調用~ 由於在整個事件流中,dipose閉包 多是 產生Complete,Error或者用戶手動調用的~
AtomicOr
方法其實調用的是OSAtomicOr32OrigBarrier(A,&B)
該函數會將兩個變量 線程安全的 按位或運算返回結果, 併爲後者賦值=前者~ B=A
未調用dipose時 邏輯與運算 state = 2 previousState = 0 兩個條件都不成立~ 因此此時是用戶要手動dispose
以前調用過 也就是發生complete 或 Error(在上面的代碼中也有保證,二者只發生一塊兒~),則 state = 1當調用setSinkAndSubscription方法時 邏輯與運算 state = 2 previousState = 1 則第一個條件不成立 第二個成立~ 釋放資源
當屢次Complete時,則只會dipose一次~
當在外界屢次調用時 則state = 2 previousState = 1 則第一個條件成立 崩潰~
固然這裏實現這種效果的方案有不少種~ RSSwift的方案比較有逼格吧~
看完這些源碼,個人感受是RXSwift對 設計模式 貫徹的很完全~ 在時間富裕的狀況下本身寫的項目要向他靠攏,加強項目的延展性,這樣項目經理讓加啥也不會太頭疼了~~