消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。html
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。redis
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:數據庫
可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。服務器
靈活的路由(Flexible Routing)
在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。網絡
消息集羣(Clustering)
多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。分佈式
高可用(Highly Available Queues)
隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。ide
多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。fetch
多語言客戶端(Many Clients)
RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。加密
管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。spa
跟蹤機制(Tracing)
若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。
插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。
RabbitMQ的安裝能夠參考博客:
https://www.cnblogs.com/ericli-ericli/p/5902270.html
全部 MQ 產品從模型抽象上來講都是同樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
發送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') channel.basic_publish(exchange='', routing_key='hello1', body='Hello World!') connection.close()
PS:遠程鏈接rabbitmq server的話,須要配置權限
可參考博客:https://www.cnblogs.com/alex3714/articles/5248247.html
在這種工做模式下,若是啓動了多個接收端,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),至關於輪詢。
可是上面這樣寫的話,隊列裏面的消息若是沒有被所有取出來,並且同時RabbitMQ又down機了,那這樣的話隊列裏面的消息就都沒有了,因此咱們須要作一個消息的持久化。
import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello',durable=True) def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(60) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, )) connection.close()
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了。
fanout: 全部bind到此exchange的queue均可以接收消息
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
PS:表達式符號說明:#表明收全部,*表明任何字符。(在終端運行,運行命令後面跟上符合條件的topic)