RabbitMQ是在兩個獨立得python程序,或其餘語言交互時使用。
RabbitMQ:erlang語言 開發的。
Python中鏈接RabbitMQ的模塊:pika 、Celery(分佈式任務隊列) 、haigha
能夠維護不少得隊列
RabbitMQ 教程官網:http://www.rabbitmq.com/getstarted.html
幾個概念說明:html
Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,他制定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入一個或多個隊列。
Binding:綁定,他的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠設多個vhost,用做不一樣用戶得權限分離。
producer:消息生產者,就是投遞消息得程序。
consumer:消息消費者,就是接受消息得程序。
channel:消息通道,在客戶端得每一個鏈接裏。能夠創建多個channel,每一個channel表明一個會話任務。python
ubuntu系統mysql
install rabbitmq-server # 直接搞定
---
centos系統
1)Install Erlangsql
1)Install Erlang # For EL5: rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm # For EL6: rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm # For EL7: rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm yum install erlang
2)Install RabbitMQ Serverapache
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpmubuntu
3)use RabbitMQ Servercentos
chkconfig rabbitmq-server on
service rabbitmq-server stop/start服務器
發送端producerdom
import pika # 創建一個實例 connection = pika.BlockingConnection( pika.ConnectionParameters('localhost',5672) # 默認端口5672,可不寫 ) # 聲明一個管道,在管道里發消息 channel = connection.channel() # 在管道里聲明queue channel.queue_declare(queue='hello') # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', # queue名字 body='Hello World!') # 消息內容 print(" [x] Sent 'Hello World!'") connection.close() # 隊列關閉
接收端consumersocket
import pika import time # 創建實例 connection = pika.BlockingConnection(pick.ConnectionParameters('hocalhost')) # 聲明管道 channel = connection.channel() # 爲何聲明瞭一個‘hello’隊列? # 若是肯定已經聲明瞭,能夠不聲明。可是你不知道那個機器先運行,因此要聲明兩次。 # 一般是先運行消費者 channel.queue_declare(queue='hello') def callback(ch, method, properties, body):#四個參數爲標準格式 print("[x]Received %r"%body) time.sleep(15) ch.basic_ack(delivry_tay = method.delivery_tay)# 告訴生成者,消息處理完成 channel.basic_consume(# 消費消息 callback, # 若是收到消息,就調用callback函數來處理消息 queue='hello',# 要消費的隊列 # no ack=True # 消息確認 # 通常不寫。宕機則生產者檢測到發給其餘消費者 ) print('[*]Waiting for messages.To exit press CTRL+C') channel.start_consuming() # 開始消費消息
一個生產者多個消費者
採用輪詢機制;把消息依次分發
假如消費者處理雄安熙須要15秒,若是宕機了,那這個消息處理尚未處理完,怎麼處理?
(能夠模擬消費端斷了,分別註釋和不註釋no_ack=True看一下)
沒有回覆,就表明消息沒有處理完,
上面的效果消費端斷了就轉到另一個消費端去了,可是生產者怎麼知道消費端斷了呢?
由於生產者和消費者是經過socket鏈接的,socket斷了,就說明消費端斷開了。
上面的模式只是依次分發,實際狀況是機器配置不同。怎麼設置相似權重的操做?
RabbitMQ怎麼辦呢,RabbitMQ作了簡單的處理就能實現公平的分發。
就是RabbitMQ給消費者發消息的時候檢測下消費者裏的消息數量,若是超過指定值(好比1條),就不給你發了。
只須要在消費者端,channel.basic_consume前加上就能夠了。
channel.basic_qos(prefetch_count=1)# 相似權重,按能力分發,若是有一個消息,就不在給你發
channel.basic_consume( # 消費消息
rabbitmqctl list_queues # 查看當前queue數量及queue裏消息數量
若是隊列裏還有消息,RabbitMQ服務端宕機了呢?消息還在不在?
把RabbitMQ服務重啓,看一下消息在不在。
上面的狀況下,宕機了,消息就久了,下面看看如何把消息持久化。
每次聲明隊列的時候,都加上durable,注意每一個隊列都得寫,客戶端、服務端聲明的時候都得寫。
在管道里聲明queue
channel.queue_declare(queue='hello2', durable=True)
測試結果發現,只是把隊列持久化了,但隊列裏的消息沒了。
durable的做用只是把隊列持久化。離消息持久話還差一步:
發送端發送消息時,加上properties
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') # 重要程度級別,這裏默認定義爲 info severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
接收端subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
接收端 consumer
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello2', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(10) ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產者,消息處理完成 channel.basic_qos(prefetch_count=1) # 相似權重,按能力分發,若是有一個消息,就不在給你發 channel.basic_consume( # 消費消息 callback, # 若是收到消息,就調用callback queue='hello2', # no_ack=True # 通常不寫,處理完接收處理結果。宕機則發給其餘消費者 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
前面的效果都是一對一發,若是作一個廣播效果可不能夠,這時候就要用到exchange了
exchange必須精確的知道收到的消息要發給誰。exchange的類型決定了怎麼處理,
類型有如下幾種:
fanout: 全部綁定到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic: 全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
須要queue和exchange綁定,由於消費者不是和exchange直連的,消費者是連在queue上,queue綁定在exchange上,消費者只會在queu裏度消息
|------------------------| | /—— queue <—|—> consumer1 producer —|—exchange1 <bind | \ | \—— queue <—|—> consumer2 \-|-exchange2 …… | |------------------------|
發送端 publisher 發佈、廣播
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 注意:這裏是廣播,不須要聲明queue channel.exchange_declare(exchange='logs', # 聲明廣播管道 type='fanout') # message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', # 注意此處空,必須有 body=message) print(" [x] Sent %r" % message) connection.close()
接收端 subscriber 訂閱
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 result = channel.queue_declare(exclusive=True) # 獲取隨機的queue名字 queue_name = result.method.queue print("random queuename:", queue_name) channel.queue_bind(exchange='logs', # queue綁定到轉發器上 queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
注意:廣播,是實時的,收不到就沒了,消息不會存下來,相似收音機。
接收者能夠過濾消息,只收我想要的消息
發送端publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') # 重要程度級別,這裏默認定義爲 info severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
接收端subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 獲取運行腳本全部的參數 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 循環列表去綁定 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
運行接收端,指定接收級別的參數,例:
python direct_sonsumer.py info warning
python direct_sonsumer.py warning error
好比把error中,apache和mysql的分別或取出來
發送端publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
接收端 subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 獲取運行腳本全部的參數 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 循環列表去綁定 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
運行接收端,指定接收哪些消息,例:
python topic_sonsumer.py *.info python topic_sonsumer.py *.error mysql.* python topic_sonsumer.py '#' # 接收全部消息 # 接收全部的 logs run: # python receive_logs_topic.py "#" # To receive all logs from the facility "kern": # python receive_logs_topic.py "kern.*" # Or if you want to hear only about "critical" logs: # python receive_logs_topic.py "*.critical" # You can create multiple bindings: # python receive_logs_topic.py "kern.*" "*.critical" # And to emit a log with a routing key "kern.critical" type: # python emit_log_topic.py "kern.critical" "A critical kernel error"
不知道你有沒有發現,上面的流都是單向的,若是遠程的機器執行完返回結果,就實現不了了。
若是返回,這種模式叫什麼呢,RPC(遠程過程調用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎麼返回呢?既是發送端又是接收端。
可是接收端返回消息怎麼返回?能夠發送到發過來的queue裏麼?不能夠。
返回時,再創建一個queue,把結果發送新的queue裏
爲了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪一個queue
RPC client
import pika import uuid import time class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, # 只要一收到消息就調用on_response no_ack=True, queue=self.callback_queue) # 收這個queue的消息 def on_response(self, ch, method, props, body): # 必須四個參數 # 若是收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果 if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None # 初始self.response爲None self.corr_id = str(uuid.uuid4()) # 隨機惟一字符串 self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 發消息到rpc_queue properties=pika.BasicProperties( # 消息持久化 reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue correlation_id = self.corr_id, # 把隨機uuid同時發給服務器 ), body=str(n) ) while self.response is None: # 當沒有數據,就一直循環 # 啓動後,on_response函數接到消息,self.response 值就不爲空了 self.connection.process_data_events() # 非阻塞版的start_consuming() # print("no msg……") # time.sleep(0.5) # 收到消息就調用on_response return int(self.response) if __name__ == '__main__': fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(7)") response = fibonacci_rpc.call(7) print(" [.] Got %r" % response)
RPC server
import pika import time def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange='', # 把執行結果發回給客戶端 routing_key=props.reply_to, # 客戶端要求返回想用的queue # 返回客戶端發過來的correction_id 爲了讓客戶端驗證消息一致性 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response) ) ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端 if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue , channel.basic_qos(prefetch_count=1) # 在rpc_queue裏收消息,收到消息就調用on_request channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()