以前的幾篇文章介紹了一下RabbitMQ的概念以及環境的搭建和配置,有了RabbitMQ環境就能夠基於其實現一些特殊的任務場景了。RabbitMQ官方有個很好的Tutorials基本覆蓋了RabbitMQ的各中常見應用場景,現以代碼加註釋的方式以其Python客戶端pika爲例簡單介紹以下。更詳盡的信息可參閱:http://www.rabbitmq.com/getstarted.html 。html
以前的幾篇文章:
RabbitMQ概念及環境搭建(一)單節點安裝與配置
RabbitMQ概念及環境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及環境搭建(三)RabbitMQ cluster
RabbitMQ概念及環境搭建(四)RabbitMQ High Availability
RabbitMQ概念及環境搭建(五)與web的整合
python
RabbitMQ是一個消息代理,從「生產者」接收消息並傳遞消息至「消費者」,期間可根據規則路由、緩存、持久化消息。「生產者」也即message發送者如下簡稱P,相對應的「消費者」乃message接收者如下簡稱C,message經過queue由P到C,queue存在於RabbitMQ,可存儲儘量多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message。
web
應用場景1-「Hello Word」緩存
一個P向queue發送一個message,一個C從該queue接收message並打印。
併發
send.py
producer,鏈接至RabbitMQ Server,聲明隊列,發送message,關閉鏈接,退出。函數
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.queue_declare(queue='hello', durable=True)
-
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!')
-
- print " [x] Sent 'Hello World!'"
-
- connection.close()
receive.py
consumer,鏈接至RabbitMQ Server,聲明隊列,接收消息並進行處理這裏爲打印出消息,退出。測試
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.queue_declare(queue='hello')
-
- print ' [*] Waiting for messages. To exit press CTRL+C'
-
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
-
- channel.basic_consume(callback,
- queue='hello',
- no_ack=True)
-
- channel.start_consuming()
測試fetch
- python send.py
- python receive.py
應用場景2-work queuesui
將耗時的消息處理經過隊列分配給多個consumer來處理,咱們稱此處的consumer爲worker,咱們將此處的queue稱爲Task Queue,其目的是爲了不資源密集型的task的同步處理,也即當即處理task並等待完成。相反,調度task使其稍後被處理。也即把task封裝進message併發送到task queue,worker進程在後臺運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。spa

new_task.py
創建鏈接,聲明隊列,發送能夠模擬耗時任務的message,斷開鏈接、退出。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.queue_declare(queue='task_queue', durable=True)
-
- message = ' '.join(sys.argv[1:]) or "Hello World!"
-
- channel.basic_publish(exchange='',
- routing_key='task_queue',
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2,
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
worker.py
創建鏈接,聲明隊列,不斷的接收message,處理任務,進行確認。
- import pika
- import time
-
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.queue_declare(queue='task_queue', durable=True)
-
- print ' [*] Waiting for messages. To exit press CTRL+C'
-
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
-
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
- channel.basic_qos(prefetch_count=1)
-
- channel.basic_consume(callback,
- queue='task_queue')
-
- channel.start_consuming()
測試
- python new_task.py "A very hard task which takes two seconds.."
- python worker.py
應用場景3-Publish/Subscribe
在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。如今咱們設法將一個message傳遞給多個consumer。這種模式被稱爲publish/subscribe。此處以一個簡單的日誌系統爲例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息能夠被全部運行的log接收者接收。所以,咱們能夠運行一個log接收者直接在屏幕上顯示log,同時運行另外一個log接收者將log寫入磁盤文件。

receive_logs.py
日誌消息接收者:創建鏈接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='logs',
- type='fanout')
-
- result = channel.queue_declare(exclusive=True)
-
- queue_name = result.method.queue
-
- channel.queue_bind(exchange='logs',
- queue=queue_name)
-
- print ' [*] Waiting for logs. To exit press CTRL+C'
-
- def callback(ch, method, properties, body):
- print " [x] %r" % (body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log.py
日誌消息發送者:創建鏈接,聲明fanout類型的exchange,經過exchage向queue發送日誌消息,消息被廣播給全部接收者,關閉鏈接,退出。
-
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='logs',
- type='fanout')
-
- message = ' '.join(sys.argv[1:]) or "info: Hello World!"
-
- channel.basic_publish(exchange='logs',
- routing_key='',
- body=message)
-
- print " [x] Sent %r" % (message,)
-
- connection.close()
測試
- python receive_logs.py
- python emit_log.py "info: This is the log message"
應用場景4-Routing
應用場景3中構建了簡單的log系統,能夠將log message廣播至多個receiver。如今咱們將考慮只把指定的message類型發送給其subscriber,好比,只把error message寫到log file而將全部log message顯示在控制檯。

receive_logs_direct.py
log message接收者:創建鏈接,聲明direct類型的exchange,聲明queue,使用提供的參數做爲routing_key將queue綁定到exchange,開始循環接收log message並打印。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
-
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
-
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
-
- for severity in severities:
-
-
-
-
-
- channel.queue_bind(exchange='direct_logs',
- queue=queue_name,
- routing_key=severity)
-
- print ' [*] Waiting for logs. To exit press CTRL+C'
-
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log_direct.py
log message發送者:創建鏈接,聲明direct類型的exchange,生成併發送log message到exchange,關閉鏈接,退出。
-
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
-
- severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
- message = ' '.join(sys.argv[2:]) or 'Hello World!'
-
- channel.basic_publish(exchange='direct_logs',
- routing_key=severity,
- body=message)
-
- print " [x] Sent %r:%r" % (severity, message)
- connection.close()
測試:
python receive_logs_direct.py info
python emit_log_direct.py info "The message"
應用場景5-topic
應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不一樣的log message發送給不一樣的subscriber(也即分別經過不一樣的routing_key將queue綁定到exchange,這樣exchange即可將不一樣的message根據message內容路由至不一樣的queue)。但仍然存在限制,不能根據多個規則路由消息,好比接收者要麼只能收error類型的log message要麼只能收info類型的message。若是咱們不只想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。爲了達到此目的,須要topic類型的exchange。topic類型的exchange中routing_key中能夠包含兩個特殊字符:「*」用於替代一個詞,「#」用於0個或多個詞。

receive_logs_topic.py
log message接收者:創建鏈接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
-
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
-
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
-
- for severity in severities:
-
-
-
-
-
- channel.queue_bind(exchange='direct_logs',
- queue=queue_name,
- routing_key=severity)
-
- print ' [*] Waiting for logs. To exit press CTRL+C'
-
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log_topic.py
log message發送者:創建鏈接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉鏈接,退出。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.exchange_declare(exchange='topic_logs',
- type='topic')
-
- routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
- message = ' '.join(sys.argv[2:]) or 'Hello World!'
-
- channel.basic_publish(exchange='topic_logs',
- routing_key=routing_key,
- body=message)
-
- print " [x] Sent %r:%r" % (routing_key, message)
- connection.close()
測試:
- python receive_logs_topic.py "*.rabbit"
- python emit_log_topic.py red.rabbit Hello
應用場景6-PRC
在應用場景2中描述瞭如何使用work queue將耗時的task分配到不一樣的worker中。可是,若是咱們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個徹底不一樣的故事。這一模式被稱爲遠程過程調用。如今,咱們將構建一個RPC系統,包含一個client和可擴展的RPC server,經過返回斐波那契數來模擬RPC service。

rpc_server.py
RPC server:創建鏈接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求後調用本身的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
-
- channel.queue_declare(queue='rpc_queue')
-
- def fib(n):
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return fib(n-1) + fib(n-2)
-
- def on_request(ch, method, props, body):
-
- n = int(body)
-
- print " [.] fib(%s)" % (n,)
-
- response = fib(n)
-
-
-
-
-
-
- ch.basic_publish(exchange='',
- routing_key=props.reply_to,
- properties=pika.BasicProperties(correlation_id = \
- props.correlation_id),
- body=str(response))
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
- channel.basic_qos(prefetch_count=1)
-
- channel.basic_consume(on_request, queue='rpc_queue')
-
- print " [x] Awaiting RPC requests"
-
- channel.start_consuming()
rpc_client.py
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的鏈接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應後的處理函數on_response根據響應關聯的correlation_id屬性做出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30爲參數發起遠程過程調用。
- import pika
- import uuid
-
- class FibonacciRpcClient(object):
- def __init__(self):
-
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
-
- self.channel = self.connection.channel()
-
-
- result = self.channel.queue_declare(exclusive=True)
- self.callback_queue = result.method.queue
-
-
-
-
-
- self.channel.basic_consume(self.on_response, no_ack=True,
- queue=self.callback_queue)
-
-
-
-
- def on_response(self, ch, method, props, body):
- if self.corr_id == props.correlation_id:
- self.response = body
-
- def call(self, n):
-
- self.response = None
- self.corr_id = str(uuid.uuid4())
-
-
-
-
- self.channel.basic_publish(exchange='',
- routing_key='rpc_queue',
- properties=pika.BasicProperties(
- reply_to = self.callback_queue,
- correlation_id = self.corr_id,
- ),
-
- body=str(n))
-
- while self.response is None:
- self.connection.process_data_events()
-
- return int(self.response)
-
- fibonacci_rpc = FibonacciRpcClient()
-
- print " [x] Requesting fib(30)"
- response = fibonacci_rpc.call(30)
- print " [.] Got %r" % (response,)
測試:
- python rpc_server.py
- python rpc_client.py