#任務與gen_tcp服務器
本章,咱們將學習如何使用Erlang的:gen_tcp
模塊來處理請求.這是一個探索Elixir的Task
模塊的好機會.在後面的章節咱們將擴展咱們的服務器,讓它可以執行命令.併發
#回顯服務器(Echo server)app
咱們將實現一個回顯服務器,做爲咱們的TCP服務器的開始.它會簡單地發送一個回覆,內容就是在請求中收到的文本.咱們將緩慢地升級服務器,直到它被監督並準備好處理多重鏈接.socket
一個TCP服務器,歸納地說,會有以下表現:tcp
讓咱們來實現這些步驟.進入apps/kv_server
應用,打開lib/kv_server.ex
,並添加下列函數:ide
require Logger def accept(port) do # 下列選項的意思是: # # 1. `:binary` - 以二進制數接受數據 (而非列表) # 2. `packet: :line` - 逐行接收數據 # 3. `active: false` - 阻塞在 `:gen_tcp.recv/2` 直到數據可用 # 4. `reuseaddr: true` - 容許咱們重用數據當監聽器崩潰時 {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) Logger.info "Accepting connections on port #{port}" loop_acceptor(socket) end defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) serve(client) loop_acceptor(socket) end defp serve(socket) do socket |> read_line() |> write_line(socket) serve(socket) end defp read_line(socket) do {:ok, data} = :gen_tcp.recv(socket, 0) data end defp write_line(line, socket) do :gen_tcp.send(socket, line) end
咱們將經過調用KVServer.accept(4040)
來開啓咱們的服務器,這裏的4040是端口.accept/1
的第一步就是監聽端口,直到套接字可用,而後調用loop_acceptor/1
.loop_acceptor/1
只是一個接收客戶端鏈接的循環.對於每一個被接收的鏈接,咱們調用serve/1
.函數
serve/1
是另外一個循環,它從套接字中讀取一行並將那些行寫回套接字中.注意serve/1
函數使用了管道操做符|>
來表示這個操做流.管道操做符的左邊會被執行,並將結果做爲右邊函數的第一個參數.上面的例子:oop
socket |> read_line() |> write_line(socket)
等同於:學習
write_line(read_line(socket), socket)
read_line/1
經過:gen_tcp.recv/2
實現了從套接字中接收數據,而write_line/2
使用:gen_tcp.send/2
向套接字中寫入.ui
這些就是咱們實現回顯服務器所需的一切.讓咱們來試一試!
經過iex -S mix
在kv_server
應用中運行一個IEx會話.在IEx中,運行:
iex> KVServer.accept(4040)
如今服務器已經運行了,你會發現控制檯阻塞了.讓咱們使用一個telnet
客戶端來訪問咱們的服務器.這些客戶端在大多數操做系統中都是可用的,它們的命令行也是相似的:
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello hello is it me is it me you are looking for? you are looking for?
輸入"hello",敲擊回車,將返回"hello".完美!
個人telnet客戶端能夠經過輸入ctrl + ]
,輸入quit
,再敲擊<Enter>
來退出,但你的客戶端可能步驟會有不一樣.
一旦你退出了telnet客戶端,你會在IEx會話中看到一個錯誤:
** (MatchError) no match of right hand side value: {:error, :closed} (kv_server) lib/kv_server.ex:41: KVServer.read_line/1 (kv_server) lib/kv_server.ex:33: KVServer.serve/1 (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1
這是由於咱們指望從:gen_tcp.recv/2
中得到數據,但客戶端關閉了鏈接.咱們將在以後對服務器的修改中更好地處理這種情形.
如今,這裏有一個更加劇要的bug須要咱們去修復:當咱們的TCP接收器崩潰時會發生什麼?由於這裏沒有監督,因此服務器死亡後,咱們不能再處理更多請求,由於它不會重啓.這就是爲何咱們必須將服務器移動到一個監督樹上.
#任務(Tasks)
咱們已經學習了代理,通用服務器,以及主管.它們都須要處理多重的消息或消息狀態.可是當咱們只須要執行一些任務時應該使用什麼呢?
Task模塊提供了這個功能.例如,它有一個start_link/3
函數用於接收模塊,函數和參數,容許咱們運行一個給定的函數做爲監督樹的一部分.
來試一下.打開lib/kv_server.ex
,將start/2
函數中的主管修改爲:
def start(_type, _args) do import Supervisor.Spec children = [ worker(Task, [KVServer, :accept, [4040]]) ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
經過這個改動,咱們將KVServer.accept(4040)
做爲一個工人來運行.咱們一直在編寫端口,而等一下咱們將會討論它能夠被如何改動.
如今服務器是監督樹的一部分了,它會在咱們運行應用時自動啓動.在終端中輸入mix run --no-halt
,並再次使用telnet
客戶端來確認一切正常:
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. say you say you say me say me
正常!若是你殺死客戶端,致使整個服務器崩潰,你會看到另外一個馬上啓動了.然而,它是規模化(scale)的嗎?
試着同時鏈接兩個telnet客戶端.當你這麼作時,你會發現,第二個客戶端沒有回聲:
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello hello? HELLOOOOOO?
它彷佛徹底不能工做.緣由是咱們接受鏈接和處理請求是在同一個進程中.當一個客戶端鏈接上,咱們就不能再接受其它客戶端了.
#任務主管(Task Supervisor)
爲了使咱們的服務器可以處理同時鏈接,咱們須要有一個接收器進程,它可以生成其它進程來處理請求.一個方法要修改一下:
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) serve(client) loop_acceptor(socket) end
要使用Task.start_link/1
,它相似於Task.start_link/3
,可是它接收的是一個匿名函數而非模塊,函數和參數:
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) Task.start_link(fn -> serve(client) end) loop_acceptor(socket) end
咱們直接在接收器進程中啓動了一個連接任務.但咱們已經犯過這種錯誤了.你還記得嗎?
這相似於咱們從註冊表中直接調用KV.Bucket.start_link/0
時所犯的錯誤.那意味着任何桶的失敗都會致使整個註冊表崩潰.
上述代碼也有一樣的缺陷:若是咱們將serve(client)
任務連接到接收器,一個處理請求時的崩潰將會傳遞給接收器,從而致使其它全部鏈接失敗.
咱們爲註冊表修正了這個問題,經過使用一個簡單地一對一主管.咱們在此將如法炮製,因爲這個模式太經常使用於任務了,Task
已經提供了一個方案:在咱們的監督樹上,可使用一個簡單的一對一主管,附帶臨時的工人.
讓咱們再次修改start/2
,添加一個主管到咱們的樹上:
def start(_type, _args) do import Supervisor.Spec children = [ supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]), worker(Task, [KVServer, :accept, [4040]]) ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
咱們簡單地啓動了一個名爲KVSever.TaskSupervisor
的Task.Supervisor
進程.記住,由於接受其任務依賴於該主管,因此主管必須先啓動.
如今咱們只須要修改loop_acceptor/1
來使用Task.Supervisor
處理每一個請求:
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket) end
你可能注意到咱們添加了一行,:ok = :gen_tcp.controlling_process(client, pid)
.這使得子進程成爲了client
套接字的"控制進程".若是不這樣作的話,一旦接收器崩潰,它就會關閉全部客戶端,由於套接字默認被綁定到了那個接收它們的進程上.
經過mix run --no-halt
啓動一個新的服務器,如今咱們能夠打開許多併發的telnet客戶端了.你也會發現退出一個客戶端不會致使接收器崩潰了.優秀!
這裏是完整的回顯服務器實現,在單獨的模塊中:
defmodule KVServer do use Application require Logger @doc false def start(_type, _args) do import Supervisor.Spec children = [ supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]), worker(Task, [KVServer, :accept, [4040]]) ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end @doc """ Starts accepting connections on the given `port`. """ def accept(port) do {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) Logger.info "Accepting connections on port #{port}" loop_acceptor(socket) end defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket) end defp serve(socket) do socket |> read_line() |> write_line(socket) serve(socket) end defp read_line(socket) do {:ok, data} = :gen_tcp.recv(socket, 0) data end defp write_line(line, socket) do :gen_tcp.send(socket, line) end end
因爲咱們改變了主管的規則,咱們不由要問:咱們的監督策略還正確嗎?
在這種狀況下,是正確的:若是接收器崩潰,不須要關閉現存的鏈接.換句話說,若是任務主管崩潰,也不須要關閉接收器.
下一章咱們將開始解析客戶端請求和發送回覆,完成咱們的服務器.