<27>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Socket Error: 104 <31>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Removing timeout for next heartbeat interval <28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Socket closed when connection was open <31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Added: {'callback': <bound method SelectConnection._on_connection_start of <pika.adapters.select_connection.SelectConnection object at 0x7f74752525d0>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1} <28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Disconnected from RabbitMQ at xx_host:5672 (0): Not specified <31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Processing 0:_on_connection_closed <31>1 2018-xx-xxT06:59:03.040Z 660ece0ebaad admin/admin 14 - - Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f74752513f8>> for "0:_on_connection_closed"
有日誌就很好辦, 首先看日誌在哪裏打的. 從三個地方入手.linux
root@660ece0ebaad:/# uwsgi --version
從github上co下來, 沒有.git
>>> import sys >>> sys.path ['', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/usr/lib/python2.7/dist-packages/gtk-2.0']
在這些目錄下grep, 在pika中找到docker
root@660ece0ebaad:/usr/local/lib/python2.7# grep "Socket Error" -R . Binary file ./dist-packages/pika/adapters/base_connection.pyc matches ./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Fatal Socket Error: %r", error_value) ./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Socket Error: %s", error_code)
>>> import pika >>> pika.__version__ '0.10.0'
經過代碼能夠看到, Socket Error是errno的錯誤碼, 肯定錯誤碼含義是對端發送了RST.python2.7
>>> import errno >>> errno.errorcode[104] 'ECONNRESET'
懷疑rabbitmq server地址錯誤, 一個未listen的端口是會返回RST的, 驗證後發現不是.
接着懷疑連接超時斷開未通知客戶端之類. 看rabbitmq server日誌, 發現大量:socket
=ERROR REPORT==== 7-Dec-2018::20:43:18 === closing AMQP connection <0.9753.18> ( -> missed heartbeats from client, timeout: 60s -- =ERROR REPORT==== 7-Dec-2018::20:43:18 === closing AMQP connection <0.9768.18> ( -> missed heartbeats from client, timeout: 60s
發現rabbitmq server和 admin docker的連接已經所有斷開函數
root@xxxxxxx:/home/dingxinglong# netstat -nap | grep 5672 | grep ""
那麼, 爲何rabbitmq server會踢掉 pika創建的連接呢? 看pika代碼註釋:
:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal.
咱們沒有傳入心跳間隔, 理論上應該使用服務端默認的60S. 實際上, 客戶端歷來沒有發出過心跳包. 因而繼續看代碼:
經過打印, 確認了HeartbeatChecker對象成功建立, 也成功地建立了timer, 可是timer歷來沒有回調過.
從代碼一路跟下去, 咱們用的是blocking_connections, 在其add_timeout註釋中看到:
def add_timeout(self, deadline, callback_method): """Create a single-shot timer to fire after deadline seconds. Do not confuse with Tornado's timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it's to be called. NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see `BlockingConnection.process_data_events` and `BlockingChannel.start_consuming`. :param float deadline: The number of seconds to wait to call callback :param callable callback_method: The callback method with the signature callback_method()
timer的觸發要靠process_data_events, 而咱們沒有調用. 因此客戶端的heartbeat歷來沒被觸發. 簡單地將heartbeat關掉以解決這個問題.
調用代碼以下: 沒有跑main_loop, 故, 沒處理 rabbitmq_server的FIN包, 沒法跟蹤連接狀態.
在發送時, 收到RST, 最終跑到 base_connection.py:452, _handle_error函數中打印socket_error.
def connect_mq(): mq_conf = xxxxx connection = pika.BlockingConnection( pika.ConnectionParameters(mq_conf['host'], int(mq_conf['port']), mq_conf['path'], pika.PlainCredentials(mq_conf['user'], mq_conf['pwd']), heartbeat_interval=0)) channel = connection.channel() channel.exchange_declare(exchange=xxxxx, type='direct', durable=True) return channel channel = connect_mq() def notify_xxxxx(): global channel def _publish(product): channel.basic_publish(exchange=xxxxx, routing_key='xxxxx', body=json.dumps({'msg': 'xxxxx'}))