Combine 框架,從0到1 —— 2.經過 ConnectablePublisher 控制什麼時候發佈

 

本文首發於 Ficow Shen's Blog,原文地址: Combine 框架,從0到1 —— 2.經過 ConnectablePublisher 控制什麼時候發佈編程

 

內容概覽

  • 前言
  • 使用 makeConnectable() 和 connect() 手動控制發佈
  • 使用 autoconnect() 操做符進行自動鏈接
  • 總結

 

前言

 

使用 Connectable Publisher, 你能夠決定發佈者什麼時候開始發送訂閱元素給訂閱者。那麼,爲何咱們須要這麼作?swift

使用 sink(receiveValue:) 能夠馬上開始接收訂閱元素,可是這可能不是你想要的結果。當多個訂閱者訂閱了同一個發佈者時,有可能會出現其中一個訂閱者收到訂閱內容,而另一個訂閱者收不到的狀況。網絡

好比,當你發起一個網絡請求,併爲這個請求建立了一個發佈者以及鏈接了這個發佈者的訂閱者。app

圖片alt

而後,這個訂閱者的訂閱操做觸發了實際的網絡請求。在某個時間點,你將第二個訂閱者鏈接到了這個發佈者。若是在鏈接第二個訂閱者以前,網絡請求已經完成,那麼第二個訂閱者將只會收到完成事件,收不到網絡請求的響應結果。這時候,這個結果將不是你所指望。框架

在使用 Combine 的過程當中,咱們每每須要面對這些問題。如今就來弄清楚如何處理這一類問題吧~異步

 

使用 makeConnectable() 和 connect() 控制發佈

 

ConnectablePublisher 是一個協議類型,它能夠在你準備好以前阻止發佈者發佈元素。async

/// 可鏈接的發佈者,它提供了顯式的鏈接、取消訂閱的方式
///
/// 使用 `makeConnectable()` 來從任何一個失敗類型是 `Never` 的發佈者建立一個 `ConnectablePublisher`
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol ConnectablePublisher : Publisher {

    /// 鏈接到發佈者並返回一個用於取消發佈的 `Cancellable` 實例
    ///
    /// - 返回值: 一個用於取消發佈的 `Cancellable` 實例
    func connect() -> Cancellable
}

在你顯式地調用 connect() 方法以前,一個 ConnectablePublisher 不會發送任何元素。異步編程

如今,就讓咱們用 ConnectablePublisher 來解決上面提到的網絡請求示例中的問題吧!post

ConnectablePublisher

在兩個訂閱者都鏈接到發佈者以後,調用 connect(),而後網絡請求才被觸發。這樣就能夠避免競爭(race condition),保證兩個訂閱者都收到數據。url

爲了在你的 Combine 代碼中使用 ConnectablePublisher,你能夠使用 makeConnectable() 操做符將當前的發佈者包裝到一個 Publishers.MakeConnectable 結構體實例中。

以下方的代碼所示:

class ConnectablePublisherDemo {
    
    private var cancellable1: AnyCancellable?
    private var cancellable2: AnyCancellable?
    private var connection: Cancellable?
    
    func run() {
        let url = URL(string: "https://ficow.cn")!
        let connectable = URLSession.shared
            .dataTaskPublisher(for: url)
            .map(\.data)
            .catch() { _ in Just(Data()) }
            .share()
            .makeConnectable() // 阻止發佈者發佈內容
        
        cancellable1 = connectable
            .sink(receiveCompletion: { print("Received completion 1: \($0).") },
                  receiveValue: { print("Received data 1: \($0.count) bytes.") })
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            self.cancellable2 = connectable.sink(receiveCompletion: { log("Received completion 2: \($0).") },
                                                 receiveValue: { log("Received data 2: \($0.count) bytes.") })
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
			// 顯式地啓動發佈。返回值須要被強引用,可用於取消發佈(主動調用cancel方法或返回值被析構)
            self.connection = connectable.connect() 
        }
    }

}

請注意,在 makeConnectable() 操做符前面有一個 share() 操做符!請問,這個操做符有什麼做用呢?

 

使用 autoconnect() 操做符進行自動鏈接

 

某些 Combine 發佈者已經實現了 ConnectablePublisher 協議,如:Publishers.MulticastTimer.TimerPublisher。使用這些發佈者時,若是你不須要配置發佈者或者不須要鏈接多個訂閱者,你就須要顯式地調用 connect() 方法。

對於這種狀況,ConnectablePublisher 提供了 autoconnect() 操做符。當一個訂閱者經過 subscribe(_:) 方法鏈接到發佈者時,connect() 方法會被立刻調用。

let cancellable = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .sink() { date in
        print ("Date now: \(date)")
     }

上面的代碼示例中使用了 autoconnect(),因此訂閱者能夠立刻接收到定時器發送的元素。若是沒有 autoconnect(),咱們就須要在某個時刻手動地調用 connect() 方法。

 

總結

 

Combine 爲咱們提供了很強大的異步編程功能,不過這也是有代價的,咱們須要深知使用 Combine 過程當中可能會遭遇的問題。若是不瞭解這些「坑」就開始上路,犯錯的機率會很是高,犯錯的成本也會很是高。

 

本文內容來源: Controlling Publishing with Connectable Publishers,轉載請註明出處

相關文章
相關標籤/搜索