RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。
RabbitMQ安裝
Linux 安裝配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang,由於RabiitMQ用erlang語言寫的 $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server 注意:service rabbitmq-server start/stop
MAC 安裝 http://www.rabbitmq.com/install-standalone-mac.html
安裝API
pip install pika #pika是官方提供的,固然還有其餘的 or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
1、實現最簡單的隊列通訊
send端
1 #!/usr/bin/evn python3.5 2 #__author__:"ted.zhou" 3 ''' 4 zibbitMQ最簡單的隊列通訊代碼範例 5 ''' 6 import pika 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 鏈接一個rabbitMQ,返回鏈接成功後的實例 9 channel = connection.channel() # 建立一個管道,用於傳輸各類隊列.--鏈接成功後,還不能直接使用,須要在這個鏈接的實例中建立一個管道. 10 11 # 聲明一個queue 12 # channel.queue_declare(queue='hello') # 在這個管道里聲明一個隊列 ,隊列的名稱爲"hello" 13 ''' 14 使用pika鏈接並建立隊列須要三步 15 1.使用pika.BlockingConnection() 建立一個鏈接 16 2.建立一個管道 17 3.聲明一個隊列 18 ''' 19 20 # 緊接着就能夠經過這個管道發送內容了,在發送時,必須有三個參數 21 # exchege = '' 這個在發佈訂閱模式時,會用到,具體高級用法會提到,這裏默認給'',這樣它內部仍是會調用一個默認類型. 22 # routing_key = 'hello',這裏的routing_key 是選擇經過哪一個隊列發送 23 # body = 'Hello World!' 要發送的內容 24 channel.basic_publish(exchange='', 25 routing_key='hello', # 接收端不是這個參數,而是queue 26 body='Hello World!') 27 28 print(" [x] Sent 'Hello World!'") # 生產者端打印發送信息,表示代碼已經執行到這裏 29 connection.close() # 關閉這個鏈接
receive端
1 #!/usr/bin/env python3.5 2 __author__ = "ted.zhou" 3 ''' 4 python使用zabbitMQ實現最簡單的隊列通訊之接收端代碼範例 5 ''' 6 7 import pika 8 9 # 使用pika模塊,鏈接到指定的rabbitMQ服務器 10 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 11 12 # 鏈接建立成功後,實例化一個管道 13 channel = connection.channel() 14 15 # 而後在管道中聲明一個隊列,表示我這個管道里能夠跑 'hello'這個隊列, 16 # 咱們在發送端聲明瞭一個'hello' 的queue,這裏爲何還要聲明一次 ,由於當接收端先啓動的時候,此時不聲明,下面代碼在接收時會報錯. 17 # 固然發送端若是先啓動了,這裏聲明也不會報錯. 18 19 20 channel.queue_declare(queue='hello') 21 22 # 緊接着咱們就要進行接收隊列裏的消息,可是接收以前咱們要知道這個消息咱們收來作哪些操做呢,只接過來沒啥意義. 23 # 因此定義一個callback函數 24 25 # 這裏注意,接收端定義的callback函數,必定要帶三個參數 26 # 1.ch 2.method 3.properties 4.其後纔是信息主題body 27 # 前面3個參數是作什麼的,暫時用不到,後面高級的用法會舉例 28 def callback(ch,method,properties,body): 29 print("[x] Received %r" %body) 30 31 # 緊接着定義接收,定義完接收並非直接就接收了,這個和發送端的basic_publish()方法不太同樣,basic_publish()是直接就發送了,而接收basic_consume()方法定義後,還須要調用一個start方法 32 # 定義管道的接收方法. 33 # 參數介紹: queue 指定 接收的隊列名稱 , no_ack=True 是定義此接收方法是否要確認執行完成,若是爲True, 34 # 說明不須要驗證執行狀態,也就是說當一個callback須要處理6分鐘,當5分鐘時程序卡死了,此消息也就沒了,若是爲False,5分鐘卡死後,消息在隊列中依然存在 35 channel.basic_consume(callback, 36 queue='hello', # 發送端不是這個參數,而是routing_key 37 no_ack=True) 38 print(' [*] Waiting for messages. To exit press CTRL+C') 39 channel.start_consuming() # 開啓接收,沒有就阻塞
晉級:
2、隊列持久化&消息持久化:
咱們上面的例子,在發送端管道cannel中聲明瞭'hello'隊列.
爲了不當接收端先啓動的狀況下,由於發送端還未運行程序致使rabbitMQ服務中沒有'hello'隊列,致使接收端程序報錯,因此在接收端中的管道也聲明瞭'hello'隊列
不管是發送端仍是接收端在管道cannel中聲明瞭'hello'隊列,在rabbitMQ服務器中,你均可以經過命令查看此隊列的信息:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
那麼問題來了,當發送端發送了不少信息在'hello'隊列中,接收端還沒啓動呢,這時候全部的信息都存在hello隊列,以下這種狀況:
MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues
Listing queues ...
hello 7
若是此時rabbitMQ服務器掛了,或者重啓了,會有兩個問題:1.這個'hello'隊列還存在嗎? 2.'hello'隊列中的信息還存在嗎?
咱們作下測試:
中止rabbitMQ服務 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop Stopping and halting node 'rabbit@zhoumingdeMacBook-Pro' ... 啓動rabbitMQ服務 MacBook-Pro:~ tedzhou$ sudo rabbitmq-server 查看rabbitMQ的隊列 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues Listing queues ... 結果證實了: 1.隊列沒有了 2.消息更沒有了
整成業務中,咱們確定但願這些隊列和消息可以保留下來.因此咱們要解決兩個問題.
1.持久化隊列
2.持久化消息
1.隊列持久化代碼範例
要在聲明隊列的時候,加上隊列持久化參數
channel.queue_declare(queue='hello', durable=True)
2.消息持久化代碼範例
要在發送消息的代碼部分,加上消息持久化的屬性,delivery_mode=2就是說這個消息持久化消息,直到消費掉.(老實說delivery_mode有30多種,經常使用的就這一種)
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
發送端要在聲明隊列和發送消息中更改代碼
1 #!/usr/bin/evn python3.5 2 #__author__:"ted.zhou" 3 ''' 4 zibbitMQ最簡單的隊列通訊代碼範例 5 ''' 6 import pika 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 鏈接一個rabbitMQ,返回鏈接成功後的實例 9 channel = connection.channel() # 建立一個管道,用於傳輸各類隊列.--鏈接成功後,還不能直接使用,須要在這個鏈接的實例中建立一個管道. 10 11 # 聲明一個queue 12 # channel.queue_declare(queue='hello') # 在這個管道里聲明一個隊列 ,隊列的名稱爲"hello" 13 channel.queue_declare(queue='hello',durable=True) # durable=True 設置此隊列持久化屬性爲True 14 ''' 15 使用pika鏈接並建立隊列須要三步 16 1.使用pika.BlockingConnection() 建立一個鏈接 17 2.建立一個管道 18 3.聲明一個隊列 19 ''' 20 21 # 緊接着就能夠經過這個管道發送內容了,在發送時,必須有三個參數 22 # exchege = '' 這個在發佈訂閱模式時,會用到,具體高級用法會提到,這裏默認給'',這樣它內部仍是會調用一個默認類型. 23 # routing_key = 'hello',這裏的routing_key 是選擇經過哪一個隊列發送 24 # body = 'Hello World!' 要發送的內容 25 channel.basic_publish(exchange='', 26 routing_key='hello', # 接收端不是這個參數,而是queue 27 body='Hello World!', 28 properties=pika.BasicProperties( # 消息持久化加入的參數 29 delivery_mode = 2,) 30 ) 31 32 print(" [x] Sent 'Hello World!'") # 生產者端打印發送信息,表示代碼已經執行到這裏 33 connection.close() # 關閉這個鏈接
接收端1.須要在聲明隊列中設置持久化屬性,2.它要在callback中得到接收到的數據de
1 #!/usr/bin/env python3.5 2 __author__ = "ted.zhou" 3 ''' 4 python使用zabbitMQ實現最簡單的隊列通訊之接收端代碼範例 5 ''' 6 7 import pika 8 9 # 使用pika模塊,鏈接到指定的rabbitMQ服務器 10 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 11 12 # 鏈接建立成功後,實例化一個管道 13 channel = connection.channel() 14 15 # 而後在管道中聲明一個隊列,表示我這個管道里能夠跑 'hello'這個隊列, 16 # 咱們在發送端聲明瞭一個'hello' 的queue,這裏爲何還要聲明一次 ,由於當接收端先啓動的時候,此時不聲明,下面代碼在接收時會報錯. 17 # 固然發送端若是先啓動了,這裏聲明也不會報錯. 18 19 20 #channel.queue_declare(queue='hello') 21 channel.queue_declare(queue='hello',durable=True) #durable=True 設置此隊列持久化屬性爲True 22 23 24 # 緊接着咱們就要進行接收隊列裏的消息,可是接收以前咱們要知道這個消息咱們收來作哪些操做呢,只接過來沒啥意義. 25 # 因此定義一個callback函數 26 27 # 這裏注意,接收端定義的callback函數,必定要帶三個參數 28 # 1.ch 2.method 3.properties 4.其後纔是信息主題body 29 # 前面3個參數是作什麼的,暫時用不到,後面高級的用法會舉例 30 def callback(ch,method,properties,body): 31 print("[x] Received %r" %body) 32 time.sleep(body.count(b'.')) 33 print(" [x] Done") 34 ch.basic_ack(delivery_tag = method.delivery_tag) # 得到delivery_tag,具體啥一塊兒,老師沒說,就說咱加上! 35 36 # 緊接着定義接收,定義完接收並非直接就接收了,這個和發送端的basic_publish()方法不太同樣,basic_publish()是直接就發送了,而接收basic_consume()方法定義後,還須要調用一個start方法 37 # 定義管道的接收方法. 38 # 參數介紹: queue 指定 接收的隊列名稱 , no_ack=True 是定義此接收方法是否要確認執行完成,若是爲True, 39 # 說明不須要驗證執行狀態,也就是說當一個callback須要處理6分鐘,當5分鐘時程序卡死了,此消息也就沒了,若是爲False,5分鐘卡死後,消息在隊列中依然存在 40 channel.basic_consume(callback, 41 queue='hello') # 發送端不是這個參數,而是routing_key 42 43 print(' [*] Waiting for messages. To exit press CTRL+C') 44 channel.start_consuming() # 開啓接收,沒有就阻塞
咱們經過查看rabbitMQ裏的隊列狀況,來驗證下是否持久化成功.
1 首先只運行發送端程序,運行6遍. 2 查看隊列: 3 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues 4 Listing queues ... 5 hello 6 6 停掉服務: 7 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop 8 Stopping and halting node 'rabbit@zhoumingdeMacBook-Pro' ... 9 開啓服務: 10 MacBook-Pro:~ tedzhou$sudo rabbitmq-server & 11 再次查看隊列: 12 MacBook-Pro:~ tedzhou$ sudo rabbitmqctl list_queues 13 Listing queues ... 14 hello 6
驗證結果: 持久化 隊列&消息成功.
用法晉級2
三.Work Queues
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少
消息生產者代碼
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #聲明queue 8 channel.queue_declare(queue='task_queue') 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 import sys 12 13 message = ' '.join(sys.argv[1:]) or "Hello World!" 14 channel.basic_publish(exchange='', 15 routing_key='task_queue', 16 body=message, 17 properties=pika.BasicProperties( 18 delivery_mode = 2, # make message persistent 19 )) 20 print(" [x] Sent %r" % message) 21 connection.close()
消費者代碼:
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(body.count(b'.')) 12 print(" [x] Done") 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 16 channel.basic_consume(callback, 17 queue='task_queue', 18 ) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
當你屢次運行一個生產者的代碼,而運行3個消費者的代碼,你會發現消息會輪詢3個消費者程序,也就是消費者會依次接收到代碼,這個就像簡單的負載均衡.
那麼問題來了,加入運行消費者程序的3臺機器的配置不同,好的1臺,消費一條消息須要1分鐘, 性能差的機器要10分鐘,那麼前面說到的負載均衡就會致使,差的嚴重影響效率.
咱們在LVS這類負載均衡是能夠設置權重,一樣消費者在接收消息時也能夠設置相應的功能,但不是權重,它比權重更人性化,它能夠保證一個消費者程序,同時只能保證1個信息在消費,固然也能夠設置同一時刻保證在消費2個信息
具體實現代碼以下:
生產者代碼不變:
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 11 message = ' '.join(sys.argv[1:]) or "Hello World!" 12 channel.basic_publish(exchange='', 13 routing_key='task_queue', 14 body=message, 15 properties=pika.BasicProperties( 16 delivery_mode = 2, # make message persistent 17 )) 18 print(" [x] Sent %r" % message) 19 connection.close()
消費者代碼加入channel.basic_qos(prefetch_count=1),代碼以下:
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 print(' [*] Waiting for messages. To exit press CTRL+C') 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 time.sleep(body.count(b'.')) 15 print(" [x] Done") 16 ch.basic_ack(delivery_tag = method.delivery_tag) 17 18 channel.basic_qos(prefetch_count=1) #表示贊成時刻保證客戶端程序只處理一個消息 19 channel.basic_consume(callback, 20 queue='task_queue') 21 22 channel.start_consuming()
rabbitMQ高級用法
4、Publish\Subscribe(消息發佈\訂閱)
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,
Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息,不是經過queue名來過濾,能夠監控關鍵字,具體用法
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout
headers: 經過headers 來決定把消息發給哪些queue #這個用的很是少,官網上暫時沒有例子
exchange 的 fanout 類型舉例(廣播給全部bind的queue)html
消息publishernode
1 #!/usr/bin/env python3.5 2 #__author__:'ted.zhou' 3 ''' 4 使用pika模塊中的exchage的fanout類型,廣播發送消息,全部綁定了類型爲fanout的exchage的queue都會收到廣播信息,且發送端不要在指定發送路由routing_key 5 ''' 6 import pika 7 import sys 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters( 10 host='localhost')) 11 channel = connection.channel() 12 13 channel.exchange_declare(exchange='logs',type='fanout') 14 # 這裏聲明瞭exchange,可是沒有聲明queue,由於exchange的fanout類型,默認是廣播給全部綁定了此exchange的隊列. 15 # 廣播發送端,就不必在發送的時候指定我是經過哪一個queue發送了.由於默認綁定這個exchange的queue都會路由.因此下面發送的時候routing=key=''爲空 16 17 18 message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 若是執行此程序時後面沒有其餘參數,則默認發送內容爲"info: Hello World!" 19 20 channel.basic_publish(exchange='logs', 21 routing_key='', 22 body=message) 23 print(" [x] Sent %r" % message) 24 connection.close()
消息subscriber
1 #_*_coding:utf-8_*_ 2 __author__ = 'ted.zhou' 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs',type='fanout') # 這裏也是爲了不客戶端先啓動時沒法找到名爲logs的exchage,纔在客戶端進行聲明的 9 10 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 11 queue_name = result.method.queue 12 13 channel.queue_bind(exchange='logs',queue=queue_name) # 將此exchage和queue綁定起來 14 15 print(' [*] Waiting for logs. To exit press CTRL+C') 16 17 def callback(ch, method, properties, body): 18 print(" [x] %r" % body) 19 20 channel.basic_consume(callback, 21 queue=queue_name, 22 no_ack=True) 23 24 channel.start_consuming()
有選擇的接收消息(exchange type=direct)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
publisher
1 #!/usr/bin/evn python3.5 2 #__author__:"ted.zhou" 3 ''' 4 exchange類型爲direct,發送廣播消息,只有指定的隊列收到該消息 5 ''' 6 import pika 7 import sys 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='direct_logs',type='direct') 13 14 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' 16 channel.basic_publish(exchange='direct_logs', 17 routing_key=severity, # 發送內容的關鍵字,千萬不要理解成隊列名,這個關鍵字在exchange=''時,表示隊列名.而當exchange不是空時,爲廣播的關鍵字 18 body=message) 19 print(" [x] Sent %r:%r" % (severity, message)) 20 connection.close()
subscriber
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ''' 4 direct類型的exchage接收指定類型的內容 5 ''' 6 import pika 7 import sys 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='direct_logs',type='direct') 13 14 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 15 queue_name = result.method.queue # 獲取一個隨機名稱 16 17 severities = sys.argv[1:] # 執行此程序時,參數指接收到的內容的類別,記住不是隊列名 18 if not severities: # 若是沒指定參數,程序退出 19 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 20 sys.exit(1) 21 22 for severity in severities: # 循環綁定exchange,隊列,內容類別(內容關鍵字) 23 channel.queue_bind(exchange='direct_logs', 24 queue=queue_name, 25 routing_key=severity) 26 27 print(' [*] Waiting for logs. To exit press CTRL+C') # 28 29 def callback(ch, method, properties, body): # 定義callback 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 channel.basic_consume(callback, 33 queue=queue_name, 34 no_ack=True) 35 36 channel.start_consuming() # 開啓接收狀態
對direct類型代碼的執行測試
更細緻的消息過濾(exchange type=topic)python
其實direct 和 topic 差很少,都是能夠在運行客戶端時指定客戶端能夠得到的消息內容關鍵字。mysql
區別在於,topic支持相似正則的模式,好比我想得到mysql.error 和apache.*(全部apache.info,apache.error,apache.warnning等)sql
因此 我以爲若是你有須要根據關鍵字來接讓不一樣的客戶端接收不一樣的消息,直接用topic類型就好apache
publisher端代碼以下:
1 #!/usr/bin/env python3.5 2 #__author__="ted.zhou" 3 ''' 4 使用exchange的topic類型,支持客戶端接收時更細緻的過濾 5 ''' 6 import pika 7 import sys 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='topic_logs',type='topic') 13 14 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' 16 channel.basic_publish(exchange='topic_logs', 17 routing_key=routing_key, 18 body=message) 19 print(" [x] Sent %r:%r" % (routing_key, message)) 20 connection.close()
subscriber代碼以下
1 #!/usr/bin/env python3.5 2 __author__='ted.zhou' 3 ''' 4 使用exchange的topic類型,更精細的過濾消息內容 5 ''' 6 import pika 7 import sys 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='topic_logs',type='topic') 13 14 result = channel.queue_declare(exclusive=True) 15 queue_name = result.method.queue # 隨機產生一個queue名稱 16 17 binding_keys = sys.argv[1:] # 得到執行程序的參數 18 if not binding_keys: # 判斷是否有參數 19 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 20 sys.exit(1) 21 22 for binding_key in binding_keys: #循環綁定 exchange,queue,routing_key 23 channel.queue_bind(exchange='topic_logs', 24 queue=queue_name, 25 routing_key=binding_key) 26 27 print(' [*] Waiting for logs. To exit press CTRL+C') 28 29 def callback(ch, method, properties, body): # 定義callback函數 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 channel.basic_consume(callback, 33 queue=queue_name, 34 no_ack=True) 35 36 channel.start_consuming()
對topic類型代碼的執行測試服務器
Remote procedure call (RPC)
RPC是遠程方法調用,經過客戶端發給服務端一個命令,服務端執行命令,把執行結果返回給客戶端
這個RPC和剛纔的rabbitMQ的幾個例子不同,以前幾個例子生產者只把消息發送給消費者,RPC不斷要把消息發給你,還要把你的結果反回來
那麼問題1來了,負責發送的一端又負責接收,代碼上實現接收不難,可是一個管道隊列怎樣保證發送方發送後,在接收的時候是遠程服務器返回來的消息.答案是發送的隊列和接收的隊列不一致,這樣就能夠保證消息的準確性.
那麼怎樣實現 發送命令的隊列 和接收返回消息的隊列不一致呢.就是發送命令的一端,在發送遠程命令的同時,把它將要用來接收返回信息的隊列名稱也告訴遠端服務器.這樣就實現了.
不要着急,pika中或者說rabbitMQ中發送信息時能夠設置信息屬性返回隊列的名稱.
那麼問題2來了,若是你向同一臺服務器發送了多個命令,也就是說客戶端執行一次程序,發送了多個命令,那麼遠端服務器收到屢次命令後,也進行操做並返回屢次的執行結果,那麼哪次命令對應哪次結果呢.
這時候咱們想到,不只要在發送信息時設置接收返回信息的隊列名稱的屬性.另外還要給這個發送的信息一個編號.當接收到返回信息時,讓遠程服務器在發送回來,call端在進行驗證.
放心,kipa或者說rabbitMQ一樣也支持給信息設置一個屬性ID
下面就看下具體實現代碼:
rabbitMQ_RPC_call端代碼:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ''' 4 rabbitMQ實現RPC 5 ''' 6 import pika 7 import uuid 8 9 class FibonacciRpcClient(object): 10 11 def __init__(self): # 定義初始化方法 12 # 實例化鏈接 13 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 14 host='localhost')) 15 # 實例化管道 16 self.channel = self.connection.channel() 17 18 # 隨機生成一個queue名稱 19 result = self.channel.queue_declare(exclusive=True) 20 self.callback_queue = result.method.queue 21 22 # 接收方法 23 self.channel.basic_consume(self.on_response, no_ack=True, 24 queue=self.callback_queue) 25 # 接收完,對信息的處理方法 26 def on_response(self, ch, method, props, body): 27 if self.corr_id == props.correlation_id: # 若是corr_id 等於 客戶端經過rabbitMQ 的self.callback_queue隊列發來的消息的props.correlation_id相等,說明接收到的信息是發送命令的結果 28 self.response = body # 把結果值付給self.response 屬性 29 30 # 31 def call(self, n): 32 self.response = None # 先設置response默認爲空 33 self.corr_id = str(uuid.uuid4()) # 用uuid隨機生成一個uuid值,用來記錄程序執行的惟一ID值,主要做用是保證發送和接收內容的一致性,好比你屢次執行這個程序,返回來多個值,那麼你從管道里取值時,固然要肯定這個值是哪次執行程序的結果 34 self.channel.basic_publish(exchange='', 35 routing_key='rpc_queue', 36 properties=pika.BasicProperties( 37 reply_to = self.callback_queue, # reply_to 參數,接收端可以獲取,這裏用做服務端返回信息時使用的queue 38 correlation_id = self.corr_id,), # 消息ID,這個是多條命令傳輸時,命令的返回結果和發送的ID保持一致 39 body=str(n)) # 發送的信息 40 while self.response is None: #這段代碼的意思是若是self.response值爲空,也就是說RPC_server沒有返回值,要不就是self.corr_id值不對,要不就是客戶端沒返回 41 self.connection.process_data_events() # 不斷的去這個queue裏接收,這個接收不是阻塞,一直去取.,那麼問題來了,若是你取的消息和你發送的self.corr_id值不一致,這個消息怎麼處理,是放回去,仍是刪掉.須要測試 42 return int(self.response) # 若是信息獲得反饋,則返回獲得的計算結果 43 44 fibonacci_rpc = FibonacciRpcClient() # 實例化Rpc客戶端 45 46 print(" [x] Requesting fib(30)") 47 response = fibonacci_rpc.call(30) # 傳入一個數,得到server端計算的斐波那契結果 48 print(" [.] Got %r" % response)
rabbitMQ_RPC_Server端代碼:
1 #!/usr/bin/env python3.5 2 #_*_coding:utf-8_*_ 3 __author__ = 'ted.zhou' 4 import pika 5 import time 6 # 實例化鏈接 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 8 9 # 實例化管道 10 channel = connection.channel() 11 12 # 聲明隊列,名稱爲rpc_queue 13 channel.queue_declare(queue='rpc_queue') 14 15 # 斐波那契計算函數 16 def fib(n): 17 if n == 0: 18 return 0 19 elif n == 1: 20 return 1 21 else: 22 return fib(n-1) + fib(n-2) 23 24 # 定義接收到消息後消息的處理方法 25 def on_request(ch, method, props, body): 26 n = int(body) # 接收到的字符串,因爲要進行斐波那契處理,換算成數字 27 28 print(" [.] fib(%s)" % n) 29 response = fib(n) # 調用斐波那契函數計算返回結果 30 31 # 這裏咱們看到ch參數的做用了,它至關於管道,對!對!對!這個ch就是管道實例,props就是properties聲明的屬性,這些屬性是是綁定到接收這個信息的管道上的 32 ch.basic_publish(exchange='', # 33 routing_key=props.reply_to, # 這裏當exchange=''空時,routing_key參數就是指queue名稱了,這個props.reply_to正是RPC的客戶端在發送命令時一同發送過來的. 34 properties=pika.BasicProperties(correlation_id = props.correlation_id), # 同時把correlation_id也發送回去,表明執行命令的記錄ID,這裏我要嘗試下若是返回去的值和RPC call端發送的值不同的狀況下,信息是否是不能返回 35 body=str(response)) #發送body,body就是斐波那契函數執行後返回的結果 36 ch.basic_ack(delivery_tag = method.delivery_tag) #這裏delivery_tag 目前還不知道是啥做用,老師視頻中說是消息持久化配置中接收端的代碼,可是我以爲確定不是 37 38 channel.basic_qos(prefetch_count=1) # 表示同一時刻保證程序只從rabbitMQ中獲取一個消息進行處理,在未處理完的狀況下,不會取第二個消息 39 channel.basic_consume(on_request, queue='rpc_queue') # 定義接收消息的方式 40 41 print(" [x] Awaiting RPC requests") 42 channel.start_consuming() # 開啓這個管道接收消息的方法
在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提升了系統的吞吐量。負載均衡