發佈端:函數
import pika import time credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False) s_conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=credentials)) # 建立鏈接 # 原則上,消息,只能有交換機傳到隊列。就像咱們家裏面的交換機道理同樣。 # 有多個設備鏈接到交換機,那麼,這個交換機把消息發給那個設備呢,就是根據 # 交換機的類型來定。類型有:direct\topic\headers\fanout # fanout:這個就是,全部的設備都能收到消息,就是廣播。 # 此處定義一個名稱爲'logs'的'fanout'類型的exchange chan = s_conn.channel() # 在鏈接上建立一個頻道 # chan.queue_declare(queue='hello') # 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行 chan.exchange_declare(exchange='logs', exchange_type='fanout' ) while 1: time.sleep(1) # 將消息發送到名爲log的exchange中 # 由於是fanout類型的exchange,因此無需指定routing_key ack = chan.basic_publish(exchange='logs', # 交換機 routing_key='', # 路由鍵,寫明將消息發往哪一個隊列 body='Ye:當前時間%s ' % time.strftime('%m-%d %H:%M:%S')) # 生產者要發送的消息 print("[生產者] send 'hello world") time.sleep(10) print(ack) s_conn.close() # 當生產者發送完消息後,可選擇關閉
接收端:日誌
import pika credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False) s_conn = pika.BlockingConnection(pika.ConnectionParameters('39.106.205.106',credentials=credentials)) # 建立鏈接 chan = s_conn.channel() # 在鏈接上建立一個頻道 chan.exchange_declare(exchange='logs', exchange_type='fanout' ) # chan.queue_declare(queue='hello') # 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行 # 相似的,好比log,咱們其實最想看的,當鏈接上的時刻到消費者退出,這段時間的日誌 # 有些消息,過時了的對咱們並無什麼用 # 而且,一個終端,咱們要收到隊列的全部消息,好比:這個隊列收到兩個消息,一個終端收到一個。 # 咱們如今要作的是:兩個終端都要收到兩個 # 那麼,咱們就只需作個臨時隊列。消費端斷開後就自動刪除 result = chan.queue_declare(queue='temp2', exclusive=True) # 取得隊列名稱 queue_name = result.method.queue # 將隊列和交換機綁定一塊兒 chan.queue_bind(exchange='logs', queue=queue_name ) def callback(ch, method, properties, body): # 定義一個回調函數,用來接收生產者發送的消息 print(ch, method, properties, ) print(body.decode('utf8')) chan.basic_consume( # 指定取消息的隊列名, queue_name, callback, # 調用回調函數,從隊列裏取消息 # queue=, auto_ack=True ) # 取完一條消息後,給生產者發送確認消息,默認是False的,即 默認不給rabbitmq發送一個收到消息的確認 # 若是auto_ack=True則消失接收以後就會刪除也就是隻能取一次 print('[消費者] waiting for msg .') chan.start_consuming() # 開始循環取消息
多個接收端須要修改臨時隊列的名字,以防止衝突,會報錯關於鎖的錯誤code