以前只是用celery, 此次用一下pikahtml
參考rabbitMQ官網的python版,https://www.rabbitmq.com/tutorials/tutorial-one-python.htmlpython
沒想到各類坑.sql
若是說rabbitMQ官網是爲了讓新人入門,因此刻意忽略掉細節, 那麼必須吐槽pika的官方文檔, 很很差.遠不如celeryjson
使用pika 的BlockingConnection服務器
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: BrokenPipeError(32, 'Broken pipe')app
根據http://www.javashuo.com/article/p-oglbiqik-d.htmlasync
是要在鏈接時設置心跳爲0,就不會超時自動下線了, 不然RabbitMQ服務器會發過來默認值580函數
#--------------rabbitMQ------------------ import pika connection = pika.BlockingConnection( pika.ConnectionParameters( host='localhost', heartbeat=0, #never exit after start )) channel = connection.channel() channel.queue_declare(queue='update_sql')
這個錯誤在測試消費端時沒測出來,由於測試使用的發佈者和官方文檔裏同樣,發完就下線退出了. 這樣固然看不出這個心跳問題.測試
可是聯調時就暴露了. 真無語.fetch
默認的body是二進制的. 而後消費端要
channel.basic_publish('exchange_name', 'routing_key', 'Test Message', pika.BasicProperties(content_type='text/plain', type='example'))
這彷佛時能夠發文本的嗎?
而後,看見別人還能夠這麼寫https://blog.csdn.net/fzlulee/article/details/98480724
self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
彷佛就是html請求頭常見的寫法了? 可是pika裏沒有對BasicProperties的詳細文檔,
,源碼裏也看不出註釋https://pika.readthedocs.io/en/stable/_modules/pika/spec.html#BasicProperties
ack防止消費者出問題, durable防止rabbitMQ服務器自己出問題
因此ack在消費端定義
channel.basic_consume(queue='update_sql', auto_ack=False, on_message_callback=callback)
而durable在channel裏隊列聲明裏 在 生產端,消費端都要統一聲明隊列
channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False)
引用 https://blog.csdn.net/hlxx55/article/details/80964440
ack
rabbitMQ是默認開啓自動應答的,這樣當rabbitMQ將消息發給消費者,就會從內存中將消息刪除,這樣會帶來一個問題,若是消費者未處理完消息而宕機,那麼消息就會丟失。因此,咱們將自動應答關閉,當rabbitMQ收到消費者處理完消息的迴應後纔會從內存中刪除消息。
durable
rabbitMQ默認將消息存儲在內存中,若rabbitMQ宕機,那麼全部數據就會丟失,因此在聲明隊列的時候能夠聲明將數據持久化,可是若是已經聲明瞭一個未持久化的隊列,那麼不能修改,只能將這個隊列刪除或從新聲明一個持久化數據。
只在消費者這裏加上basic_qos就能夠了
connection = pika.BlockingConnection( pika.ConnectionParameters( host= self.HOST_RABBITMQ, heartbeat = 0, #never exit after start )) channel = connection.channel() #durable 隊列中消息持久化 #exclusive (bool) – Don’t allow other consumers on the queue #./ exchange 不支持 exclusive channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False) #1次1條消息 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='update_sql', auto_ack=False, #不自動確認 在callback最後確認 等於 no_ack on_message_callback=self.callback) print(' [*] wg-Executor waiting for sql cmds. To exit press CTRL+C') channel.start_consuming()
此外,在消費者的callback函數裏,
最好在最外層用 異常處理包裹起來,確保不管執行結果如何,都在finally裏執行ack
try:
except:
else:
finally: