ActionCable: WebSocket Connection

每次 HTTP 請求,都是一個 request 到 response 的過程,因此 HTTP 服務器不須要維護任何狀態,html

而 WebSocket 須要 server 維護全部的鏈接狀態,Ruby 並不很擅長處理併發。ActionCable 爲了性能和易用性,用到了不少有趣的技術。java

好比 message 的 pub/sub 使用了 redis 的 pub/sub (也有其餘的 adapter)。git

在併發處理上,使用了 current-ruby 的 ThreadPoolExecutor,而處理 WebSocket 的鏈接,使用了 nio4r 和 websocket-driver。github

下面主要介紹一下這兩種技術和 I/O 模型。web

websocket-driver

websocket-driver 主要處理 WebSocket 協議,並對 I/O 層作了解耦,也就是說,能夠用不一樣的方式來處理 I/O 事件。同時提供了 :open,:message,:close 等一系列事件,方便使用。redis

使用 websocket-driver 的 Server-side scoket 須要實現兩個方法,url 和 write(string)urlruby

url 是用戶用來鏈接 websocket 的url。服務器

write(string) 方法用來將數據寫入到 stream 內。websocket

以後,當 I/O 監聽到數據的時候,再調用 WebSocket::Driver#parse 方法對數據進行解析。session

解析完成後,WebSocket 會經過 :message 事件將解析好的數據傳回

websocket-driver README 給出的 example 使用的是 EventMachine 處理 I/O 操做,而 ActionCable 則直接使用了 nio4r。

I/O 模型

I/O 操做,就是在主存(main memory)和外設之間作數據的複製操做。

而外設通常都比主存慢不少,爲了處理速度不匹配問題,可使用不一樣的方法對 I/O 進行操做,既 I/O 模型。

同步阻塞IO(Blocking IO)

同步阻塞 I/O,即傳統的 I/O 模型。

簡單來講,就是當進行 I/O 操做的時候,線程被掛起(suspend),直到 I/O 操做進行完,再執行程序以後的操做。也就是說,在進行 IO 操做的時候,線程一直處於等在狀態,因此稱爲 Blocking IO,Ruby 提供了 IO.read 方法。

同步非阻塞IO(Non-blocking IO)

進行 IO 操做時,不管系統是否準備好數據,都直接返回。若是沒有準備好,則返回 error,當用戶收到 error 的時候,能夠再次嘗試 IO 操做。Ruby 提供了IO.read_nonblock 方法。

IO多路複用(IO Multiplexing)

經過 select 讓 Kernal 掛起(suspend)當前線程,當一個或者多個 I/O 事件發生的時候,再把控制權交給應用程序。

IO多路複用適用於同時須要監聽多個 IO 對象的場景。

select 的時間複雜度是 O(n),Linux 提供了更高效的版本 epoll,epoll 時間複雜度是 O(log n))。thin 提供了是否使用 epoll 的選項。Ruby 提供了 IO.select 方法。

nio4r

nio4r 能夠說是 java.nio 的 Ruby 實現,但提供了更簡單的接口。

nio4r 主要有兩個部分:

Selectors: 用來同時監控多個 I/O 對象

Monitors: 用來追蹤所關心的 I/O 事件,好比讀。

require "nio"

server = TCPServer.new("127.0.0.1", 12345)

selector = NIO::Selector.new

3.times do
  client = server.accept
  _monitor = selector.register(client, :r)
end

ready = selector.select

首先是建立一個 NIO::Selector對象,而後將所關心的 I/O 事件經過 NIO::Selector#registor 註冊到 selector 對象上。

最後在一個循環內,調用 NIO::Selector#select 方法,選出 ready 的事件。

在 ActionCable 中處理 I/O 的類是 Connection::StreamEventLoop,初始化 selector 在 @nio ||= NIO::Selector.new,而後由 StreamEventLoop#attach 方法將 I/O 事件註冊到 nio selector 上。

最後由 run 方法來處理事件監聽 run,使用 IO.read_nonblock 對 stream 進行讀取操做

References

相關文章
相關標籤/搜索