rabbitmaq的安裝使用python
python操做rabbitmq隊列python代碼:web
1.生產者的代碼 #!/usr/bin/env python3 import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("selfju","cxk") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」 channel.queue_declare(queue='水許傳') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='水許傳', body='武大郎出攤賣燒餅了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close() 2.消費者的代碼 import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水許傳") def callbak(ch,method,properties,body): print("消費者接收到了數據:%r"%body.decode("utf8")) # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息 # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳 channel.basic_consume(callbak,queue="水許傳",no_ack=True) # 開始消費,接收消息 channel.start_consuming() 3.消息確認機制的生產者代碼 #!/usr/bin/env python3 import pika # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials("selfju","cxk") # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 新建一個hello隊列,用於接收消息 # 這個郵箱能夠收發各個班級的郵件,經過 channel.queue_declare(queue='西遊記') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據 channel.basic_publish(exchange='', routing_key='西遊記', body='大師兄,師傅被蔡許坤抓走了') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close() 4.消息確認機制的消費者代碼 import pika credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) channel.queue_declare(queue='西遊記') def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # int('asdfasdf') # 我告訴rabbitmq服務端,我已經取走了消息 # 回覆方式在這 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆 channel.basic_consume(callback,queue='西遊記',no_ack=False) channel.start_consuming() 5.支持持久化的隊列和消息 1.生產者的代碼 import pika # 有密碼 credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 聲明一個隊列(建立一個隊列) # 默認此隊列不支持持久化,若是服務掛掉,數據丟失 # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了 ''' 實現rabbitmq持久化條件 delivery_mode=2 使用durable=True聲明queue是持久化 ''' channel.queue_declare(queue='LOL',durable=True) channel.basic_publish(exchange='', routing_key='LOL', # 消息隊列名稱 body='我用雙手成就你的夢想', # 支持數據持久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close() 6.持久化的消費者代碼 import pika credentials = pika.PlainCredentials("selfju","cxk") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials)) channel = connection.channel() # 確保隊列持久化 channel.queue_declare(queue='LOL',durable=True) ''' 必須確保給與服務端消息回覆,表明我已經消費了數據,不然數據一直持久化,不會消失 ''' def callback(ch, method, properties, body): print("消費者接受到了任務: %r" % body.decode("utf-8")) # 模擬代碼報錯 # int('asdfasdf') # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失 # 告訴服務端,我已經取走了數據,不然數據一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與回覆確認 channel.basic_consume(callback,queue='LOL',no_ack=False) channel.start_consuming()