SwiftNIO is a cross-platform asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. It’s like Netty, but written for Swift.web
SwiftNIO 是由蘋果推進並開源的一款基於事件驅動的跨平臺網絡應用開發框架,用於快速開發可維護的高性能服務器與客戶端應用協議。NIO 是(Non-blocking)I/O 的縮寫,即爲了提高性能,其採用的是非阻塞 I/O。express
SwiftNIO 其實是一個底層工具,致力於爲上層框架專一提供基礎 I/O 功能與協定。接下來咱們就將採用其構建一個相似 Express 的小型 Web 框架。json
目標:看看咱們最終實現的框架能作些什麼bootstrap
import MicroExpress
let app = Express()
app.get("/hello") { req, res, next in
res.send("Hello, ExpressSwift")
}
app.get("/todolist") { _, res, _ in
res.json(todolist)
}
app.listen(1337)
複製代碼
實現這樣一個網絡應用,咱們要作如下這些組件:swift
Express
實例類,用於運行服務IncomingMessage
)與響應(ServerResponse
) 對象Middleware
)和路由(Router
)安裝相應的swift-xcode-nio
api
brew install swiftxcode/swiftxcode/swift-xcode-nio
swift xcode link-templates
複製代碼
建立一個新項目,選中Swift-NIO
模板xcode
let app = Express()
app.listen(1337)
複製代碼
import Foundation
import NIO
import NIOHTTP1
open class Express {
let loopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
open func listen(_ port: Int) {
let reuseAddrOpt = ChannelOptions.socket(
SocketOptionLevel(SOL_SOCKET),
SO_REUSEADDR)
let bootstrap = ServerBootstrap(group: loopGroup)
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(reuseAddrOpt, value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline()
// this is where the action is going to be!
}
.childChannelOption(ChannelOptions.socket(
IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelOption(reuseAddrOpt, value: 1)
.childChannelOption(ChnanelOptions.maxMessagePerRead, value: 1)
do {
let serverChannel = try bootstrap.bind(host: "localhost", port: port).wait()
print("Server running on: ", serverChannel.localAddress)
try serverChannel.closeFuture.wait() // runs forever
} catch {
fatalError("failed to start server: \(error)")
}
}
}
複製代碼
let loopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
複製代碼
EventLoop 是 SwfitNIO 最基本的 IO 元素,它等待事件的發生,在發生事件時觸發某種回調操做。在大部分 SwfitNIO 應用程序中,EventLoop 對象的數量並很少,一般每一個CPU核數對應一到兩個 EventLoop 對象。通常來講,EventLoop 會在應用程序的整個生命週期中存在,進行無限的事件分發。瀏覽器
EventLoop 能夠組合成 EventLoopGroup,EventLoopGroup 提供了一種機制用於在各個EventLoop 間分發工做負載。例如,服務器在監聽外部鏈接時,用於監聽鏈接的 socket 會被註冊到一個 EventLoop 上。但咱們不但願這個 EventLoop 承擔全部的鏈接負載,那麼就能夠經過 EventLoopGroup 在多個EventLoop間分攤鏈接負載。bash
目前,SwiftNIO 提供了一個 EventLoopGroup 實現(MultiThreadedEventLoopGroup)和兩個 EventLoop 實現(SelectableEventLoop 和 EmbeddedEventLoop)。服務器
MultiThreadedEventLoopGroup 會建立多個線程(使用 POSIX 的 pthreads 庫),併爲每一個線程分配一個 SelectableEventLoop 對象。
SelectableEventLoop使用選擇器(基於 kqueue 或 epoll)來管理來自文件和網絡IO事件。EmbeddedEventLoop 是一個空的 EventLoop,什麼事也不作,主要用於測試。
open func listen(_ port: Int) {
...
let bootstrap = ServerBootstrap(group: loopGroup)
...
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline()
// this is where the action is going to be!
}
...
let serverChannel =
try bootstrap.bind(host: "localhost", port: port)
.wait()
複製代碼
儘管 EventLoop 很是重要,但大部分開發者並不會與它有太多的交互,最多就是用它建立 EventLoopPromise 和調度做業。開發者常常用到的是 Channel 和 ChannelHandler。
每一個文件描述符對應一個 Channel,Channel 負責管理文件描述符的生命週期,並處理髮生在文件描述符上的事件:每當 EventLoop 檢測到一個與相應的文件描述符相關的事件,就會通知 Channel。
ChannelPipeline 由一系列 ChannelHandler 組成,ChannelHandler 負責按順序處理 Channel 中的事件。ChannelPipeline 就像數據處理管道同樣,因此纔有了這個名字。
ChannelHandler 要麼是 Inbound,要麼是 Outbound,要麼二者兼有。Inbound 的ChannelHandler 負責處理 「inbound」 事件,例如從 socket 讀取數據、關閉 socket 或者其餘由遠程發起的事件。Outbound 的 ChannelHandler 負責處理 「outbound」 事件,例如寫數據、發起鏈接以及關閉本地 socket。
ChannelHandler 按照必定順序處理事件,例如,讀取事件從管道的前面傳到後面,而寫入事件則從管道的後面傳到前面。每一個 ChannelHandler 都會在處理完一個事件後生成一個新的事件給下一個 ChannelHandler。
ChannelHandler 是高度可重用的組件,因此儘量設計得輕量級,每一個 ChannelHandler 只處理一種數據轉換,這樣就能夠靈活組合各類 ChannelHandler,提高代碼的可重用性和封裝性。
咱們能夠經過 ChannelHandlerContext 來跟蹤 ChannelHandler 在 ChannelPipeline 中的位置。ChannelHandlerContext 包含了當前 ChannelHandler 到上一個和下一個 ChannelHandler的引用,所以,在任什麼時候候,只要 ChannelHandler 還在管道當中,就能觸發新事件。
SwiftNIO 內置了多種 ChannelHandler,包括 HTTP 解析器。另外,SwiftNIO 還提供了一些Channel 實現,好比 ServerSocketChannel(用於接收鏈接)、SocketChannel(用於TCP鏈接)、DatagramChannel(用於UDP socket)和 EmbeddedChannel(用於測試)。
open class Express {
...
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().then {
channel.pipeline.add(handler: HTTPHandler())
}
}
...
}
複製代碼
添加真正的處理器方法
open class Express {
//...
final class HTTPHandler : ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let reqPart = unwrapInboundIn(data)
switch reqPart {
case .head(let header):
print("req:", header)
// ignore incoming content to keep it micro :-)
case .body, .end: break
}
}
}
} // end of Express class
複製代碼
編譯並運行,經過瀏覽器訪問 http://localhost:1337
,暫時未有響應,但在控制檯中能夠看到輸出:
Server running on: [IPv6]::1:1337``
req: HTTPRequestHead(method: NIOHTTP1.HTTPMethod.GET, uri: "/", ...)
複製代碼
在 .head
中添加如下代碼
case .head(let header):
print("req:", header)
let head = HTTPResponseHead(version: header.version,
status: .ok)
let part = HTTPServerResponsePart.head(head)
_ = ctx.channel.write(part)
var buffer = ctx.channel.allocator.buffer(capacity: 42)
buffer.write(string: "Hello Schwifty World!")
let bodypart = HTTPServerResponsePart.body(.byteBuffer(buffer))
_ = ctx.channel.write(bodypart)
let endpart = HTTPServerResponsePart.end(nil)
_ = ctx.channel.writeAndFlush(endpart).then {
ctx.channel.close()
}
複製代碼
如今,咱們第一步就完成了,實現了一個 Express 對象,運行咱們的 Web 服務。
IncomingMessage
)與響應(ServerResponse
) 對象import NIOHTTP1
open class IncomingMessage {
public let header : HTTPRequestHead // <= from NIOHTTP1
public var userInfo = [ String : Any ]()
init(header: HTTPRequestHead) {
self.header = header
}
}
複製代碼
import NIO
import NIOHTTP1
open class ServerResponse {
public var status = HTTPResponseStatus.ok
public var headers = HTTPHeaders()
public let channel : Channel
private var didWriteHeader = false
private var didEnd = false
public init(channel: Channel) {
self.channel = channel
}
/// An Express like `send()` function.
open func send(_ s: String) {
flushHeader()
let utf8 = s.utf8
var buffer = channel.allocator.buffer(capacity: utf8.count)
buffer.write(bytes: utf8)
let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
_ = channel.writeAndFlush(part)
.mapIfError(handleError)
.map { self.end() }
}
/// Check whether we already wrote the response header.
/// If not, do so.
func flushHeader() {
guard !didWriteHeader else { return } // done already
didWriteHeader = true
let head = HTTPResponseHead(version: .init(major:1, minor:1),
status: status, headers: headers)
let part = HTTPServerResponsePart.head(head)
_ = channel.writeAndFlush(part).mapIfError(handleError)
}
func handleError(_ error: Error) {
print("ERROR:", error)
end()
}
func end() {
guard !didEnd else { return }
didEnd = true
_ = channel.writeAndFlush(HTTPServerResponsePart.end(nil))
.map { self.channel.close() }
}
}
複製代碼
在 HTTPHandler 中使用
case .head(let header):
let request = IncomingMessage(header: header)
let response = ServerResponse(channel: ctx.channel)
print("req:", header.method, header.uri, request)
response.send("Way easier to send data!!!")
複製代碼
中間件其實就是閉包,採用 typealias 進行別名定義:
public typealias Next = ( Any... ) -> Void
public typealias Middleware = (IncomingMessage, ServerResponse, @escaping Next ) -> Void
複製代碼
open class Router {
/// The sequence of Middleware functions.
private var middleware = [ Middleware ]()
/// Add another middleware (or many) to the list
open func use(_ middleware: Middleware...) {
self.middleware.append(contentsOf: middleware)
}
/// Request handler. Calls its middleware list
/// in sequence until one doesn't call `next()`.
func handle(request : IncomingMessage, response : ServerResponse, next upperNext : @escaping Next)
{
let stack = self.middleware
guard !stack.isEmpty else { return upperNext() }
var next : Next? = { ( args : Any... ) in }
var i = stack.startIndex
next = { (args : Any...) in
// grab next item from matching middleware array
let middleware = stack[i]
i = stack.index(after: i)
let isLast = i == stack.endIndex
middleware(request, response, isLast ? upperNext : next!)
}
next!()
}
}
複製代碼
將路由類接入Express
open class Express : Router { // <= make Router the superclass
...
}
// -------
final class HTTPHandler : ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
let router : Router
init(router: Router) {
self.router = router
}
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let reqPart = unwrapInboundIn(data)
switch reqPart {
case .head(let header):
let request = IncomingMessage(header: header)
let response = ServerResponse(channel: ctx.channel)
// trigger Router
router.handle(request: request, response: response) {
(items : Any...) in // the final handler
response.status = .notFound
response.send("No middleware handled the request!")
}
// ignore incoming content to keep it micro :-)
case .body, .end: break
}
}
}
// ------
...
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().then {
channel.pipeline.add(
handler: HTTPHandler(router: self))
}
}
...
複製代碼
在 main.swift 中使用中間件和路由
let app = Express()
// Logging
app.use { req, res, next in
print("\(req.header.method):", req.header.uri)
next() // continue processing
}
// Request Handling
app.use { _, res, _ in
res.send("Hello, Schwifty world!")
}
app.listen(1337)
複製代碼
有了 use()
,接下來實現 get(path)
public extension Router {
/// Register a middleware which triggers on a `GET`
/// with a specific path prefix.
public func get(_ path: String = "", middleware: @escaping Middleware)
{
use { req, res, next in
guard req.header.method == .GET,
req.header.uri.hasPrefix(path)
else { return next() }
middleware(req, res, next)
}
}
}
複製代碼
import Foundation
fileprivate let paramDictKey =
"de.zeezide.µe.param"
/// A middleware which parses the URL query
/// parameters. You can then access them
/// using:
///
/// req.param("id")
///
public
func queryString(req : IncomingMessage, res : ServerResponse, next : @escaping Next)
{
// use Foundation to parse the `?a=x`
// parameters
if let queryItems = URLComponents(string: req.header.uri)?.queryItems {
req.userInfo[paramDictKey] =
Dictionary(grouping: queryItems, by: { $0.name })
.mapValues { $0.flatMap({ $0.value })
.joined(separator: ",") }
}
// pass on control to next middleware
next()
}
public extension IncomingMessage {
/// Access query parameters, like:
///
/// let userID = req.param("id")
/// let token = req.param("token")
///
func param(_ id: String) -> String? {
return (userInfo[paramDictKey]
as? [ String : String ])?[id]
}
}
複製代碼
app.use(queryString) // parse query params
app.get { req, res, _ in
let text = req.param("text")
?? "Schwifty"
res.send("Hello, \(text) world!")
}
複製代碼
public extension ServerResponse {
/// A more convenient header accessor. Not correct for
/// any header.
public subscript(name: String) -> String? {
set {
assert(!didWriteHeader, "header is out!")
if let v = newValue {
headers.replaceOrAdd(name: name, value: v)
}
else {
headers.remove(name: name)
}
}
get {
return headers[name].joined(separator: ", ")
}
}
}
複製代碼
struct Todo : Codable {
var id : Int
var title : String
var completed : Bool
}
// Our fancy todo "database". Since it is
// immutable it is webscale and lock free,
// if not useless.
let todos = [
Todo(id: 42, title: "Buy beer",
completed: false),
Todo(id: 1337, title: "Buy more beer",
completed: false),
Todo(id: 88, title: "Drink beer",
completed: true)
]
複製代碼
import Foundation
public extension ServerResponse {
/// Send a Codable object as JSON to the client.
func json<T: Encodable>(_ model: T) {
// create a Data struct from the Codable object
let data : Data
do {
data = try JSONEncoder().encode(model)
}
catch {
return handleError(error)
}
// setup JSON headers
self["Content-Type"] = "application/json"
self["Content-Length"] = "\(data.count)"
// send the headers and the data
flushHeader()
var buffer = channel.allocator.buffer(capacity: data.count)
buffer.write(bytes: data)
let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
_ = channel.writeAndFlush(part)
.mapIfError(handleError)
.map { self.end() }
}
}
複製代碼
app.get("/todomvc") { _, res, _ in
// send JSON to the browser
res.json(todos)
}
複製代碼
以上就實現了一個小型的 Express Web 框架了,總體寫下來,對平時使用的諸多現成框架也有了更深入的理解。