RabbitMQ隊列

爲啥要使用MQ

以常見的訂單系統爲例,用戶點擊【下單】按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。html

MQ的介紹

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取隊列中的消息。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。python

你能夠想一想在生活中的一種場景:當你把信件的投進郵筒,郵遞員確定最終會將信件送給收件人。咱們能夠把MQ比做 郵局和郵遞員。服務器

MQ和郵局的主要區別是,它不處理消息,可是,它會接受數據、存儲消息數據、轉發消息架構

隊列,生產者,消費者

隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)能夠從隊列中獲取消息並消費。異步

undefined

多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。性能

undefined

隊列的做用:fetch

  • 存儲消息、數據
  • 保證消息的順序
  • 保證數據的正確交付

<font color=red>爲啥不直接使用Queue而是RabbitMQ?</font>spa

RABBITMQ架構介紹

5b751788db1eb.png

  • Publisher線程

    消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。3d

  • Exchange

    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

  • Queue

    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。

  • Channel

    信道,多路複用鏈接中的一條獨立的雙向數據流通道

  • Consumer

    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序

RABBITMQ安裝

  • server端的安裝
安裝 http://www.rabbitmq.com/install-standalone-mac.html
  • API的安裝
pip install pikaoreasy_install pikaor源碼https://pypi.python.org/pypi/pika

簡單的消息發送

生產者

import pika

credentials = pika.PlainCredentials('lisi','123456')

connection=pika.BlockingConnection(pika.ConnectionParameters(host="local host", credentials=credentials))

channel =connection.channel()

channel.queue_declare('test')
channel.basic_publish(exchange='', routing_key='test',  body='hello test')

print('publish done')

connection.close()

消費者

import pika

credentials = pika.PlainCredentials('lisi','123456')
connection=pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))

channel =connection.channel()channel.queue_declare(queue='test')

def callback(ch, method, properties, body):
	print("consume done", ch, method,properties,body)
	
channel.basic_consume(callback, queue="test",no_ack=True)

channel.start_consuming()

遠程鏈接rabbitmq server的話,須要配置權限

首先在rabbitmq server上建立一個用戶

sudo rabbitmqctl  add_user zhangsan 123456

同時還要配置權限,容許從外面訪問

sudo rabbitmqctl set_permissions -p / zhangsan ".*" ".*" ".*"

# 命令講解set_permissions [-p vhost] {user} {conf} {write} {read}

消息隊列宕機

若是你的服務器宕機了,全部的消息都會丟失,咋辦?

channel.queue_declare(queue='test', durable=True)

channel.basic_publish(exchange='mydirect',
            routing_key='task_queue',
            body=message,
            properties=pika.BasicProperties(delivery_mode=2, 
            # make message persistent
)
)

注意:隊列必須在第一次聲明的時候,就必需要持久化

消息的能者多勞

服務器的性能大小不一,有的服務器處理的快,有的服務器處理的慢,所以默認的輪詢方式不可以知足咱們的需求,咱們要的是 能者多勞,最大限度的發揮咱們機器的性能. 爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

undefined

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

EXCHANGE類型

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配消息的 header頭部字節 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:

direct

undefined

消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式

生產者

import pika,sys
credentials = pika.PlainCredentials('lisi','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))
channel = connection.channel()
# 開始鏈接exchange
channel.exchange_declare(exchange='mydirect',type='direct')
log_level = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[1:]) or "info:helloworld!"
channel.basic_publish(exchange='mydirect',
                      routing_key=log_level,
                      body=message)
print("publish  %s to %s" % (message,log_level))
connection.close()

消費者**

import pika,sys
credentials = pika.PlainCredentials('lisi','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='mydirect', type='direct')
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)
log_levels = sys.argv[1:]
if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for level in log_levels:
    channel.queue_bind(exchange='mydirect',queue=queue_name,routing_key=level) #綁定隊列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()

fanout

undefined

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

生產者代碼

import pika
credentials = pika.PlainCredentials('用戶名', '密碼')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 開始鏈接exchange
channel.exchange_declare(exchange='myfanout',type='fanout')
message = sys.argv[1] if(len(sys.argv[1])>1) else "info"
channel.basic_publish(exchange='myfanout',
                      routing_key='',
                      body=message)
                      
print("publish done %s" % message)
connection.close()

消費者代碼

import pika
credentials = pika.PlainCredentials('用戶名', '密碼')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() 
channel.exchange_declare(exchange='myfanout', type='fanout')
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)
channel.queue_bind(exchange='myfanout',queue=queue_name) #綁定隊列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()

topic

undefined

topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開

To receive all the logs run:

python receive_logs_topic.py 「#」
To receive all logs from the facility 「kern」:

python receive_logs_topic.py 「kern.*」
Or if you want to hear only about 「critical」 logs:

python receive_logs_topic.py 「*.critical」
You can create multiple bindings:

python receive_logs_topic.py 「kern.*「 「*.critical」
And to emit a log with a routing key 「kern.critical」 type:

python emit_log_topic.py 「kern.critical」 「A critical kernel error」

生產者代碼

import pika
import sys
credentials = pika.PlainCredentials('用戶名', '密碼')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊列鏈接通道
channel.exchange_declare(exchange='mytopic',type='topic')
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'
message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"
channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消費者

import pika,sys
credentials = pika.PlainCredentials('用戶名', '密碼')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() 
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) 
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()

RABBITMQ服務器的管理

./sbin/rabbitmq-server -detached   # 後臺啓動
./sbin/rabbitmqctl status   # 查看狀態
./sbin/rabbitmqctl stop    # 關閉
./sbin/rabbitmqctl list_queues  # 查看queue
相關文章
相關標籤/搜索