heartbeat一般用來檢測通訊的對端是否存活(未正常關閉socket鏈接而異常crash)。其基本原理是檢測對應的socket鏈接上數據的收發是否正常,若是一段時間內沒有收發數據,則向對端發送一個心跳檢測包,若是一段時間內沒有迴應則認爲心跳超時,即認爲對端可能異常crash了。html
rabbitmq也不例外,heatbeat在客戶端和服務端之間用於檢測對端是否正常,即客戶端與服務端之間的tcp連接是否正常。web
1. heartbeat檢測時間間隔的設置app
1). 服務端的設置socket
heartbeat檢測時間間隔可在配置文件rabbitmq.config中增長配置項{heartbeat,Timeout}進行配置,其中Timeout指定時間間隔,單位爲秒。tcp
例如:code
若是沒有進行配置,默認的時間間隔爲600秒(最新版本3.2.2修改成580秒)htm
在rabbit.app中有heartbeat的配置項。rabbitmq
2). 客戶端的設置進程
根據AMQP協議,rabbitmq會經過connection.tune信令將heartbeat檢測時間間隔告知客戶端,客戶端能夠根據須要從新設置該值,並經過Connection.tune-ok信令將時間間隔再告訴給rabbitmq,rabbitmq會以客戶端的時間做爲該tcp鏈接上heartbeat檢測的間隔時間。get
這裏要注意的是:若是時間間隔配置爲0,則表示不啓用heartbeat檢測。
若是啓用了rabbitmq_management的話,從web控制檯能夠看到客戶端的檢測的時間間隔。
2. heartbeat的實現
rabbitmq在收到來自客戶端的connection.tune-ok信令後,啓用心跳檢測,rabbitmq會爲每一個tcp鏈接建立兩個進程用於心跳檢測,一個進程定時檢測tcp鏈接上是否有數據發送(這裏的發送是指rabbitmq發送數據給客戶端),若是一段時間內沒有數據發送給客戶端,則發送一個心跳包給客戶端,而後循環進行下一次檢測;另外一個進程定時檢測tcp鏈接上是否有數據的接收,若是一段時間內沒有收到任何數據,則斷定爲心跳超時,最終會關閉tcp鏈接。另外,rabbitmq的流量控制機制可能會暫停heartbeat檢測,這裏不展開描述。
涉及的源碼:
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> %%數據發送檢測進程 {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, start_heartbeat_sender), %%數據接收檢測進程 {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, start_heartbeat_receiver), {Sender, Receiver}. start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for %% nearly 2 * TimeoutSec before sending a heartbeat in the %% boundary case heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> SendFun(), continue end}). start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out %% between 2 and 3 intervals after the last data has been %% received heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, receive ... %% 定時檢測 after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> %% 收發數據有變化 if NewStatVal =/= StatVal -> %%從新開始檢測 Recurse({NewStatVal, 0}); %%未達到指定次數, 發送爲0, 接收爲1 SameCount < Threshold -> %%計數加1, 再次檢測 Recurse({NewStatVal, SameCount + 1}); %%heartbeat超時 true -> %%對於發送檢測超時, 向客戶端發送heartbeat包 %%對於接收檢測超時, 向父進程發送超時通知 %%由父進程觸發tcp關閉等操做 case Handler() of %%接收檢測超時 stop -> ok; %%發送檢測超時 continue -> Recurse({NewStatVal, 0}) end; ...
收發檢測的時候利用了inet模塊的getstat,查看socket的統計信息
recv_oct: 查看socket上接收的字節數
send_oct: 查看socket上發送的字節數
inet詳細見這裏: http://www.erlang.org/doc/man/inet.html