(轉)RabbitMQ消息隊列(二):」Hello, World「

本文將使用Python(pika 0.9.8)實現從Producer到Consumer傳遞數據」Hello, World「。python

     首先複習一下上篇所學:RabbitMQ實現了AMQP定義的消息隊列。它實現的功能」很是簡單「:從Producer接收數據而後傳遞到Consumer。它能保證多併發,數據安全傳遞,可擴展。git

     和任何的Hello world同樣,它們都不復雜。咱們將會設計兩個程序,一個發送Hello world,另外一個接收這個數據而且打印到屏幕。
      總體的設計以下圖:github



1. 環境配置

RabbitMQ 實現了AMQP。所以,咱們須要安裝AMPQ的library。幸運的是對於多種編程語言都有實現。咱們可使用如下lib的任何一個:編程

在這裏咱們將使用pika. 能夠經過 pip 包管理工具來安裝:安全

 

[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. $ sudo pip install pika==0.9.8  

 

這個安裝依賴於pip和git-core。架構

  • On Ubuntu:併發

    $ sudo apt-get install python-pip git-core
    
  • On Debian:編程語言

    $ sudo apt-get install python-setuptools git-core
    $ sudo easy_install pip 
  • On Windows:To install easy_install, run the MS Windows Installer for setuptools函數

    > easy_install pip
    > pip install pika==0.9.8 

 

2. Sending

第一個program send.py:發送Hello world 到queue。正如咱們在上篇文章提到的,你程序的第一句話就是創建鏈接,第二句話就是建立channel:工具

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. #!/usr/bin/env python  
  2. import pika  
  3.   
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.                'localhost'))  
  6. channel = connection.channel()  

建立鏈接傳入的參數就是RabbitMQ Server的ip或者name。

 

關於誰建立queue,上篇文章也討論過:Producer和Consumer都應該去建立。

接下來咱們建立名字爲hello的queue:

 

[cpp]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. channel.queue_declare(queue='hello')  

建立了channel,咱們能夠經過相應的命令來list queue:

 

 

[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. $ sudo rabbitmqctl list_queues  
  2. Listing queues ...  
  3. hello    0  
  4. ...done.  

如今咱們已經準備好了發送了。
從架構圖能夠看出,Producer只能發送到exchange,它是不能直接發送到queue的。如今咱們使用默認的exchange(名字是空字符)。這個默認的exchange容許咱們發送給指定的queue。routing_key就是指定的queue名字。

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. channel.basic_publish(exchange='',  
  2.                       routing_key='hello',  
  3.                       body='Hello World!')  
  4. print " [x] Sent 'Hello World!'"  

退出前別忘了關閉connection。

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. connection.close()  

 

 

3. Receiving

第二個program receive.py 將從queue中獲取Message而且打印到屏幕。

第一步仍是建立connection。第二步建立channel。第三步建立queue,name = hello:

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. channel.queue_declare(queue='hello')  

 

接下來要subscribe了。在這以前,須要聲明一個回調函數來處理接收到的數據。

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. def callback(ch, method, properties, body):  
  2.     print " [x] Received %r" % (body,)  

subscribe:

 

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. channel.basic_consume(callback,  
  2.                       queue='hello',  
  3.                       no_ack=True)  

最後,準備好無限循環監聽吧:

 

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. print ' [*] Waiting for messages. To exit press CTRL+C'  
  2. channel.start_consuming()  

 

 

4. 最終版本

send.py:

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. #!/usr/bin/env python  
  2. import pika  
  3.   
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7.   
  8. channel.queue_declare(queue='hello')  
  9.   
  10. channel.basic_publish(exchange='',  
  11.                       routing_key='hello',  
  12.                       body='Hello World!')  
  13. print " [x] Sent 'Hello World!'"  
  14. connection.close()  

 

 receive.py:

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. #!/usr/bin/env python  
  2. import pika  
  3.   
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7.   
  8. channel.queue_declare(queue='hello')  
  9.   
  10. print ' [*] Waiting for messages. To exit press CTRL+C'  
  11.   
  12. def callback(ch, method, properties, body):  
  13.     print " [x] Received %r" % (body,)  
  14.   
  15. channel.basic_consume(callback,  
  16.                       queue='hello',  
  17.                       no_ack=True)  
  18.   
  19. channel.start_consuming()  

 

 

5. 最終運行

先運行 send.py program:

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. $ python send.py  
  2. [x] Sent 'Hello World!'  

send.py 每次運行完都會中止。注意:如今數據已經存到queue裏了。接收它:

 

 

[python]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
 
  1. $ python receive.py  
  2. [*] Waiting for messages. To exit press CTRL+C  
  3. [x] Received 'Hello World!'  

 

接下來,就要奉上更接近實際環境的例子。取決與個人課餘時間啊。。

相關文章
相關標籤/搜索