SwiftNIO EventLoopFuture 和 EventLoopPromise

在上篇 Future & Promise 實現分析(Swift) 咱們簡單的闡述了下 future 和 Promise 的一些概念,並簡單的用了個樣例來體會這種處理異步的模式。本文接着上文遺留的一個話題 SwiftNIO 中如何實現 Future 和 Promise?git

Future 是一個只讀的值的容器,它的值在將來某個時刻會被計算出來(產生這個值的行爲是異步操做)。 Promise 是一個可寫的容器,能夠設置 Future 的值。github

帶着上面這句話,咱們來看看 SwiftNIO 中如何實現的。swift

概述

併發代碼和同步代碼之間最主要的區別在於並不是全部的動做都可以當即完成。例如,在向一個 Channel 寫入數據時,EventLoop 有可能不會當即將數據沖刷到網絡上。爲此,SwiftNIO 提供了 EventLoopPromiseEventLoopFuture,用於管理異步操做。api

EventLoopFuture 其實是一個容器,用於存放函數在將來某個時刻的返回值。每一個 EventLoopFuture<T> 對象都有一個對應的 EventLoopPromise<T>,用於存放實際的結果。只要 EventLoopPromise 執行成功,EventLoopFuture 也就完成了。數組

經過輪詢的方式檢查 EventLoopFuture 是否完成是一種很是低效的方式,因此 EventLoopFuture 被設計成能夠接收回調函數。也就是說,在有結果的時候回調函數會被執行。promise

EventLoopFuture 負責處理調度工做,確保回調函數是在最初建立 EventLoopPromise 的那個 EventLoop 上執行,因此就沒有必要再針對回調函數作任何同步操做。安全

EventLoopFuture

咱們看下面例子來感覺一下 EventLoopFuture & EventLoopPromise 的使用方式:微信

func someAsyncOperation(args) -> EventLoopFuture<ResultType> {
    let promise = eventLoop.makePromise(of: ResultType.self)
    someAsyncOperationWithACallback(args) { result -> Void in
        // when finished...
        promise.succeed(result)
        // if error...
        promise.fail(error)
    }
    return promise.futureResult
}
複製代碼

someAsyncOperation 裏面首先是建立一個 promise, 而後在 someAsyncOperationWithACallback 異步操做的會回調中根據各類場景執行 promise 的 successed 或者 fail 方法。someAsyncOperation 返回一個 EventLoopFuture<ResultType>, 而這個值從 promise.futureResult 中獲取。EventLoopFuture 一個只讀的值的容器,它的值在將來某個時刻會被計算出來。而它的值經過 EvnetLoopPromise 進行設置。 徹底符合咱們本文開篇提到的那句話。網絡

若是咱們在調用一個方法,它返回 EventLoopFuture<Value>, 咱們該如何操做這個值?併發

map & flatMap

能夠經過調用 flatMap() 或者 map() 能夠獲取到 EventLoopFuture 中真實的值。

let networkData = getNetworkData(args)

// When network data is received, convert it.
let processedResult: EventLoopFuture<Processed> = networkData.map { (n: NetworkResponse) -> Processed in
    ... parse network data ....
    return processedResult
}
複製代碼

map 的 callback 是將 Value 類型的結果轉換爲 NewValue 類型。它是個 Functor

可是在 flatMap 中 flatMap

let d1 = networkRequest(args).future()
let d2 = d1.flatMap { t -> EventLoopFuture<NewValue> in
    . . . something with t . . .
    return netWorkRequest(args)
}
d2.whenSuccess { u in
    NSLog("Result of second request: \(u)")
}
複製代碼

flatMap 的 callback 是將 Value 類型的結果轉化爲 EventLoopFuture<NewValue>, 它是一個 Monads

爲了更好的理解二者的差別,咱們一塊兒看看他們的實現代碼:

swiftnio-map

swiftnio-flatmap2

源碼淺析

public final class EventLoopFuture<Value> {
    @usableFromInline
    internal var _value: Result<Value, Error>? {
        didSet {
            self._isFulfilled.store(true)
        }
    }

    @usableFromInline
    internal let _isFulfilled: UnsafeEmbeddedAtomic<Bool>

    /// The `EventLoop` which is tied to the `EventLoopFuture` and is used to notify all registered callbacks.
    public let eventLoop: EventLoop

    /// Whether this `EventLoopFuture` has been fulfilled. This is a thread-safe
    /// computed-property.
    internal var isFulfilled: Bool {
        return self._isFulfilled.load()
    }

    @usableFromInline
    internal var _callbacks: CallbackList = CallbackList()
}
複製代碼

EventLoopFuture 的 fulfilled 狀態經過 _isFulfilled (線程安全的)進行管理。在初始化的時候也會進行初始化

@inlinable
internal init(_eventLoop eventLoop: EventLoop, value: Result<Value, Error>?, file: StaticString, line: UInt) {
    self.eventLoop = eventLoop
    self._value = value
    self._isFulfilled = UnsafeEmbeddedAtomic(value: value != nil)

    debugOnly {
        if let me = eventLoop as? SelectableEventLoop {
            me.promiseCreationStoreAdd(future: self, file: file, line: line)
        }
    }
}
複製代碼

在給 _value 設置值的時候,會設置 _isFulfilledtrue

@usableFromInline
internal var _value: Result<Value, Error>? {
    didSet {
        self._isFulfilled.store(true)
    }
    
}
複製代碼

EventLoopPromise

EventLoopPromise 設計是比較簡單的,咱們直接看它的源碼:

public struct EventLoopPromise<Value> {
    public let futureResult: EventLoopFuture<Value>
    @inlinable
    internal init(eventLoop: EventLoop, file: StaticString, line: UInt) {
        self.futureResult = EventLoopFuture<Value>(_eventLoop: eventLoop, file: file, line: line)
    }

    @inlinable
    public func succeed(_ value: Value) {
        self._resolve(value: .success(value))
    }

    @inlinable
    public func fail(_ error: Error) {
        self._resolve(value: .failure(error))
    }
    
    @inlinable
    public func completeWith(_ future: EventLoopFuture<Value>) {
        future.cascade(to: self)
    }

    @inlinable
    public func completeWith(_ result: Result<Value, Error>) {
        self._resolve(value: result)
    }

    @inlinable
    internal func _resolve(value: Result<Value, Error>) {
        if self.futureResult.eventLoop.inEventLoop {
            self._setValue(value: value)._run()
        } else {
            self.futureResult.eventLoop.execute {
                self._setValue(value: value)._run()
            }
        }
    }

    @inlinable
    internal func _setValue(value: Result<Value, Error>) -> CallbackList {
        return self.futureResult._setValue(value: value)
    }
}
複製代碼

經過 Eventloop 初始化其內部的 futureResult(EventLoopFuture), 這個時候的 futureResultfilfulledfalse 的。 而後提供了一些操做 futureResult 的方法,而這些方法都間接調用了 _resolve —— 確保回調都在建立 promise 這個 EventLoop 上進行執行。

EventLoop 是 SwfitNIO 最基本的 IO 原語,它等待事件的發生,在發生事件時觸發某種回調操做。

在下起文章咱們來解讀下這個 EventLoop,本文就不進行過多闡述。

Vapor 的 Future & Promise 真面目

Vapor 的官方文檔中,有個 Async 的模塊, 它有 Future 和 Promise。

咱們可使用它提供操做 Future 的方法:

  • map
  • flatMap
  • transform
  • always
  • wait
  • do / catch

對 Future 進行轉換, 或者阻塞等待 Future 的值的肯定。

使用 Promise:

/// 建立一個 promise 
let promiseString = req.eventLoop.newPromise(String.self)
print(promiseString) // Promise<String>
print(promiseString.futureResult) // Future<String>

/// Completes the associated future
promiseString.succeed(result: "Hello")

/// Fails the associated future
promiseString.fail(error: ...)
複製代碼

Future 和 Promise 在 Vapor 中的詳細用法請到 Async

上面的用法是否是很是的熟悉?咱們一塊兒看看它們的如山真面目。

Async+NIO.swift 中:

import Dispatch
import NIO

/// Convenience shorthand for `EventLoopFuture`.
public typealias Future = EventLoopFuture

/// Convenience shorthand for `EventLoopPromise`.
public typealias Promise = EventLoopPromise

extension EventLoop {
    /// Creates a new promise for the specified type.
    public func newPromise<T>(_ type: T.Type, file: StaticString = #file, line: UInt = #line) -> Promise<T> {
        return newPromise(file: file, line: line)
    }
}
複製代碼

Vapor 中的 Future 就是 SwiftNIO 的 EventLoopFuturePromise 就是 SwiftNIO 中的 EventLoopPromise

map vs flatMap

map 和 flatMap 在 Vapor 實踐中是經常使用到的方法,二者很容易理解的一個區別:

  • map 中的 callback 是 Value -> NewValue
  • flatMap 中的 callback 是 Value -> EventLoopFuture<NewValue>
let futureString: Future<String> = ...

/// Assume we have created an HTTP client
let client: Client = ... 

/// Flat-map the future string to a future response
let futureResponse = futureString.flatMap(to: Response.self) { string in
    return client.get(string) // Future<Response>
}
複製代碼

futureResponseFuture<Response> 類型, 若是 flatMap 替換爲 map:

let futureResponse = futureString.map(to: Response.self) { string in
    return client.get(string) // Future<Response>
}
複製代碼

那麼這時候 futureResponseFuture<Future<Response>> 類型,顯然這不是咱們想要的。

提早預告 Vapor 中 Worker, 它是 SwiftNIO 中的 EventLoopGrop

public typealias Worker = EventLoopGroup
複製代碼

想了解更多 Vapor 對 SwiftNIO 的包裝,能夠點擊查看源碼 vapor/core

API 學習

既然 Vapor 中 Future 和 Promise 是同同樣東西,那麼學習 SwiftNIO 中 EventLoopFuture 和 EventLoopPromise 提供的 API 是很是有必要的

EventLoopFuture API

flatMap(file:line:_:)

@inlinable
public func flatMap<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue>
複製代碼

能夠將 EventLoopFuture<Value> 的轉換爲 EventLoopFuture<NewValue> ,callback 參數須要返回 EventLoopFuture<NewValue>

flatMapThrowing(file:line:_:)

@inlinable
public func flatMapThrowing<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture<NewValue>
複製代碼

EventLoopFuture<Value> fulfilled 狀態的時候,會運行 callback。callback 能夠拋出異常,當 callback 拋出異常,該方法返回的 EventLoopFuture 的值爲 error。

flatMapErrorThrowing(file:line:_:)

@inlinable
public func flatMapErrorThrowing(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture<Value>
複製代碼

EventLoopFuture<Value> 是個 error 狀態,callback 會運行,callback 接受一個 error, 返回一個新的 Value 類型的值。

map(file:line:_:)

@inlinable
public func map<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture<NewValue>
複製代碼

EventLoopFuture<Value> fulfilled

flatMapError(file:line:_:)

@inlinable
public func flatMapError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture<Value>) -> EventLoopFuture<Value>
複製代碼

flatMapResult(file:line:_:)

@inlinable
public func flatMapResult<NewValue, SomeError: Error>(file: StaticString = #file, line: UInt = #line, _ body: @escaping (Value) -> Result<NewValue, SomeError>) -> EventLoopFuture<NewValue>
複製代碼

recover(file:line:_:)

@inlinable
public func recover(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture<Value>
複製代碼

whenSuccess(_:)

@inlinable
public func whenSuccess(_ callback: @escaping (Value) -> Void)
複製代碼

whenFailure(_:)

@inlinable
public func whenFailure(_ callback: @escaping (Error) -> Void)
複製代碼

whenComplete(_:)

@inlinable
public func whenComplete(_ callback: @escaping (Result<Value, Error>) -> Void)
複製代碼

and(_:file:line:)

@inlinable
public func and<OtherValue>(_ other: EventLoopFuture<OtherValue>, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)>
複製代碼

and(value:file:line:)

@inlinable
public func and<OtherValue>(value: OtherValue, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)>
複製代碼

cascade(to:)

@inlinable
public func cascade(to promise: EventLoopPromise<Value>?)
複製代碼

cascadeSuccess(to:)

@inlinable
public func cascadeSuccess(to promise: EventLoopPromise<Value>?)
複製代碼

cascadeFailure(to:)

@inlinable
public func cascadeFailure<NewValue>(to promise: EventLoopPromise<NewValue>?)
複製代碼

wait(file:line:)

public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value
複製代碼

經過阻塞當前線程直到 resolve。

fold(_:with:)

@inlinable
public func fold<OtherValue>(_ futures: [EventLoopFuture<OtherValue>], with combiningFunction: @escaping (Value, OtherValue) -> EventLoopFuture<Value>) -> EventLoopFuture<Value>
複製代碼

所有的 futures 完成觸發返回一個新的 EventLoopFuture。 一旦遇到 future 失敗將立馬失敗。

reduce(::on:_:)

public static func reduce<InputValue>(_ initialResult: Value, _ futures: [EventLoopFuture<InputValue>], on eventLoop: EventLoop, _ nextPartialResult: @escaping (Value, InputValue) -> Value) -> EventLoopFuture<Value>
複製代碼

reduce(into::on::)

public static func reduce<InputValue>(into initialResult: Value, _ futures: [EventLoopFuture<InputValue>], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping (inout Value, InputValue) -> Void) -> EventLoopFuture<Value>
複製代碼

全部的 EventLoopFutures 成功後,返回一個新的 EventLoopFuture。新的 EveentLoopFuture 的值是每一個Future 的值組成的數組。

andAllSucceed(_:on:)

public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void>
複製代碼

全部的 EventLoopFutures 成功後,返回一個新的 EventLoopFuture。忽略每一個 Future 的值。

whenAllSucceed(_:on:)

public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]>
複製代碼

全部的 EventLoopFutures 完成後,返回一個新的 EventLoopFuture。

這個返回的 EventLoopFuture 老是成功的,無論這些 futures 是否成功仍是失敗。忽略每一個 Future 的值。

andAllComplete(_:on:)

public static func andAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void>
複製代碼

在全部的 EventLoopFutures 完成後,返回一個新的 EventLoopFuture, 這個 EventLoopFuture 的值是這些 futures 的結果數組。

這個返回的 EventLoopFuture 老是成功的,無論這些 futures 是否成功仍是失敗。

若是但願將它們展平單個 EventLoopFuture,且遇到第一個 future 失敗也將失敗,建議使用 reduce 方法。

whenAllComplete(_:on:)

public static func whenAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Result<Value, Error>]>
複製代碼

hop(to:)

@inlinable
public func hop(to target: EventLoop) -> EventLoopFuture<Value>
複製代碼

當這個 future 完成時會觸發返回一個 EventLoopFuture。 可是執行回調是在 target 這個 Eventloop 上而不是在原來的那個。

在某些場合你須要切換 Eventloop: 好比, 你須要在另外一個 eventloop 去關閉一個 channel。而後關閉完成時候須要切回到當前的 eventloop。它還有一個優化,避免兩個 eventloop 是同一個 eventloop 從新分配的狀況。

always(_:)

public func always(_ callback: @escaping (Result<Value, Error>) -> Void) -> EventLoopFuture<Value>
複製代碼

給 EventLoopFuture 添加一個監聽的回調,該回調在 EventLoopFuture 有任何值的時候運行。

EventLoopPromise API

futureResult

public let futureResult: EventLoopFuture<Value>
複製代碼

經過 EventLoopPromise 獲取 EventLoopFuture。 你可使用它來添加回調,這些回調會在 EventLoopPromise 完成立馬調用。

succeed(_:)

@inlinable
public func succeed(_ value: Value)
複製代碼

將成功的結果傳遞給關聯的 EventLoopFuture<Value>

fail(_:)

@inlinable
public func fail(_ error: Error)
複製代碼

將失敗的結果傳遞給關聯的 EventLoopFuture<Value>

completeWith(_:)

@inlinable
public func completeWith(_ future: EventLoopFuture<Value>)
複製代碼

經過傳入 EventLoopFuture<Value> 完成 promise。

間接調用 EventLoopFuture 的 eascade 方法。

completeWith(_:)

@inlinable
public func completeWith(_ result: Result<Value, Error>)
複製代碼

經過傳入 Result<Value, Error> 完成 promise。

此方法至關於調用

switch result {
case .success(let value):
    promise.succeed(value)
case .failure(let error):
    promise.fail(error)
}
複製代碼

總結

SwiftNIO 中的 Promise 和 Future 的前綴都是 EventLoop,在 Netty 的源碼裏沒有這倆名字,它有不少 Future 類。這應該和 Netty 的設計不一樣。EventLoopFuture 能夠經過 flatMap 生成一個新的 EventLoopFuture(間接經過 EventLoopPromise), 也能夠經過初始化方式進行生成。EventLoopFuture 對象都有一個對應的 EventLoopPromise, 只要 EventLoopPromise 執行成功,EventLoopFuture 也就完成,完成後會觸發回調,而這些回調函數都在最初建立 EventLoopPromise 的那個 EventLoop 上執行。 Vapor 的 Future 和 Promise 僅僅是 EventLoopFuture 和 EventLoopPromise 的一個別稱,理解了它倆,也基本上了解 Vapor 不少方法須要返回一個 Future 的這樣的設計, 爲啥要用 map & flatMap 實現鏈式調用。

本文到此完結,更多閱讀,能夠關注 SwiftOldBird 微信公衆號:

相關文章
相關標籤/搜索