rabbitmq消息隊列

rabbitmaq的安裝使用python

  • 經過阿里雲yum源, 在epel園中有這個rabbitmq
    • yum install rabbitmq-server erlang -y
  • 啓動rabbitmq-server
    • systemclt start rabbitmq-server
  • 開啓後臺管理界面
    • rabbitmq-plugins enable rabbitmq_management
  • 建立rabbitmq的帳號密碼
    • rabbitmqctl add_user zhangjian 123
  • 設置用戶爲管理員
    • sudorrabbitmqctl set_user_tags zhangjian  administrator
  • 設置用戶有權限訪問全部隊列
    • 語法:rabbitmqctl set_permissions -p "/" zhangjian ".*" ".*" ".*"
  • 重啓rabbitmq服務端, 讓用戶生效
    • systemctl restart rabbitmq-server
  • 訪問web管理界面, 登陸, 查看隊列消息
    • http://localhost:15672//#queues
  • 用python操做rabbitmq, 實現生產消費這模型
    • 安裝皮卡模塊,m模塊版本須要指定,y引文代碼參數發生了變化
      • pip3 install -i https://pypi.douban.com/simple pika=0.13.1

 python操做rabbitmq隊列python代碼:web

1.生產者的代碼
        #!/usr/bin/env python3
        import pika
        # 建立憑證,使用rabbitmq用戶密碼登陸
        # 去郵局取郵件,必須得驗證身份
        credentials = pika.PlainCredentials("selfju","cxk")
        # 新建鏈接,這裏localhost能夠更換爲服務器ip
        # 找到這個郵局,等於鏈接上服務器
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
        # 建立頻道
        # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接
        channel = connection.channel()
        # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」
        channel.queue_declare(queue='水許傳')
        # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
        channel.basic_publish(exchange='',
                          routing_key='水許傳',
                          body='武大郎出攤賣燒餅了')
        print("已經發送了消息")
        # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
        connection.close()


    2.消費者的代碼
        import pika
        # 創建與rabbitmq的鏈接
        credentials = pika.PlainCredentials("selfju","cxk")
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
        channel = connection.channel()
        channel.queue_declare(queue="水許傳")

        def callbak(ch,method,properties,body):
            print("消費者接收到了數據:%r"%body.decode("utf8"))
        # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息
        # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳
        channel.basic_consume(callbak,queue="水許傳",no_ack=True)
        # 開始消費,接收消息
        channel.start_consuming()


    
    3.消息確認機制的生產者代碼
        #!/usr/bin/env python3
        import pika
        # 建立憑證,使用rabbitmq用戶密碼登陸
        # 去郵局取郵件,必須得驗證身份
        credentials = pika.PlainCredentials("selfju","cxk")
        # 新建鏈接,這裏localhost能夠更換爲服務器ip
        # 找到這個郵局,等於鏈接上服務器
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
        # 建立頻道
        # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接
        channel = connection.channel()
        # 新建一個hello隊列,用於接收消息
        # 這個郵箱能夠收發各個班級的郵件,經過

        channel.queue_declare(queue='西遊記')
        # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
        channel.basic_publish(exchange='',
                              routing_key='西遊記',
                              body='大師兄,師傅被蔡許坤抓走了')
        print("已經發送了消息")
        # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
        connection.close()

    4.消息確認機制的消費者代碼
        import pika

        credentials = pika.PlainCredentials("selfju","cxk")
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
        channel = connection.channel()

        # 聲明一個隊列(建立一個隊列)
        channel.queue_declare(queue='西遊記')

        def callback(ch, method, properties, body):
            print("消費者接受到了任務: %r" % body.decode("utf-8"))
            # int('asdfasdf')
            # 我告訴rabbitmq服務端,我已經取走了消息
            # 回覆方式在這
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆
        channel.basic_consume(callback,queue='西遊記',no_ack=False)

        channel.start_consuming()

    5.支持持久化的隊列和消息
        1.生產者的代碼
        
        import pika
        # 有密碼
        credentials = pika.PlainCredentials("selfju","cxk")
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
        channel = connection.channel()

        # 聲明一個隊列(建立一個隊列)
        # 默認此隊列不支持持久化,若是服務掛掉,數據丟失
        # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了
        '''
        實現rabbitmq持久化條件
         delivery_mode=2
        使用durable=True聲明queue是持久化
         
        '''
        channel.queue_declare(queue='LOL',durable=True)
        channel.basic_publish(exchange='',
                              routing_key='LOL', # 消息隊列名稱
                              body='我用雙手成就你的夢想',
                              # 支持數據持久化
                              properties=pika.BasicProperties(
                                  delivery_mode=2,#表明消息是持久的  2
                              )
                              )
        connection.close()
    
    6.持久化的消費者代碼
    
import pika
credentials = pika.PlainCredentials("selfju","cxk")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
channel = connection.channel()
# 確保隊列持久化
channel.queue_declare(queue='LOL',durable=True)

'''
必須確保給與服務端消息回覆,表明我已經消費了數據,不然數據一直持久化,不會消失
'''
def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body.decode("utf-8"))
    # 模擬代碼報錯
    # int('asdfasdf')    # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失
   
    # 告訴服務端,我已經取走了數據,不然數據一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 關閉no_ack,表明給與回覆確認
channel.basic_consume(callback,queue='LOL',no_ack=False)
channel.start_consuming()
相關文章
相關標籤/搜索