參考資料:http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principlesphp
服務器設計
tcp_server_app下的根監控樹使用one_for_one重啓策略。兩個子樹應用,第一個是一個tcp套接字監聽服務器,使用gen_server模式來實現,採用異步監聽的客戶端鏈接的模式。第二個是一個客戶端應用,使用gen_fsm模式實現,使用標準SASL錯誤報告接口,記錄客戶端消息處理的日誌以及非正常與服務器斷開鏈接日誌。node
總體應用架構:
+----------------+
| tcp_server_app |
+--------+-------+
| (one_for_one)
+----------------+---------+
| |
+-------+------+ +-------+--------+
| tcp_listener | + tcp_client_sup |
+--------------+ +-------+--------+
| (simple_one_for_one)
+-----|---------+
+-------|--------+|
+--------+-------+|+
| tcp_echo_fsm |+
+----------------+api
tcp_server代碼以下:服務器
1 %% TCP Server Application (tcp_server_app.erl) 2 -module(tcp_server_app). 3 -author('saleyn@gmail.com'). 4 5 %% 實現application模式 6 -behaviour(application). 7 8 -export([start_client/0]). 9 10 %% 應用程序啓動以及監控樹回調函數 11 -export([start/2, stop/1, init/1]). 12 13 %% 宏變量定義 14 -define(MAX_RESTART, 5). 15 -define(MAX_TIME, 60). 16 -define(DEF_PORT, 2222). 17 18 %% 啓動客戶端進程的接口 19 %% 在監聽程序創建鏈接時調用 20 start_client() -> 21 %% 回調第二個init函數,由於第二個是動態添加監控樹子節點 22 %% 也就是說這裏是兩顆不一樣的監控樹,使用了一個模塊兩個 init 函數來實現 23 supervisor:start_child(tcp_client_sup, []). 24 25 %%---------------------------------------------------------------------- 26 %% Application behaviour callbacks 27 %%---------------------------------------------------------------------- 28 start(_Type, _Args) -> 29 %% 獲取端口配置參數,找不到時返回默認端口 ?DEF_PORT 30 ListenPort = get_app_env(listen_port, ?DEF_PORT), 31 32 %% 啓動應用程序,回調函數爲 第一個 init 函數,根據參數匹配,參數爲 [端口,客戶端回調模塊] 33 %% 第一個 init 函數僅僅是啓動了兩個監控樹 34 supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]). 35 36 stop(_S) -> 37 ok. 38 39 %%---------------------------------------------------------------------- 40 %% Supervisor behaviour callbacks 41 %%---------------------------------------------------------------------- 42 init([Port, Module]) -> 43 {ok, 44 %% 監控樹策略參數,ono_for_one策略,設置MAX_TIME最多重啓的MAX_RESTART次 45 {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME}, 46 [ 47 % TCP Listener 48 { tcp_server_sup, % Id = internal id 49 {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} 50 permanent, % Restart = permanent | transient | temporary 51 2000, % Shutdown = brutal_kill | int() >= 0 | infinity 52 worker, % Type = worker | supervisor 53 [tcp_listener] % Modules = [Module] | dynamic 54 }, 55 % Client instance supervisor 56 { 57 %% Module參數初始化了tcp_client_sup監控樹的 init 函數, init 函數在下面 58 tcp_client_sup, 59 %% 子節點啓動策略 60 {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]}, 61 permanent, % Restart = permanent | transient | temporary 62 infinity, % Shutdown = brutal_kill | int() >= 0 | infinity 63 supervisor, % Type = worker | supervisor 64 [] % Modules = [Module] | dynamic 65 } 66 ] 67 } 68 }; 69 70 %% 在服務器接收鏈接時,建立客戶端進程時會回調到這個函數,使用simple_one_for_one啓動策略 71 %% 參數 Module 在第一個 72 init([Module]) -> 73 {ok, 74 %% 另一種根監督樹模式,simple_one_for_one策略子節點只能動態添加 75 {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME}, 76 [ 77 % TCP Client 78 { undefined, % Id = internal id 79 {Module,start_link,[]}, % StartFun = {M, F, A} 80 temporary, % Restart = permanent | transient | temporary 81 2000, % Shutdown = brutal_kill | int() >= 0 | infinity 82 worker, % Type = worker | supervisor 83 [] % Modules = [Module] | dynamic 84 } 85 ] 86 } 87 }. 88 89 %%---------------------------------------------------------------------- 90 %% Internal functions 91 %%---------------------------------------------------------------------- 92 %% 獲取配置文件xxx.app文件中的配置變量 93 get_app_env(Opt, Default) -> 94 case application:get_env(application:get_application(), Opt) of 95 {ok, Val} -> Val; 96 _ -> 97 case init:get_argument(Opt) of 98 [[Val | _]] -> Val; 99 error -> Default 100 end 101 end.
下面是服務端socket監聽程序,這裏使用了一個不具備官方文檔的 api
prim_inet:async_accept/2 來實現一個異步監聽套接字的服務器程序,代碼以下:架構
1 % TCP Listener Process (tcp_listener.erl) 2 -module(tcp_listener). 3 -author('saleyn@gmail.com'). 4 5 %% 實現 gen_server 模式 6 -behaviour(gen_server). 7 8 %% 內部接口 9 -export([start_link/2]). 10 11 %% gen_server 回調函數 12 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 13 code_change/3]). 14 15 %% 定義了一個 record 記錄 gen_server 進程的狀態 16 -record(state, { 17 listener, % Listening socket 18 acceptor, % Asynchronous acceptor's internal reference 19 module % FSM handling module 20 }). 21 22 %%-------------------------------------------------------------------- 23 %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} 24 %% @doc 監控樹調用並開始進行tcp套接字監聽 25 %% @end 26 %%---------------------------------------------------------------------- 27 start_link(Port, Module) when is_integer(Port), is_atom(Module) -> 28 gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []). 29 30 %%%------------------------------------------------------------------------ 31 %%% Callback functions from gen_server 32 %%%------------------------------------------------------------------------ 33 34 %%---------------------------------------------------------------------- 35 %% @spec (Port::integer()) -> {ok, State} | 36 %% {ok, State, Timeout} | 37 %% ignore | 38 %% {stop, Reason} 39 %% 40 %% @doc gen_server啓動時回調,並建立 tcp 監聽 41 %% @end 42 %%---------------------------------------------------------------------- 43 init([Port, Module]) -> 44 process_flag(trap_exit, true), 45 Opts = [binary, {packet, 2}, {reuseaddr, true}, 46 {keepalive, true}, {backlog, 30}, {active, false}], 47 %% 使用 gen_tcp 模塊啓動套接字監聽,這是一個阻塞動做 48 case gen_tcp:listen(Port, Opts) of 49 {ok, Listen_socket} -> %% 建立監聽成功返回監聽socket 50 %% 建立第一個接受鏈接的進程 51 %% prim_inet:async_accept/2開啓異步監聽 52 %% 以後有客戶端鏈接時會向此進程發送一個異步消息inet_async到進程消息隊列 53 %% Ref 存儲接受進程的引用 54 {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), 55 {ok, #state{listener = Listen_socket, 56 acceptor = Ref, 57 module = Module}}; 58 {error, Reason} -> 59 {stop, Reason} 60 end. 61 62 %%------------------------------------------------------------------------- 63 %% @spec (Request, From, State) -> {reply, Reply, State} | 64 %% {reply, Reply, State, Timeout} | 65 %% {noreply, State} | 66 %% {noreply, State, Timeout} | 67 %% {stop, Reason, Reply, State} | 68 %% {stop, Reason, State} 69 %% @doc 服務進程被同步調用時的回調函數 70 %% @end 71 %% @private 72 %%------------------------------------------------------------------------- 73 handle_call(Request, _From, State) -> 74 {stop, {unknown_call, Request}, State}. 75 76 %%------------------------------------------------------------------------- 77 %% @spec (Msg, State) ->{noreply, State} | 78 %% {noreply, State, Timeout} | 79 %% {stop, Reason, State} 80 %% @doc 服務進程被異步調用時的回調函數 81 %% @end 82 %% @private 83 %%------------------------------------------------------------------------- 84 handle_cast(_Msg, State) -> 85 {noreply, State}. 86 87 %%------------------------------------------------------------------------- 88 %% @spec (Msg, State) ->{noreply, State} | 89 %% {noreply, State, Timeout} | 90 %% {stop, Reason, State} 91 %% @doc 回調函數,處理那些直接發消息到進程郵箱的事件 92 %% 這裏處理的是 {inet_async, ListSock, Ref, {ok, CliSocket}}事件, 93 %% inet_async 表示是一個異步事件,服務器端接收鏈接採用異步的方式, 94 %% 客戶端鏈接最終會被轉化成一個 inet_async 消息發送到進程郵箱等待處理 95 %% {{ok, CliSocket}} 裏的CliSocket表示客戶端創建的鏈接套接口 96 %% @end 97 %% @private 98 %%------------------------------------------------------------------------- 99 100 %% 注意這裏 ListSock 以及 Ref 作了匹配,只有匹配了纔是該監聽口接收的鏈接 101 handle_info({inet_async, ListSock, Ref, {ok, CliSocket}}, 102 #state{listener=ListSock, acceptor=Ref, module=Module} = State) -> 103 try 104 case set_sockopt(ListSock, CliSocket) of 105 ok -> ok; 106 {error, Reason} -> exit({set_sockopt, Reason}) 107 end, 108 109 %% 接收新的客戶端鏈接,啓動一個新的客戶端狀態機進程,動態添加到 tcp_client_sup 客戶端監控樹 110 {ok, Pid} = tcp_server_app:start_client(), 111 112 %% 綁定 CliSocet 到客戶端進程 Pid, 這樣CliSocket接收數據都會被轉化成Pid表明進程的郵箱消息 113 gen_tcp:controlling_process(CliSocket, Pid), 114 %% Instruct the new FSM that it owns the socket. 115 116 Module:set_socket(Pid, CliSocket), 117 118 %% Signal the network driver that we are ready to accept another connection 119 %% 從新設置異步監聽下一個客戶端鏈接的消息,設置新的監聽引用 120 %% 必須從新設置才能監聽到 {inet_async,S,Ref,Status} 消息 121 case prim_inet:async_accept(ListSock, -1) of 122 {ok, NewRef} -> ok; 123 {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)}) 124 end, 125 126 %% 更新新的監聽引用 127 {noreply, State#state{acceptor=NewRef}} 128 catch exit:Why -> 129 error_logger:error_msg("Error in async accept: ~p.\n", [Why]), 130 {stop, Why, State} 131 end; 132 133 %%客戶端創建鏈接的容錯處理 134 handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) -> 135 error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]), 136 {stop, Error, State}; 137 138 handle_info(_Info, State) -> 139 {noreply, State}. 140 141 %%------------------------------------------------------------------------- 142 %% @spec (Reason, State) -> any 143 %% @doc Callback executed on server shutdown. It is only invoked if 144 %% `process_flag(trap_exit, true)' is set by the server process. 145 %% The return value is ignored. 146 %% @end 147 %% @private 148 %%------------------------------------------------------------------------- 149 terminate(_Reason, State) -> 150 gen_tcp:close(State#state.listener), 151 ok. 152 153 %%------------------------------------------------------------------------- 154 %% @spec (OldVsn, State, Extra) -> {ok, NewState} 155 %% @doc Convert process state when code is changed. 156 %% @end 157 %% @private 158 %%------------------------------------------------------------------------- 159 code_change(_OldVsn, State, _Extra) -> 160 {ok, State}. 161 162 %%%------------------------------------------------------------------------ 163 %%% Internal functions 164 %%%------------------------------------------------------------------------ 165 166 %% 設置客戶端socket的參數選項,只是簡單的複製了監聽服務器的配置選項 167 set_sockopt(ListSock, CliSocket) -> 168 true = inet_db:register_socket(CliSocket, inet_tcp), 169 case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of 170 {ok, Opts} -> 171 case prim_inet:setopts(CliSocket, Opts) of 172 ok -> ok; 173 Error -> gen_tcp:close(CliSocket), Error 174 end; 175 Error -> 176 gen_tcp:close(CliSocket), Error 177 end.
下面是客戶端處理輸出的狀態機:app
1 %% TCP Client Socket Handling FSM (tcp_echo_fsm.erl) 2 %% 客戶端輸出處理狀態機,這裏其實就是一個 echo_server 的客戶端版本 3 4 -module(tcp_echo_fsm). 5 -author('saleyn@gmail.com'). 6 7 %% 實現 gen_fsm 模式,事實上狀態機應用場景沒有 gen_server 多 8 %% 不過能用的場景都比較特殊,好比遊戲客戶端,服務端戰鬥模塊 9 -behaviour(gen_fsm). 10 11 -export([start_link/0, set_socket/2]). 12 13 %% gen_fsm 回調函數 14 -export([init/1, handle_event/3, 15 handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). 16 17 %% FSM States FSM 狀態機的狀態 18 -export([ 19 'WAIT_FOR_SOCKET'/2, %% 等待socket 20 'WAIT_FOR_DATA'/2 %% 等待socket數據 21 ]). 22 23 -record(state, { 24 socket, % client socket 25 addr % client address 26 }). 27 28 -define(TIMEOUT, 120000). 29 30 %%%------------------------------------------------------------------------ 31 %%% API 32 %%%------------------------------------------------------------------------ 33 34 %%------------------------------------------------------------------------- 35 %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} 36 %% @doc To be called by the supervisor in order to start the server. 37 %% If init/1 fails with Reason, the function returns {error,Reason}. 38 %% If init/1 returns {stop,Reason} or ignore, the process is 39 %% terminated and the function returns {error,Reason} or ignore, 40 %% respectively. 41 %% @end 42 %%------------------------------------------------------------------------- 43 start_link() -> 44 gen_fsm:start_link(?MODULE, [], []). 45 46 set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> 47 gen_fsm:send_event(Pid, {socket_ready, Socket}). 48 49 %%%------------------------------------------------------------------------ 50 %%% Callback functions from gen_server 51 %%%------------------------------------------------------------------------ 52 53 %%------------------------------------------------------------------------- 54 %% Func: init/1 55 %% Returns: {ok, StateName, StateData} | 56 %% {ok, StateName, StateData, Timeout} | 57 %% ignore | 58 %% {stop, StopReason} 59 %% @private 60 %%------------------------------------------------------------------------- 61 init([]) -> 62 process_flag(trap_exit, true), 63 64 %% 狀態機啓動以後的初始化狀態 65 {ok, 'WAIT_FOR_SOCKET', #state{}}. 66 67 %%------------------------------------------------------------------------- 68 %% Func: StateName/2 69 %% Returns: {next_state, NextStateName, NextStateData} | 70 %% {next_state, NextStateName, NextStateData, Timeout} | 71 %% {stop, Reason, NewStateData} 72 %% @private 73 %%------------------------------------------------------------------------- 74 75 %% 建立客戶端以後 set_socket 函數發送消息以後在這裏被處理了 76 %% 大體邏輯是:收到通知,客戶端鏈接socket到手,能夠設置套接字選項並開始接收數據 77 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) -> 78 % Now we own the socket 79 inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), 80 {ok, {IP, _Port}} = inet:peername(Socket), 81 82 %% 肯定了socket以後,狀態機的下一個狀態就是等着接收數據了 83 {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT}; 84 'WAIT_FOR_SOCKET'(Other, State) -> 85 error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]), 86 %% Allow to receive async messages 87 {next_state, 'WAIT_FOR_SOCKET', State}. 88 89 %% 顯示來自客戶端的事件 90 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) -> 91 ok = gen_tcp:send(S, Data), 92 {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}; 93 94 'WAIT_FOR_DATA'(timeout, State) -> 95 error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]), 96 {stop, normal, State}; 97 98 'WAIT_FOR_DATA'(Data, State) -> 99 io:format("~p Ignoring data: ~p\n", [self(), Data]), 100 {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}. 101 102 %%------------------------------------------------------------------------- 103 %% Func: handle_event/3 104 %% Returns: {next_state, NextStateName, NextStateData} | 105 %% {next_state, NextStateName, NextStateData, Timeout} | 106 %% {stop, Reason, NewStateData} 107 %% @private 108 %%------------------------------------------------------------------------- 109 handle_event(Event, StateName, StateData) -> 110 {stop, {StateName, undefined_event, Event}, StateData}. 111 112 %%------------------------------------------------------------------------- 113 %% Func: handle_sync_event/4 114 %% Returns: {next_state, NextStateName, NextStateData} | 115 %% {next_state, NextStateName, NextStateData, Timeout} | 116 %% {reply, Reply, NextStateName, NextStateData} | 117 %% {reply, Reply, NextStateName, NextStateData, Timeout} | 118 %% {stop, Reason, NewStateData} | 119 %% {stop, Reason, Reply, NewStateData} 120 %% @private 121 %%------------------------------------------------------------------------- 122 handle_sync_event(Event, _From, StateName, StateData) -> 123 {stop, {StateName, undefined_event, Event}, StateData}. 124 125 %%------------------------------------------------------------------------- 126 %% Func: handle_info/3 127 %% Returns: {next_state, NextStateName, NextStateData} | 128 %% {next_state, NextStateName, NextStateData, Timeout} | 129 %% {stop, Reason, NewStateData} 130 %% @private 131 %%------------------------------------------------------------------------- 132 handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) -> 133 % Flow control: enable forwarding of next TCP message 134 inet:setopts(Socket, [{active, once}]), 135 ?MODULE:StateName({data, Bin}, StateData); 136 137 handle_info({tcp_closed, Socket}, _StateName, 138 #state{socket=Socket, addr=Addr} = StateData) -> 139 error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]), 140 {stop, normal, StateData}; 141 142 handle_info(_Info, StateName, StateData) -> 143 {noreply, StateName, StateData}. 144 145 %%------------------------------------------------------------------------- 146 %% Func: terminate/3 147 %% Purpose: Shutdown the fsm 148 %% Returns: any 149 %% @private 150 %%------------------------------------------------------------------------- 151 terminate(_Reason, _StateName, #state{socket=Socket}) -> 152 (catch gen_tcp:close(Socket)), 153 ok. 154 155 %%------------------------------------------------------------------------- 156 %% Func: code_change/4 157 %% Purpose: Convert process state when code is changed 158 %% Returns: {ok, NewState, NewStateData} 159 %% @private 160 %%------------------------------------------------------------------------- 161 code_change(_OldVsn, StateName, StateData, _Extra) -> 162 {ok, StateName, StateData}.
最後是app文件了:異步
1 %% tcp_server.app 文件 2 3 {application, tcp_server, 4 [ 5 {description, "Demo TCP server"}, 6 {vsn, "1.0"}, 7 {id, "tcp_server"}, 8 {modules, [tcp_listener, tcp_echo_fsm]}, 9 {registered, [tcp_server_sup, tcp_listener]}, 10 {applications, [kernel, stdlib]}, 11 %% 12 %% mod: 指定應用啓動初始化的模塊 13 %% 14 {mod, {tcp_server_app, []}}, 15 {env, []} 16 ] 17 }.
以上基本上都是我的查找資料過程的筆記,有理解錯誤的地方請評論指出,謝謝!socket