RbbitMQ消息隊列及python實現

  一、簡介

    RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集羣和故障轉移是構建在開放電信平臺框架上的。html

  全部主要的編程語言均有與代理接口通信的客戶端庫。官網:http://www.rabbitmq.com/python

    RabbidMQ是一個消息代理:它接受和轉發消息。你能夠把它想象成一個郵局:當你把你想要寄出的郵件放在一個郵箱裏時,你能夠肯定,郵遞員先生或女士最終會把郵件交給你的收件人。linux

  在這個類比中,rabbitmq是一個郵箱、一個郵局和一個郵遞員。編程

    幫助文檔:http://www.rabbitmq.com/getstarted.html小程序

  二、安裝

    Erlang與RabbitMQ,安裝路徑都應不含空格符。windows

  Erlang使用了環境變量HOMEDRIVE與HOMEPATH來訪問配置文件.erlang.cookie,應注意這兩個環境變量的有效性。須要設定環境變量ERLANG_HOME,並把%ERLANG_HOME%\bin加入到全局路徑中。
  RabbitMQ使用本地computer name做爲服務器的地址,所以須要注意其有效性,或者直接解析爲127.0.0.1
  可能須要在本地網絡防火牆打開相應的端口
  首先下載安裝 Erlang:http://www.erlang.org/downloads
  下載 RabbitMQhttp://www.rabbitmq.com/download.html
  windows下安裝完成後
   系統服務中有RabbitMQ服務,中止、啓動、重啓
  
 

三、測試

 3.一、啓用管理插件

  

 

  進入安裝路徑D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.12下的sbin目錄,啓動命令行,而後輸入: rabbitmq-plugins enable rabbitmq_management,以下:
  

   3.二、管理界面

     打開瀏覽器輸入:http://127.0.0.1:15672/瀏覽器

    

 

    經過默認帳戶 guest/guest 登陸,若是可以登陸,說明安裝成功。bash

    

 

 

     添加Admin用戶服務器

    

 

    添加用戶角色cookie

    一、超級管理員(administrator)
    可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
    二、監控者(monitoring)
    可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
    三、策略制定者(policymaker)
    可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
    四、普通管理者(management)
    僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
    五、其餘
    沒法登錄管理控制檯,一般就是普通的生產者和消費者。
    

    添加角色testhost

    

      選中Admin用戶,設置權限:

    

      看到權限已加:

 

      

 

  

 

 

 

    

 

四、測試實例

    安裝pika模塊,python使用rabbitmq服務,可使用現成的類庫pika、txAMQP或者py-amqplib,這裏選擇了pika。在命令行中直接使用pip命令:pip install pika

    隊列是位於rabbitmq中的郵箱的名稱。儘管消息經過rabbitmq和在程序中傳送,但它們只能存儲在隊列中。隊列只受主機內存和磁盤限制的約束,

    它本質上是一個大的消息緩衝區。許多生產者能夠向一個隊列發送消息,許多消費者能夠嘗試從一個隊列接收數據。

    請注意:生產者、消費者和代理沒必要駐留在同一主機上;實際上,在大多數應用程序中,它們沒必要駐留在同一主機上。應用程序也能夠同時是生產者和消費者。

 

    在本文章中,使用python編寫兩個小程序:一個發送單個消息的生產者(發送者)和一個接收並打印消息的消費者(接收者)。這是一個信息傳遞的「你好世界」。

  在下圖中,「P」是咱們的生產者,「C」是咱們的消費者。中間的框是一個隊列-一個消息緩衝區,rabbitmq表明使用者保留該緩衝區。

    

    生產者將消息發送到「hello」隊列。消費者從該隊列接收消息。

 4.一、發送

    咱們的第一個程序send.py將向隊列發送一條消息。咱們須要作的第一件事是與rabbitmq服務器創建鏈接。

     

 

import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()

    咱們如今已經鏈接到本地機器上的一個代理,所以就是本地主機。若是咱們想鏈接到另外一臺機器上的代理,咱們只需在這裏指定它的名稱或IP地址。

     接下來,在發送以前,咱們須要確保收件人隊列存在。若是咱們向不存在的位置發送消息,rabbitmq將只刪除該消息。讓咱們建立一個Hello隊列,將消息傳遞到該隊列:

channel.queue_declare(queue='hello')

     此時,準備發送消息。第一條消息將只包含一個字符串hello world!把它發送到問候隊列。

    在rabbitmq中,消息永遠不能直接發送到隊列,它老是須要經過一個交換。但不要被這些細節拖累。如今須要知道的只是如何使用由空字符串標識的默認交換。

  此交換是特殊的它容許咱們精確地指定消息應該進入哪一個隊列。隊列名稱須要在路由routing_key參數中指定:

   

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
  print(" [x] Sent 'Hello World!'")

    在退出程序以前,須要確保網絡緩衝區已刷新,而且消息實際上已傳遞到rabbitmq。此時能夠輕輕地關閉鏈接。

connection.close()

    若是這是您第一次使用rabbitmq,而且您沒有看到「已發送」消息,那麼您可能會感到頭疼,想知道可能出了什麼問題。可能代理啓動時沒有足夠的可用磁盤空間(默認狀況下,它須要至少200 MB的可用空間),

  所以拒絕接受消息。檢查代理日誌文件以確認並在必要時下降限制。配置文件文檔將向您展現如何設置。

 

4.二、接收

    

    我第二個程序receive.py將接收來自隊列的消息,並將它們打印到屏幕上。一樣,首先咱們須要鏈接到rabbitmq服務器。負責鏈接到Rabbit的代碼與之前相同。

   下一步和前面同樣,是確保隊列存在。使用queue_declare建立隊列是等冪的,咱們能夠根據須要屢次運行該命令,而且只建立一個隊列。

    

channel.queue_declare(queue='hello')

    您可能會問咱們爲何要再次聲明隊列咱們已經在之前的代碼中聲明瞭隊列。若是咱們肯定隊列已經存在,就能夠避免這種狀況。例如,若是send.py程序之前運行過。但咱們還不肯定先運行哪一個程序。

  在這種狀況下,最好在兩個程序中重複聲明隊列。

    若是但願看到rabbitmq有哪些隊列以及其中有多少消息。可使用rabbitmqctl工具(做爲特權用戶)執行此操做:

    在linux上:sudo rabbitmqctl list_queues

     在windows上:rabbitmqctl.bat list_queues

    從隊列接收消息更加複雜。它經過向隊列訂閱回調函數來工做。每當咱們收到一條消息,這個回調函數就會被PIKA庫調用。在咱們的示例中,此函數將在屏幕上打印消息的內容。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

 

    接下來,咱們須要告訴rabbitmq這個特定的回調函數應該從hello隊列接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

    要使該命令成功,咱們必須確保要訂閱的隊列存在。幸運的是,咱們相信咱們已經使用上面queue declare建立了的隊列。

    最後,咱們進入一個永不結束的循環,它等待數據並在必要時運行回調。

print(' [*] Waiting for messages. To exit press CTRL+C')
  channel.start_consuming()

    完整的send.py代碼

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

    完整的receive.py代碼

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

    如今咱們能夠在終端上試用咱們的程序。首先,讓咱們啓動一個消費者,它將連續運行等待交付:

 python receive.py

send.py

相關文章
相關標籤/搜索