<!DOCTYPE html>
RabbitMQ 消息隊列
python裏有threading QUEUE 只用於線程間交互,進程QUEUE 用於父進程與子進程或者是兄弟進程
RabbitMQ採用消息輪詢的方式發送消息。一個一個的給每一個消費者
應用之間使用socket實現數據共享
連接幾個應用的中間商著名的有:
html
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.queue_declare(queue='隊列名')
channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!')
connection.close()
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.queue_declare(queue='隊列名')
def callback(ch,method,properties,body):
print(ch,method,properties,body)
print("[x] Received %r"%body)
ch.basic_ack(delivery_tag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬
channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
channel.start_consuming()
採用輪詢的方式,依次的發給每一個消費者
生產者會等待消費者肯定處理完消息的回覆纔會銷燬消息。
當消息執行到一半的時候,消費者斷開,消息會保留髮送給下一個消費者處理
python
消息必須等消費者手動肯定後,才銷燬ch.basicack(deliverytag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬
當RabbitMQ服務中止,服務裏的消息隊列會銷燬。
若是想保持消息隊列的持久化,必須在聲明隊列的時候設置,durable=True。這樣當RabbitMQ服務斷開再重啓,消息隊列仍是存在,消息會銷燬
channel.queue_declare(queue='隊列名',durable=True)
消息也持久化
channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))
mysql
消費者端加channel.basicqos(prefetchcount=1),加過這句話實現了,不是按順序分發,而是看哪個是空閒的,才分發給空閒的消費者消息。多大本事幹多少活。
廣播是生產者發消息,全部消費者都收到。
用exchange實現廣播。
fanout:全部bind到此exchange的queue均可以接收消息
direct:經過routingkey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingkey(此時能夠是一個表達式)的routingkey所bind的queue能夠接受消息。
git
設置管道的時候設置
channel.exchange_declare(exchange='logs',type='fanout')
不聲明queue
生產者:
github
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
channel.basic_publish(exchange='logs',routing_key='',body='Hello World!')
connection.close()
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上 queue</em>name=result.method.queue<br /> channel.queue<em>bind(exchange='logs',queue=queue</em>name)綁定轉發器,接受轉發器裏的消息,exchange與隨機生成的queue綁定 </p> <p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬
channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
channel.start_consuming()
生產者:
web
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
severity=sys.argv[1] if len(sys.argv)>1 else 'info'
message=' '.join(sys.argv[2:]) or 'hello world!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,#相似指定queue
body=message)
connection.close()
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上 queue</em>name=result.method.queue<br /> severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)</p> <p>for severity in severities: channel.queue<em>bind(exchange='direct</em>logs',queue=queue<em>name,routing</em>key=severity)</p> <p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬
channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
channel.start_consuming()
更細緻的消息過濾,包括應用程序,#收全部消息,.info接受帶有.info的消息,mysql.接受帶mysql的消息
生產者
import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>routing<em>key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic</em>publish(exchange='topic<em>logs', routing<em>key=routing</em>key, body=message) print(" [x] Sent %r:%r" % (routing</em>key, message)) connection.close() <code>消費者</code> import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>result = channel.queue<em>declare(exclusive=True) queue</em>name = result.method.queue</p> <p>binding<em>keys = sys.argv[1:] if not binding</em>keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1)</p> <p>for binding<em>key in binding</em>keys: channel.queue<em>bind(exchange='topic<em>logs', queue=queue</em>name, routing</em>key=binding_key)</p> <p>print(' [*] Waiting for logs. To exit press CTRL+C') </p> <p>def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) </p> <p>channel.basic<em>consume(callback, queue=queue</em>name, no_ack=True) </p> <p>channel.start_consuming() <br />
redis
即便生產者又是消費者
startcosuming爲阻塞模式,rpc不用阻塞,rpc是執行一會這個再去執行另外一個。processdata_events()非阻塞方法,能收到消息就收,沒有消息不阻塞繼續往下執行
服務器端
```sql
author = 'Alex Li'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))mongodb
channel = connection.channel()
channel.queuedeclare(queue='rpcqueue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = <br/>
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basicqos(prefetchcount=1) channel.basic_consume(onrequest, queue='rpcqueue')
print(" [x] Awaiting RPC requests") channel.start_consuming() 客戶端
import pika import uuid
class FibonacciRpcClient(object): def init(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
replyto = self.callbackqueue,#客戶端發送消息是,把接收返回消息的管道也告訴給服務器端 correlationid = self.corrid,#用於判斷服務器端返回的結果和個人請求是不是同一條。目的就是爲了客戶端能夠同時發兩條消息,當服務器端返回結果時,須要有判斷關於哪條請求的結果 ), body=str(n)) while self.response is None: self.connection.processdataevents() return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ```
緩存中間商,用socket,例如:mongodb,redis,memcache
* 鏈接方式
* 鏈接池
* 操做
- String 操做
- Hash 操做
- List 操做
- Set 操做
- Sort Set 操做
* 管道
* 發佈訂閱