《Erlang程序設計》第十四章 套接字編程

第十四章 套接字編程

第十四章 套接字編程

14.1 使用TCP

14.1.1 從服務器上獲取數據

-module(socket_examples).
-export([nano_get_url/0]).
-import(lists, [reverse/1]).

nano_get_url() ->
    nano_get_url("www.google.com").
nano_get_url(Host) ->
    %% 連接到主機的80端口, 以二進制模式打開套接字, 原始方式發送TCP數據
    {ok, Socket} = gen_tcp:connect(Host, 80, [binary, {packet, 0}]),
    %% 發送GET消息到套接字, 使用reverse_data接收數據
    ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
    receive_data(Socket, []).

receive_data(Socket, SoFar) ->
    %% 迴應消息一幀一幀的返回, 所以這裏使用receive方式接收
    receive
        {tcp, Socket, Bin} ->
            %% 將接收到的數據添加到列表SoFar中
            receive_data(Socket, [Bin|SoFar]);
        {tcp_closed, Socket} ->
            %% 由於每接收一幀數據都是放在SoFar的頭部, 所以接收完成後須要翻轉列表獲得正常順序的數據
            list_to_binary(reverse(SoFar))
    end.

  運行結果:
html

1> socket_examples:nano_get_url().
<<"HTTP/1.0 200 OK\r\nDate: Mon, 04 Nov 2013 02:32:00 GMT\r\nExpires: -1\r\nCache-Control: private, max-age=0\r\nContent-Type: "...>>

14.1.2 一個簡單的TCP服務器

  服務端:
shell

start_nano_server() ->
    %% 監聽來自端口2345的連接, 設置包規則爲帶有4字節長的包頭
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    %% 只處理正常打開的套接字
    {ok, Socket} = gen_tcp:accept(Listen),
    %% 只處理一個連接
    gen_tcp:close(Listen),
    %% 連接處理
    loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Bin} ->
            %% 輸出二進制數據
            io:format("Server received binary = ~p~n", [Bin]),
            %% 格式轉換
            Str = binary_to_term(Bin),
            io:format("Server (unpacked) ~p~n", [Str]),
            %% 對字符串求值
            Reply = string2value(Str),
            io:format("Server replying = ~p~n", [Reply]),
            %% 對結果編碼後發給套接字
            gen_tcp:send(Socket, term_to_binary(Reply)),
            loop(Socket);
        {tcp_closed, Socket} ->
            io:format("Server socket closed~n")
    end.

  客戶端:
編程

nano_client_eval(Str) ->
    %% 連接指定主機的2345端口, 發送數據時包頭設置爲4字節長
    {ok, Socket} = gen_tcp:connect("localhost", 2345, [binary, {packet, 4}]),
    %% 調用term_to_binary進行數據轉換後向服務端發送數據
    ok = gen_tcp:send(Socket, term_to_binary(Str)),
    receive
        %% 接收返回並輸出
        {tcp, Socket, Bin} ->
            io:format("Client received binary = ~p~n", [Bin]),
            Val = binary_to_term(Bin),
            io:format("Client result = ~p~n", [Val]),
            gen_tcp:close(Socket)
    end.

  運行結果:
服務器

# 首先啓動服務端
1> socket_examples:start_nano_server().

# 而後打開另外一個erl窗口啓動客戶端
# 隨後客戶端將服務端的計算結果接收後打印輸出
1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").
Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

# 切換到服務端的erl窗口能夠看到以下輸出
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
                           112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,
                           48,93,41>>
Server (unpacked) "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
ok

  而這裏服務端對客戶端提交的字符串表達式進行計算的實如今string2value函數中
網絡

string2value(Str) ->
    %% 按字符分解字符串
    {ok, Tokens, _} = erl_scan:string(Str ++ "."),
    %% 生成解析表達式
    {ok, Exprs} = erl_parse:parse_exprs(Tokens),
    Bindings = erl_eval:new_bindings(),
    %% 運行表達式
    {value, Value, _} = erl_eval:exprs(Exprs, Bindings),
    Value.

14.1.3 改進服務器

  • 順序型服務器
    一次只接收一個鏈接
    %% 接收鏈接後處理請求而後再次調用seq_loop等待下一個鏈接 
    start_seq_server() ->
        {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
        seq_loop(Listen).
    seq_loop(Listen) ->
        {ok, Socket} = gen_tcp:accept(Listen),
        loop(Socket),
        seq_loop(Listen).
    
  • 並行服務器
    一次能夠接收多個並行鏈接
    %% 接收鏈接後啓動新的進程來處理套接字 
    start_parallel_server() ->
        {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
        spawn(fun() ->par_connect(Listen) end).
    par_connect(Listen) ->
        {ok, Socket} = gen_tcp:accept(Listen),
        spawn(fun() ->par_connect(Listen) end),
        loop(Socket).
    

14.2 控制邏輯

14.2.1 主動型消息接收(非阻塞)

創建主動套接字後, 一個獨立的客戶機可能向服務端無限制的發送成千上萬條消息, 若是超過了服務器的處理速度, 則可能致使系統崩潰。由於其不會阻塞客戶端, 所以被稱爲異步服務器, 實現形式以下:
異步

%% 設置active爲true即爲異步方式 
{ok, Listen} = gen_tcp:listen(Port, [..., {active, true}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data}  ->
            %% 數據處理
        {tcp_closed, Socket} ->
            ...
    end.

14.2.2 被動型消息接收(阻塞)

創建被動套接字後, 只有服務端調用gen_tcp:recv(Socket, N)時纔會接收來自套接字的數據, 且只接收N字節的數據, 所以不會由於客戶端的大量請求而致使崩潰, 實現形式以下:
socket

%% 設置active爲false即爲阻塞方式 
{ok, Listen} = gen_tcp:listen(Port, [..., {active, false}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket, N) of
        {ok, B}  ->
            %% 數據處理
            loop(Socket);
        {error, closed} ->
            ...
    end.

14.2.3 混合型模式(半阻塞)

半阻塞模式的套接字是主動的但僅針對一個消息, 須要顯式的調用inet:setopts從新激活以便接收下一個消息, 在此以前系統將處於阻塞狀態, 實現形式以下:
tcp

%% 設置active爲once即爲異步方式 
{ok, Listen} = gen_tcp:listen(Port, [..., {active, once}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data}  ->
            %% 數據處理
            inet:setopts(Socket, [{active, once}]),
            loop(Socket);
        {tcp_closed, Socket} ->
            ...
    end.

14.3 鏈接從何而來

  使用函數inet:peername(Socket)能夠獲取客戶端信息。
函數

inet:peername(Socket) -> {ok, {IP_Address, Port} | {error, Why}}

14.4 套接字的出錯處理

  測試代碼
oop

%% 服務端接收數據後調用atom_to_list處理數據
error_test_server() ->
    {ok, Listen} = gen_tcp:listen(4321, [binary, {packet, 2}]),
    {ok, Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).
error_test_server_loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            io:format("received:~p~n", [Data]),
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

%% 客戶端鏈接後發生二進制數據使atom_to_list發生異常
error_test() ->
    spawn(fun() ->error_test_server() end),
    sleep(2000),
    {ok, Socket} = gen_tcp:connect("localhost", 4321, [binary, {packet, 2}]),
    io:format("connected to:~p~n", [Socket]),
    gen_tcp:send(Socket, <<"123">>),
    receive
        Any ->
            io:format("Any=~p~n", [Any])
    end.

  運行結果:

# 服務端異常結果
1> socket_examples:error_test_server().
received:<<"123">>
   exception error: bad argument
     in function  atom_to_list/1
        called as atom_to_list(<<"123">>)
     in call from socket_examples:error_test_server_loop/1 (socket_examples.erl, line 120) 

# 客戶端異常結果
1> socket_examples:error_test().

=ERROR REPORT==== 5-Nov-2013::10:19:27 ===
Error in process <0.50.0> with exit value: {{badmatch,{error,eaddrinuse}},[{socket_examples,error_test_server,0,[{file,"socket_examples.erl"},{line,113}]}]}

connected to:#Port<0.2291>
Any={tcp_closed,#Port<0.2291>}
ok

14.5 UDP

14.5.1 最簡單的UDP服務器和客戶機

  UDP服務器的形式

server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} ->
            BinReply = ... ,
            gen_udp:send(Socket, Host, Port, BinReply),
            loop(Socket)
    end.

  UDP客戶機的形式

client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    ok = gen_udp:send(Socket, "localhost", 4000, Request),
    Value = receive
                {udp, Socket, _, _, Bin} ->{ok, Bin}
            %% 由於UDP協議傳輸的不可靠性, 有可能沒有獲得服務端的迴應, 所以這裏要設置超時時間 
            after 2000 ->error
            end,
    gen_udp:close(Socket),
    Value.

14.5.2 一個計算階乘的UDP服務器

  服務端實現:

start_server() ->
    spawn(fun() ->server(40000) end).

server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary]),
    io:format("server opened socket:~p~n", [Socket]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} = Msg ->
            io:format("server received:~p~n", [Msg]),
            N = binary_to_term(Bin),
            Fac = fac(N),
            gen_udp:send(Socket, Host, Port, term_to_binary(Fac)),
            loop(Socket)
    end.

fac(0) ->1;
fac(N) ->N * fac(N-1).

  客戶端實現:

client(N) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    io:format("client opened socket=~p~n", [Socket]),
    ok = gen_udp:send(Socket, "localhost", 40000, term_to_binary(N)),
    Value = receive
                {udp, Socket, _, _, Bin} = Msg ->
                    io:format("client received:~p~n", [Msg]),
                    binary_to_term(Bin)
            after 2000 ->0
            end,
    gen_udp:close(Socket),
    Value.

  運行結果:

1> udp_test:start_server().
server opened socket:#Port<0.2308>
<0.68.0>
2> udp_test:client(40).
client opened socket=#Port<0.2309>
server received:{udp,#Port<0.2308>,{127,0,0,1},54449,<<131,97,40>>}
client received:{udp,#Port<0.2309>,
                     {127,0,0,1},
                     40000,
                     <<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242,
                       199,132,27,232,234,142>>}
815915283247897734345611269596115894272000000000

14.5.3 關於UDP協議的其餘注意事項

由於UDP數據報可能被傳輸兩次, 所以爲了不這個問題, 可使用make_ref函數爲請求建立惟一標示。
客戶端實現:

client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    Ref  = make_ref(),
    B1 = term_to_binary(Ref, Request),
    ok = gen_udp:send(Socket, "localhost", 40000, B1),
    wait_for_ref(Socket, Ref).

wait_for_ref(Socket, Ref) ->
    receive
        {udp, Socket, _, _, Bin} ->
            case binary_to_term(Bin) of
                %% 在client(Request)函數中已經爲請求添加了惟一標示, 所以這裏要從 {Ref, Val} 這種格式的數據中提取出真正的請求 
                {Ref, Val} ->Val;
                {_SomeOtherRef, _} ->
                    %% 對於其餘數據則不用處理
                    wait_for_ref(Socket, Ref)
            end;
    after 1000 ->
        ...
    end. 

14.6 向多臺機器廣播消息

-module(broadcast).
-compile(export_all).

send(IoList) ->
    %% 獲取網卡en0的IP信息
    case inet:ifget("en0", [broadaddr]) of
        {ok, [{broadaddr, Ip}]} ->
            %% 打開5010端口
            {ok, S} = gen_udp:open(5010, [{broadcast, true}]),
            %% 向本地網絡的6000端口廣播數據
            gen_udp:send(S, Ip, 6000, IoList),
            gen_udp:close(S);
        _ ->
            io:format("Bad interface name, or broadcastng not supported\n")
    end.

listen() ->
    %% 監聽6000端口的廣播
    {ok, _} = gen_udp:open(6000),
    loop().

loop() ->
    receive
        Any ->
            %% 打印任何收到的信息
            io:format("received:~p~n", [Any]),
            loop()
    end.

  在單臺機器上測試:

# 一個shell打開監聽 
1> broadcast:listen().

# 一個shell發送廣播
1> broadcast:send(["test"]). 

# 能夠看到監聽端的輸出
1> broadcast:listen().      
received:{udp,#Port<0.2337>,{10,0,1,224},5010,"test"} 

Date: 2014-01-06 14:17:22 CST

Author: matrix

Org version 7.8.11 with Emacs version 24

Validate XHTML 1.0
相關文章
相關標籤/搜索