rabbitmq對network partition的處理

rabbitmq沒有很好的分區容錯性,所以,若是須要在廣域網裏使用rabbitmq集羣,建議使用federation或者shovel進行替代。那麼即便rabbitmq集羣運行在局域網內也不能徹底避免網絡分區現象(network partition),例如,當路由器或者交換機出現問題,或者網口down掉時,均可能發生網絡分區。 java

那麼,出現網絡分區對rabbitmq集羣有什麼影響呢?當發生網絡分區時,不一樣分區裏的節點都認爲對方down掉,對exchange,queue,binding的操做都僅針對本分區有效;存儲在mnesia的元數據(exchange相關屬性,queue相關屬性等)不會在集羣間進行數據同步;另外,對於鏡像隊列,在各自的分區裏都會存在一個master進程處理隊列的相關操做。更重要的是,當網絡分區恢復後,這些現象依舊是存在的! node

從3.1.0版本開始,rabbitmq增長了對網絡分區的處理。能夠在rabbitmq.conf中進行配置。 網絡

[
 {rabbit,
  [{tcp_listeners,[5672]},
   {cluster_partition_handling, ignore}]
 }
].

rabbitmq一共有三種處理方式:ignore,autoheal,pause_minority。默認的處理方式是ignore,即什麼也不作。 app

autoheal的處理方式:簡單來說就是當網絡分區恢復後,rabbitmq各分區彼此進行協商,分區中客戶端鏈接數最多的爲勝者,其他的所有會進行重啓,這樣也就恢復到同步的狀態了。 tcp

內部大體原理: ide

(1)rabbitmq啓動後會建立並註冊名爲rabbit_node_monitor的進程,該進程啓動時會訂閱節點的啓停狀態,訂閱mnesia的系統事件。 spa

init([]) ->
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true),
    {ok, _} = mnesia:subscribe(system),
    {ok, #state{monitors    = pmon:new(),
                subscribers = pmon:new(),
                partitions  = [],
                autoheal    = rabbit_autoheal:init()}}.
(2)該進程收到{inconsistent_database,running_partitioned_network, Node}消息後,從集羣中挑選一個節點向其rabbit_node_monitor進程發送 {autoheal_msg,{request_start,node()}}消息。
rabbit_node_monitor.erl

handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors,
                           autoheal   = AState}) ->
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = ordsets:to_list(
                    ordsets:add_element(Node, ordsets:from_list(Partitions))),
    {noreply, State1#state{partitions = Partitions1,
                           autoheal   = rabbit_autoheal:maybe_start(AState)}};

rabbit_autoheal.erl

maybe_start(not_healing) ->
    case enabled() of
        true  -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
                 send(Leader, {request_start, node()}),
                 rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
                 not_healing;
        false -> not_healing
    end;
(3)rabbit_node_monitor進程收到{autoheal_msg,{request_start,Node}}消息後,分析獲得客戶鏈接數最多的分區,並從該分區中取第一個節點通知它成爲勝利者,同時告知哪些節點是須要重啓的,另外通知其餘失敗者分區的節點進行重啓。
rabbit_node_monitor.erl

handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
                                                partitions = Partitions}) ->
    AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
    {noreply, State#state{autoheal = AState1}};


rabbit_autoheal.erl

handle_msg({request_start, Node},
           not_healing, Partitions) ->
    rabbit_log:info("Autoheal request received from ~p~n", [Node]),
    case rabbit_node_monitor:all_rabbit_nodes_up() of
        false -> not_healing;
        true  -> AllPartitions = all_partitions(Partitions),
                 {Winner, Losers} = make_decision(AllPartitions),
                 rabbit_log:info("Autoheal decision~n"
                                 "  * Partitions: ~p~n"
                                 "  * Winner:     ~p~n"
                                 "  * Losers:     ~p~n",
                                 [AllPartitions, Winner, Losers]),
                 send(Winner, {become_winner, Losers}),
                 [send(L, {winner_is, Winner}) || L <- Losers],
                 not_healing
    end;
(4)節點收到成爲勝利者的消息後,等待全部失敗者分區節點中止rabbit應用以及rabbit依賴的應用,當全部失敗者分區的節點都中止rabbit應用後,再通知它們啓動rabbit應用。
rabbit_autoheal.erl

handle_msg({become_winner, Losers},
           not_healing, _Partitions) ->
    rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
                    [Losers]),
    {winner_waiting, Losers, Losers};

handle_msg({winner_is, Winner},
           not_healing, _Partitions) ->
    rabbit_log:warning(
      "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
    rabbit_node_monitor:run_outside_applications(
      fun () ->
              MRef = erlang:monitor(process, {?SERVER, Winner}),
              rabbit:stop(),
              send(Winner, {node_stopped, node()}),
              receive
                  {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
                  autoheal_safe_to_start                              -> ok
              end,
              erlang:demonitor(MRef, [flush]),
              rabbit:start()
      end),
    restarting;

handle_msg({node_stopped, Node},
           {winner_waiting, [Node], Notify}, _Partitions) ->
    rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
    [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
    not_healing;

handle_msg({node_stopped, Node},
           {winner_waiting, WaitFor, Notify}, _Partitions) ->
    {winner_waiting, WaitFor -- [Node], Notify};
到這裏,rabbitmq完成了網絡分區的處理。注意:這種處理方式可能會出現數據丟失的現象。在CAP中,優先保證了AP。

pause_minority的處理方式:rabbitmq節點感知集羣中其餘節點down掉時,會判斷本身在集羣中處於多數派仍是少數派,也就是判斷與本身造成集羣的節點個數在整個集羣中的比例是否超過一半。若是是多數派,則正常工做,若是是少數派,則會中止rabbit應用並不斷檢測直到本身成爲多數派的一員後再次啓動rabbit應用。注意:這種處理方式集羣一般由奇數個節點組成。在CAP中,優先保證了CP。 rest

相關文章
相關標籤/搜索