rabbitmq pika(python)訂閱發佈多客戶端消費場景簡單使用

發佈端:函數

import pika
import time
credentials  = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=credentials))  # 建立鏈接
# 原則上,消息,只能有交換機傳到隊列。就像咱們家裏面的交換機道理同樣。
# 有多個設備鏈接到交換機,那麼,這個交換機把消息發給那個設備呢,就是根據
# 交換機的類型來定。類型有:direct\topic\headers\fanout
# fanout:這個就是,全部的設備都能收到消息,就是廣播。
# 此處定義一個名稱爲'logs'的'fanout'類型的exchange
chan = s_conn.channel()  # 在鏈接上建立一個頻道

# chan.queue_declare(queue='hello')  # 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行

chan.exchange_declare(exchange='logs',
                    exchange_type='fanout'
                      )

while 1:
    time.sleep(1)
# 將消息發送到名爲log的exchange中
# 由於是fanout類型的exchange,因此無需指定routing_key
    ack = chan.basic_publish(exchange='logs',  # 交換機
                       routing_key='',  # 路由鍵,寫明將消息發往哪一個隊列
                       body='Ye:當前時間%s ' % time.strftime('%m-%d %H:%M:%S'))  # 生產者要發送的消息
    print("[生產者] send 'hello world")
    time.sleep(10)
    print(ack)


s_conn.close()  # 當生產者發送完消息後,可選擇關閉

接收端:日誌

import pika
credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('39.106.205.106',credentials=credentials))  # 建立鏈接

chan = s_conn.channel()  # 在鏈接上建立一個頻道

chan.exchange_declare(exchange='logs',
                    exchange_type='fanout'
                      )
# chan.queue_declare(queue='hello')  # 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行
# 相似的,好比log,咱們其實最想看的,當鏈接上的時刻到消費者退出,這段時間的日誌
# 有些消息,過時了的對咱們並無什麼用
# 而且,一個終端,咱們要收到隊列的全部消息,好比:這個隊列收到兩個消息,一個終端收到一個。
# 咱們如今要作的是:兩個終端都要收到兩個
# 那麼,咱們就只需作個臨時隊列。消費端斷開後就自動刪除
result = chan.queue_declare(queue='temp2', exclusive=True)
# 取得隊列名稱
queue_name = result.method.queue

# 將隊列和交換機綁定一塊兒
chan.queue_bind(exchange='logs',
                queue=queue_name
                )

def callback(ch, method, properties, body):  # 定義一個回調函數,用來接收生產者發送的消息
    print(ch, method, properties, )
    print(body.decode('utf8'))


chan.basic_consume(  # 指定取消息的隊列名,
                    queue_name,
                   callback,  # 調用回調函數,從隊列裏取消息
                    # queue=,

                   auto_ack=True
                   )  # 取完一條消息後,給生產者發送確認消息,默認是False的,即  默認不給rabbitmq發送一個收到消息的確認
# 若是auto_ack=True則消失接收以後就會刪除也就是隻能取一次
print('[消費者] waiting for msg .')
chan.start_consuming()  # 開始循環取消息

注意

多個接收端須要修改臨時隊列的名字,以防止衝突,會報錯關於鎖的錯誤code

相關文章
相關標籤/搜索