elixir官方教程Mix與OTP(七) 任務與gen_tcp

#任務與gen_tcp服務器

  1. 回顯服務器
  2. 任務
  3. 任務主管

本章,咱們將學習如何使用Erlang的:gen_tcp模塊來處理請求.這是一個探索Elixir的Task模塊的好機會.在後面的章節咱們將擴展咱們的服務器,讓它可以執行命令.併發

#回顯服務器(Echo server)app

咱們將實現一個回顯服務器,做爲咱們的TCP服務器的開始.它會簡單地發送一個回覆,內容就是在請求中收到的文本.咱們將緩慢地升級服務器,直到它被監督並準備好處理多重鏈接.socket

一個TCP服務器,歸納地說,會有以下表現:tcp

  1. 監聽一個端口,直到端口可用,並得到套接字
  2. 在那個端口等待客戶端鏈接並接受它
  3. 讀取客戶端請求並進行回覆

讓咱們來實現這些步驟.進入apps/kv_server應用,打開lib/kv_server.ex,並添加下列函數:ide

require Logger

def accept(port) do
  # 下列選項的意思是:
  #
  # 1. `:binary` - 以二進制數接受數據 (而非列表)
  # 2. `packet: :line` - 逐行接收數據
  # 3. `active: false` - 阻塞在 `:gen_tcp.recv/2` 直到數據可用
  # 4. `reuseaddr: true` - 容許咱們重用數據當監聽器崩潰時
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false, reuseaddr: true])
  Logger.info "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end

咱們將經過調用KVServer.accept(4040)來開啓咱們的服務器,這裏的4040是端口.accept/1的第一步就是監聽端口,直到套接字可用,而後調用loop_acceptor/1.loop_acceptor/1只是一個接收客戶端鏈接的循環.對於每一個被接收的鏈接,咱們調用serve/1.函數

serve/1是另外一個循環,它從套接字中讀取一行並將那些行寫回套接字中.注意serve/1函數使用了管道操做符|>來表示這個操做流.管道操做符的左邊會被執行,並將結果做爲右邊函數的第一個參數.上面的例子:oop

socket |> read_line() |> write_line(socket)

等同於:學習

write_line(read_line(socket), socket)

read_line/1經過:gen_tcp.recv/2實現了從套接字中接收數據,而write_line/2使用:gen_tcp.send/2向套接字中寫入.ui

這些就是咱們實現回顯服務器所需的一切.讓咱們來試一試!

經過iex -S mixkv_server應用中運行一個IEx會話.在IEx中,運行:

iex> KVServer.accept(4040)

如今服務器已經運行了,你會發現控制檯阻塞了.讓咱們使用一個telnet客戶端來訪問咱們的服務器.這些客戶端在大多數操做系統中都是可用的,它們的命令行也是相似的:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

輸入"hello",敲擊回車,將返回"hello".完美!

個人telnet客戶端能夠經過輸入ctrl + ],輸入quit,再敲擊<Enter>來退出,但你的客戶端可能步驟會有不一樣.

一旦你退出了telnet客戶端,你會在IEx會話中看到一個錯誤:

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:41: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:33: KVServer.serve/1
    (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1

這是由於咱們指望從:gen_tcp.recv/2中得到數據,但客戶端關閉了鏈接.咱們將在以後對服務器的修改中更好地處理這種情形.

如今,這裏有一個更加劇要的bug須要咱們去修復:當咱們的TCP接收器崩潰時會發生什麼?由於這裏沒有監督,因此服務器死亡後,咱們不能再處理更多請求,由於它不會重啓.這就是爲何咱們必須將服務器移動到一個監督樹上.

#任務(Tasks)

咱們已經學習了代理,通用服務器,以及主管.它們都須要處理多重的消息或消息狀態.可是當咱們只須要執行一些任務時應該使用什麼呢?

Task模塊提供了這個功能.例如,它有一個start_link/3函數用於接收模塊,函數和參數,容許咱們運行一個給定的函數做爲監督樹的一部分.

來試一下.打開lib/kv_server.ex,將start/2函數中的主管修改爲:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

經過這個改動,咱們將KVServer.accept(4040)做爲一個工人來運行.咱們一直在編寫端口,而等一下咱們將會討論它能夠被如何改動.

如今服務器是監督樹的一部分了,它會在咱們運行應用時自動啓動.在終端中輸入mix run --no-halt,並再次使用telnet客戶端來確認一切正常:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

正常!若是你殺死客戶端,致使整個服務器崩潰,你會看到另外一個馬上啓動了.然而,它是規模化(scale)的嗎?

試着同時鏈接兩個telnet客戶端.當你這麼作時,你會發現,第二個客戶端沒有回聲:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

它彷佛徹底不能工做.緣由是咱們接受鏈接和處理請求是在同一個進程中.當一個客戶端鏈接上,咱們就不能再接受其它客戶端了.

#任務主管(Task Supervisor)

爲了使咱們的服務器可以處理同時鏈接,咱們須要有一個接收器進程,它可以生成其它進程來處理請求.一個方法要修改一下:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

要使用Task.start_link/1,它相似於Task.start_link/3,可是它接收的是一個匿名函數而非模塊,函數和參數:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

咱們直接在接收器進程中啓動了一個連接任務.但咱們已經犯過這種錯誤了.你還記得嗎?

這相似於咱們從註冊表中直接調用KV.Bucket.start_link/0時所犯的錯誤.那意味着任何桶的失敗都會致使整個註冊表崩潰.

上述代碼也有一樣的缺陷:若是咱們將serve(client)任務連接到接收器,一個處理請求時的崩潰將會傳遞給接收器,從而致使其它全部鏈接失敗.

咱們爲註冊表修正了這個問題,經過使用一個簡單地一對一主管.咱們在此將如法炮製,因爲這個模式太經常使用於任務了,Task已經提供了一個方案:在咱們的監督樹上,可使用一個簡單的一對一主管,附帶臨時的工人.

讓咱們再次修改start/2,添加一個主管到咱們的樹上:

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

咱們簡單地啓動了一個名爲KVSever.TaskSupervisorTask.Supervisor進程.記住,由於接受其任務依賴於該主管,因此主管必須先啓動.

如今咱們只須要修改loop_acceptor/1來使用Task.Supervisor處理每一個請求:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end

你可能注意到咱們添加了一行,:ok = :gen_tcp.controlling_process(client, pid).這使得子進程成爲了client套接字的"控制進程".若是不這樣作的話,一旦接收器崩潰,它就會關閉全部客戶端,由於套接字默認被綁定到了那個接收它們的進程上.

經過mix run --no-halt啓動一個新的服務器,如今咱們能夠打開許多併發的telnet客戶端了.你也會發現退出一個客戶端不會致使接收器崩潰了.優秀!

這裏是完整的回顯服務器實現,在單獨的模塊中:

defmodule KVServer do
  use Application
  require Logger

  @doc false
  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
      worker(Task, [KVServer, :accept, [4040]])
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

因爲咱們改變了主管的規則,咱們不由要問:咱們的監督策略還正確嗎?

在這種狀況下,是正確的:若是接收器崩潰,不須要關閉現存的鏈接.換句話說,若是任務主管崩潰,也不須要關閉接收器.

下一章咱們將開始解析客戶端請求和發送回覆,完成咱們的服務器.

相關文章
相關標籤/搜索