生產者javascript
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 chan = s_conn.channel() #在鏈接上建立一個頻道 9 10 chan.queue_declare(queue='hello') #聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行 11 chan.basic_publish(exchange='', #交換機 12 routing_key='hello',#路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello 13 body='hello world')#生產者要發送的消息 14 print("[生產者] send 'hello world") 15 16 s_conn.close()#當生產者發送完消息後,可選擇關閉鏈接 17 18 19 輸出: 20 [生產者] send 'hello world
消費者html
import pika username = 'wt'#指定遠程rabbitmq的用戶名密碼 pwd = '111111' user_pwd = pika.PlainCredentials(username, pwd) s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 chan = s_conn.channel()#在鏈接上建立一個頻道 chan.queue_declare(queue='hello')#聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行 def callback(ch,method,properties,body): #定義一個回調函數,用來接收生產者發送的消息 print("[消費者] recv %s" % body) chan.basic_consume(callback, #調用回調函數,從隊列裏取消息 queue='hello',#指定取消息的隊列名 no_ack=True) #取完一條消息後,不給生產者發送確認消息,默認是False的,即 默認給rabbitmq發送一個收到消息的確認,通常默認便可 print('[消費者] waiting for msg .') chan.start_consuming()#開始循環取消息 輸出: [消費者] waiting for msg . [消費者] recv b'hello world'
(1)rabbitmq循環調度,將消息循環發送給不一樣的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。
(2)消息確認機制,爲了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ能夠釋放並刪除掉了。若是一個消費者死掉了(channel關閉、connection關閉、或者TCP鏈接斷開了)而沒有發送ack,RabbitMQ 就會認爲這個消息沒有被消費者處理,並會從新發送到生產者的隊列裏,若是同時有另一個消費者在線,rabbitmq將會將消息很快轉發到另一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。java
這個是沒有超時的,當消費方(consumer)死掉後RabbitMQ會從新轉發消息,即便處理這個消息須要很長很長時間也沒有問題。消息的 acknowlegments 默認是打開的,在前面的例子中關閉了: no_ack = True . 如今刪除這個標識 而後 發送一個 acknowledgment。python
(3)消息持久化,將消息寫入硬盤中。 RabbitMQ不容許你從新定義一個已經存在、但屬性不一樣的queue。須要標記消息爲持久化的 - 要經過設置 delivery_mode 屬性爲 2來實現。json
消息持久化的注意點:緩存
標記消息爲持久化並不能徹底保證消息不會丟失,儘管已經告訴RabbitMQ將消息保存到磁盤,但RabbitMQ接收到的消息在尚未保存的時候,仍然有一個短暫的時間窗口。RabbitMQ不會對每一個消息都執行同步 --- 可能只是保存到緩存cache尚未寫入到磁盤中。所以這個持久化保證並非很強,但這比咱們簡單的任務queue要好不少,若是想要很強的持久化保證,可使用 publisher confirms。app
(4)公平調度。在一個消費者未處理完一個消息以前不要分發新的消息給它,而是將這個新消息分發給另外一個不是很忙的消費者進行處理。爲了解決這個問題咱們能夠在消費者代碼中使用 channel.basic.qos ( prefetch_count = 1 ),將消費者設置爲公平調度。函數
生產者測試
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel() #在鏈接上建立一個頻道 9 10 channel.queue_declare(queue='task_queue', durable=True) #建立一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,不然有報錯 11 12 message = "Hello World" 13 channel.basic_publish(exchange='', 14 routing_key='worker',#寫明將消息發送給隊列worker 15 body=message, #要發送的消息 16 properties=pika.BasicProperties(delivery_mode=2,)#設置消息持久化,將要發送的消息的屬性標記爲2,表示該消息要持久化 17 ) 18 print(" [生產者] Send %r " % message)
消費者fetch
1 import pika 2 import time 3 4 username = 'wt'#指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel()#在鏈接上建立一個頻道 9 10 channel.queue_declare(queue='task_queue', durable=True) #建立一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,不然有報錯 11 12 13 def callback(ch, method, properties, body): 14 print(" [消費者] Received %r" % body) 15 time.sleep(1) 16 print(" [消費者] Done") 17 ch.basic_ack(delivery_tag=method.delivery_tag)# 接收到消息後會給rabbitmq發送一個確認 18 19 channel.basic_qos(prefetch_count=1) # 消費者給rabbitmq發送一個信息:在消費者處理完消息以前不要再給消費者發送消息 20 21 channel.basic_consume(callback, 22 queue='worker', 23 #這裏就不用再寫no_ack=False了 24 ) 25 channel.start_consuming()
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪一個隊列。因此exchange必須準確知道消息是要送到哪一個隊列,仍是要被丟棄。所以要在exchange中給exchange定義規則,全部的規則都是在exchange的類型中定義的。
exchange有4個類型:direct, topic, headers ,fanout
以前,咱們並無講過exchange,可是咱們仍然能夠將消息發送到隊列中。這是由於咱們用的是默認exchange.也就是說以前寫的:exchange='',空字符串表示默認的exchange。
以前的代碼結構:
1 channel.basic_publish(exchange='', 2 routing_key='hello', 3 body=message)
exchange = '參數'
參數表示exchange 的名字,空字符串是默認或者沒有exchange。消息被路由到某隊列的根據是:routing_key.。若是routing_key的值存在的話。
如今,咱們能夠用咱們本身命名的exchange來代替默認的exchange。
1 channel.basic_publish(exchange='logs',#本身命名exchange爲logs 2 routing_key='', 3 body=message)
廣播類型,生產者將消息發送給全部消費者,若是某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機)
生產者:(能夠用做日誌收集系統)
1 import pika 2 import sys 3 username = 'wt' #指定遠程rabbitmq的用戶名密碼 4 pwd = '111111' 5 user_pwd = pika.PlainCredentials(username, pwd) 6 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 7 channel = s_conn.channel() #在鏈接上建立一個頻道 8 channel.exchange_declare(exchange='logs', 9 type='fanout')#建立一個fanout(廣播)類型的交換機exchange,名字爲logs。 10 11 message = "info: Hello World!" 12 channel.basic_publish(exchange='logs',#指定交換機exchange爲logs,這裏只須要指定將消息發給交換機logs就能夠了,不須要指定隊列,由於生產者消息是發送給交換機的。 13 routing_key='',#在fanout類型中,綁定關鍵字routing_key必須忽略,寫空便可 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel() #在鏈接上建立一個頻道 9 10 channel.exchange_declare(exchange='logs', 11 type='fanout')#消費者需再次聲明一個exchange 以及類型。 12 13 result = channel.queue_declare(exclusive=True)#建立一個隊列,exclusive=True(惟一性)表示在消費者與rabbitmq斷開鏈接時,該隊列會自動刪除掉。 14 queue_name = result.method.queue#由於rabbitmq要求新隊列名必須是與現存隊列名不一樣,因此爲保證隊列的名字是惟一的,method.queue方法會隨機建立一個隊列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。 15 16 channel.queue_bind(exchange='logs', 17 queue=queue_name)#將交換機logs與接收消息的隊列綁定。表示生產者將消息發給交換機logs,logs將消息發給隨機隊列queue,消費者在隨機隊列queue中取消息 18 19 print(' [消費者] Waiting for logs. To exit press CTRL+C') 20 21 def callback(ch, method, properties, body): 22 print(" [消費者] %r" % body) 23 24 channel.basic_consume(callback,#調用回調函數從queue中取消息 25 queue=queue_name, 26 no_ack=True)#設置爲消費者不給rabbitmq回覆確認。 27 28 channel.start_consuming()#循環等待接收消息。
這樣,開啓多個消費者後,會同時從生產者接收相同的消息。
關鍵字類型。功能:交換機根據生產者消息中含有的不一樣的關鍵字將消息發送給不一樣的隊列,消費者根據不一樣的關鍵字從不一樣的隊列取消息
生產者:不用建立對列
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel() #在鏈接上建立一個頻道 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct')#建立一個交換機並聲明exchange的類型爲:關鍵字類型,表示該交換機會根據消息中不一樣的關鍵字將消息發送給不一樣的隊列 12 13 severity = 'info'#severity這裏只能爲一個字符串,這裏爲‘info’代表本生產者只將下面的message發送到info隊列中,消費者也只能從info隊列中接收info消息 14 message = 'Hello World!' 15 channel.basic_publish(exchange='direct_logs',#指明用於發佈消息的交換機、關鍵字 16 routing_key=severity,#綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪一個關鍵字的隊列中。 17 body=message) 18 print(" [生產者] Sent %r:%r" % (severity, message)) 19 connection.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel() #在鏈接上建立一個頻道 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct')#建立交換機,命名爲‘direct_logs’並聲明exchange類型爲關鍵字類型。 12 13 result = channel.queue_declare(exclusive=True)#建立隨機隊列,當消費者與rabbitmq斷開鏈接時,這個隊列將自動刪除。 14 queue_name = result.method.queue#分配隨機隊列的名字。 15 16 severities = ['info','err']#能夠接收綁定關鍵字info或err的消息,列表中也能夠只有一個 17 if not severities:#判斷若是輸入有誤,輸出用法 18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for severity in severities: 22 channel.queue_bind(exchange='direct_logs',#將交換機、隊列、關鍵字綁定在一塊兒,使消費者只能根據關鍵字從不一樣隊列中取消息 23 queue=queue_name, 24 routing_key=severity)#該消費者綁定的關鍵字。 25 26 print(' [消費者] Waiting for logs. To exit press CTRL+C') 27 28 def callback(ch, method, properties, body):#定義回調函數,接收消息 29 print(" [消費者] %r:%r" % (method.routing_key, body)) 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True)#消費者接收消息後,不給rabbimq回執確認。 34 35 channel.start_consuming()#循環等待消息接收。
發送到一個 topics交換機的消息,它的 routing_key不能是任意的 -- 它的routing_key必須是一個用小數點分割的單詞列表。 這個字符能夠是任何單詞,可是一般是一些指定意義的字符。好比:「stock.usd.nyse","nyse.vmw","quick.orange.rabbit". 這裏能夠是你想要路由鍵的任意字符。最高限制爲255字節。
生產者與消費者的routing_key必須在同一個表單中。 Topic交換的背後的邏輯相似直接交換(direct) -- 包含特定關鍵字的消息將會分發到全部匹配的關鍵字隊列中。而後有兩個重要的特殊狀況:
綁定鍵值:
> * (星) 可代替一個單詞
> # (井) 可代替0個或多個單詞
生產者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel() #在鏈接上建立一個頻道 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') # 建立模糊匹配類型的exchange。。 12 13 routing_key = '[warn].kern'##這裏關鍵字必須爲點號隔開的單詞,以便於消費者進行匹配。引伸:這裏能夠作一個判斷,判斷產生的日誌是什麼級別,而後產生對應的routing_key,使程序能夠發送多種級別的日誌 14 message = 'Hello World!' 15 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定 16 routing_key=routing_key, # 綁定關鍵字,將隊列變成[warn]日誌的專屬隊列 17 body=message) 18 print(" [x] Sent %r:%r" % (routing_key, message)) 19 s_conn.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt'#指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#建立鏈接 8 channel = s_conn.channel()#在鏈接上建立一個頻道 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') # 聲明exchange的類型爲模糊匹配。 12 13 result = channel.queue_declare(exclusive=True) # 建立隨機一個隊列當消費者退出的時候,該隊列被刪除。 14 queue_name = result.method.queue # 建立一個隨機隊列名字。 15 16 binding_keys = ['[warn]', 'info.*']#綁定鍵。‘#’匹配全部字符,‘*’匹配一個單詞。這裏列表中能夠爲一個或多個條件,能經過列表中字符匹配到的消息,消費者均可以取到 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys:#經過循環綁定多個「交換機-隊列-關鍵字」,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列裏取消息 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 print(' [*] Waiting for logs. To exit press CTRL+C') 27 28 29 def callback(ch, method, properties, body): 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 33 channel.basic_consume(callback, 34 queue=queue_name, 35 no_ack=True)#不給rabbitmq發送確認 36 37 channel.start_consuming()#循環接收消息
Remote procedure call
消息屬性
AMQP協議在一個消息中預先定義了一個包含14個屬性的集合。大部分屬性不多用到,如下幾種除外:
> delivery_mode: 標記一個消息爲持久的(值爲2)或者 瞬時的(其它值), 你須要記住這個屬性(在第二課時用到過)
> content_type : 用來描述 MIME 類型的編碼 ,好比咱們常用的 JSON 編碼,設置這個屬性就很是好實現: application/json
> reply_to:reply_to沒有特別的意義,只是一個普通的變量名,只是它一般用來命名一個 callback 隊列
> correlation_id : 用來關聯RPC的請求與應答。關聯id的做用:當在一個隊列中接收了一個返回,咱們並不清楚這個結果時屬於哪一個請求的,這樣當correlation_id屬性使用後,咱們爲每一個請求設置一個惟一值,這個值就是關聯id。這樣,請求會有一個關聯id,該請求的返回結果也有一個相同的關聯id。而後當咱們從callback隊列中接收到一個消息後,咱們查看一下這個關聯,基於這個咱們就能將請求和返回進行匹配。若是咱們看到一個未知的correlation_id值,咱們能夠直接丟棄這個消息 -- 它是不屬於咱們的請求。
callback隊列
咱們的RPC將會這樣執行:
> 當客戶端啓動後,它建立一個匿名的惟一的回調隊列
> 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來爲每一個請求設置一個惟一標識)
> 請求發送到 rpc_queue隊列
> RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to字段中的隊列
> 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,若是和請求中的值匹配將返回給應用
[python]
服務端代碼詳單簡單:
> (4) 和往常同樣咱們創建一個鏈接並定義一個隊列
> (11) 咱們定義了 斐波納契 函數,假定輸入的都是合法正數
> (19) 咱們定義了一個回調的 basic_consume, RPC服務的核心。 當收到請求後執行這個函數並返回結果
> (32) 咱們可能會執行多個服務端,爲了在多個服務端上均勻的分佈負荷,咱們須要這是 prefetch_count。
[python]
客戶端代碼稍微複雜些:
> (7) 咱們創建一個鏈接,通道並定義一個專門的’callback‘隊列用來接收回復
> (16) 咱們訂閱了「callback」隊列,所以咱們可以接收 RPC 的返回結果
> (18) ’on_response' 在每一個返回中執行的回調是一個簡單的job, 對每一個返回消息將檢查是否correlation_id使咱們須要查找的那個ID,若是是,將保存結果到 self.response 並終端consuming循環
> (23) 下一步,咱們定義咱們的main方法 - 執行實際的RPC請求
> (24) 在這方法中,首先咱們生產一個惟一的 correlatin_id 號並保存 -- 'on_response"回調函數將用着號碼來匹配發送和接收的消息值
> (25) 下一步,發佈請求信息,使用兩個屬性: reply_to 和 correlation_id
> (32) 這一步咱們能夠坐等結果的返回
>(33) 最後咱們返回結果給用戶
或者 看下邊一篇好理解一點
前面的例子都有個共同點,就是發送端發送消息出去後沒有結果返回。若是隻是單純發送消息,固然沒有問題了,可是在實際中,經常會須要接收端將收到的消息進行處理以後,返回給發送端。
處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這裏接收端、發送端的概念已經比較模糊了,由於發送端也一樣要接收消息,接收端一樣也要發送消息,因此這裏筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個天然數N發送給計算節點,計算節點將N值加1後,返回給控制中心。這裏用center.py模擬控制中心,compute.py模擬計算節點。
計算節點的代碼比較簡單,值得一提的是,原來的接收方法都是直接將消息打印出來,這邊進行了加一的計算,並將結果發送回控制中心。
上例代碼定義了接收返回數據的隊列和處理方法,而且在發送請求的時候將該隊列賦值給reply_to,在計算節點代碼中就是經過這個參數來獲取返回隊列的。
打開兩個終端,一個運行代碼python compute.py,另一個終端運行center.py,若是執行成功,應該就能看到效果了。
筆者在測試的時候,出了些小問題,就是在center.py發送消息時沒有指明返回隊列,結果compute.py那邊在計算完結果要發回數據時報錯,提示routing_key不存在,再次運行也報錯。用rabbitmqctl list_queues查看隊列,發現compute_queue隊列有1條數據,每次從新運行compute.py的時候,都會從新處理這條數據。後來使用/etc/init.d/rabbitmq-server restart從新啓動下rabbitmq就ok了。
參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html