Rabbitmq 官方給的NET consumer示例代碼以下,但使用過程,會遇到connection斷開的問題,一旦斷開,這個代碼就會報錯,就會致使消費者或者生產者掛掉。
下圖是生產者發送消息,我手動中止了rabbitmq,而後又從新啓動了rabbitmq,大概等啓動成功之後,爲了防止服務沒有徹底啓動,我又等待了10秒鐘
服務徹底啓動成功之後,我嘗試從新發送一些消息,報錯,以下:
html
************** 異常文本 **************
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=
在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:\project\my\RabbitMQ-demo\rabbitMQ-Publish\Form1.cs:行號 37
在 System.Windows.Forms.Control.OnClick(EventArgs e)
在 System.Windows.Forms.Button.OnClick(EventArgs e)
在 System.Windows.Forms.Button.PerformClick()
在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)
在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)
在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)
在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)
在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)
那麼如何會異常恢復呢?或者說斷線重連呢?
RabbitMQ NET Client的源碼,研究發現一種自動的錯誤恢復機制 AutomaticRecoveryEnabled = true 使用方式以下
ide
- var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
具體的恢復機制以下
1.在AutoRecoveringConnection初始化時,在連接關閉事件委託上增長斷開處理
this
- public void init()
- {
- m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
- AutorecoveringConnection self = this;
- EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
- {
- lock (recoveryLockTarget)
- {
- if (ShouldTriggerConnectionRecovery(args))
- {
- try
- {
- self.BeginAutomaticRecovery();
- }
- catch (Exception e)
- {
- // TODO: logging
- Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
- }
- }
- }
- };
- lock (m_eventLock)
- {
- ConnectionShutdown += recoveryListener;
- if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
- {
- m_recordedShutdownEventHandlers.Add(recoveryListener);
- }
- }
- }
觀察調用的方式BeginAutomaticRecovery,能夠看到這個方法內部調用了PerformAutomaticRecovery方法。咱們直接看這個方法的內容,其中第一個調用的是方法RecoverConnectionDelegate
spa
- protected void PerformAutomaticRecovery()
- {
- lock (recoveryLockTarget)
- {
- RecoverConnectionDelegate();
- RecoverConnectionShutdownHandlers();
- RecoverConnectionBlockedHandlers();
- RecoverConnectionUnblockedHandlers();
- RecoverModels();
- if (m_factory.TopologyRecoveryEnabled)
- {
- RecoverEntities();
- RecoverConsumers();
- }
- RunRecoveryEventHandlers();
- }
- }
這個方法中調用的是
code
- protected void RecoverConnectionDelegate()
- {
- bool recovering = true;
- while (recovering)
- {
- try
- {
- m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
- recovering = false;
- }
- catch (Exception)
- {
- // TODO: exponential back-off
- Thread.Sleep(m_factory.NetworkRecoveryInterval);
- // TODO: provide a way to handle these exceptions
- }
- }
- }
能夠看出,它是執行了死循環,直到鏈接從新打開,固然,若是遇到異常,它會調用Thread.Sleep來等待一下,而後再次執行鏈接恢復。orm
轉載自: https://www.itsvse.com/thread-4636-1-1.htmlhtm