RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統,它遵循Mozilla Pulic License開源協議。python
MQ全稱爲Message Queue,消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用連接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。後端
1
2
3
4
5
6
7
8
9
10
11
|
#安裝配置epel源
rpm
-
ivh http:
/
/
dl.fedoraproject.org
/
pub
/
epel
/
6
/
i386
/
epel
-
release
-
6
-
8.noarch
.rpm
#安裝Erlang
yum
-
y insatll erlang
#安裝RabbitMQ
yum
-
y install rabbitmq
-
server
#注意:
service rabbitmq
-
server start
/
stop
|
1
2
3
4
5
|
#pip安裝:
pip install pika
#源碼安裝:
https:
/
/
pypi.python.org
/
pypi
/
pika
#官網地址
|
以前咱們在介紹線程,進程的時候介紹過python中自帶的隊列用法,下面咱們經過一段代碼複習一下:服務器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#生產者消費者模型,解耦的意思就是兩個程序之間,互相沒有關聯了,互不影響。
import
queue
import
threading
import
time
q
=
queue.Queue(
20
)
#隊列裏最多存放20個元素
def
productor(arg):
#生成者,建立30個線程來請求吃包子,往隊列裏添加請求元素
q.put(
str
(arg)
+
'- 包子'
)
for
i
in
range
(
30
):
t
=
threading.Thread(target
=
productor,args
=
(i,))
t.start()
def
consumer(arg):
#消費者,接收到隊列請求之後開始生產包子,來消費隊列裏的請求
while
True
:
print
(arg,q.get())
time.sleep(
2
)
for
j
in
range
(
3
):
t
=
threading.Thread(target
=
consumer,args
=
(j,))
t.start()
|
上面咱們已經將環境裝備好,下面咱們經過Pika模塊來對Rabbitmq隊列來進行操做,對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。併發
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
####################################生產者#####################################
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.10.131'
))
#建立一個連接對象,對象中綁定rabbitmq的IP地址
channel
=
connection.channel()
#建立一個頻道
channel.queue_declare(queue
=
'name1'
)
#經過這個頻道來建立隊列,若是MQ中隊列存在忽略,沒有則建立
channel.basic_publish(exchange
=
'',
routing_key
=
'name1'
,
#指定隊列名稱
body
=
'Hello World!'
)
#往該隊列中發送一個消息
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
#發送完關閉連接
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#####################################消費者######################################
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.10.131'
))
#建立一個連接對象,對象中綁定rabbitmq的IP地址
channel
=
connection.channel()
#建立一個頻道
channel.queue_declare(queue
=
'name1'
)
#經過這個頻道來建立隊列,若是MQ中隊列存在忽略,沒有則建立
def
callback(ch, method, properties, body):
#callback函數負責接收隊列裏的消息
print
(
" [x] Received %r"
%
body)
channel.basic_consume(callback,
#從隊列裏去消息
queue
=
'name1'
,
#指定隊列名
no_ack
=
True
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
acknowledgment 消息不丟失函數
上面的例子中若是咱們將no-ack=False ,那麼當消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼RabbitMQ會從新將該任務添加到隊列中。fetch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.10.131'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'name1'
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
(
'ok'
)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
#向生成者發送消費完畢的確認信息,而後生產者將此條消息同隊列裏剔除
channel.basic_consume(callback,
queue
=
'name1'
,
no_ack
=
False
)
#若是no_ack=False,當消費者down掉了,RabbitMQ會從新將該任務添加到隊列中
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
上例若是消費者中斷後若是不超過10秒,從新連接的時候數據還在。當超過10秒以後,消費者往生產者發送了ack,從新連接的時候數據將消失。
spa
durable消息不丟失線程
消費者down掉後咱們知道怎麼處理了,若是個人RabbitMQ服務down掉了該怎麼辦呢?3d
消息隊列是能夠作持久化,若是咱們在生產消息的時候就指定某條消息須要作持久化,那麼RabbitMQ發現有問題時,就會將消息保存到硬盤,持久化下來。code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
####################################生產者#####################################
#!/usr/bin/env python
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.10.131'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'name2'
, durable
=
True
)
#指定隊列持久化
channel.basic_publish(exchange
=
'',
routing_key
=
'name2'
,
body
=
'Hello World!'
,
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
#指定消息持久化
))
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#####################################消費者######################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.10.131'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'name2'
, durable
=
True
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
(
'ok'
)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'name2'
,
no_ack
=
False
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走的,例如:消費者1去隊列中獲取奇數序列任務,消費者2去隊列中獲取偶數序列的任務,消費者1處理的比較快而消費者2處理的比較慢,那麼消費者1就會一直處於繁忙的狀態,爲了解決這個問題在須要加入下面代碼:
channel.basic_qos(prefetch_count=1) :表示誰來獲取,再也不按照奇偶數 排列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'name1'
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
'ok'
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_qos(prefetch_count
=
1
)
channel.basic_consume(callback,
queue
=
'name1'
,
no_ack
=
False
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,二發佈者發佈消息時,會將消息放置在全部相關隊列中。
在RabbitMQ中,全部生產者提交的消息都有Exchange來接收,而後Exchange按照特定的策略轉發到Queue進行存儲,RabbitMQ提供了四種Exchange:fanout、direct、topic、header。因爲header模式在實際工做中用的比較少,下面主要對前三種進行比較。
exchange type = fanout :任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上
爲了方便理解,應用了上面這張圖,能夠清晰的看到相互之間的關係,當咱們設置成fanout模式時,如何操做請看下面代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
####################################發佈者#####################################
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'test_fanout'
,
type
=
'fanout'
)
message
=
'4456'
channel.basic_publish(exchange
=
'test_fanout'
,
routing_key
=
'',
body
=
message)
print
(
' [x] Sent %r'
%
message)
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
####################################訂閱者#####################################
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'test_fanout'
,
#建立一個exchange
type
=
'fanout'
)
#任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上
#隨機建立隊列
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
#綁定
channel.queue_bind(exchange
=
'test_fanout'
,
queue
=
queue_name)
#exchange綁定後端隊列
print
(
'<------------->'
)
def
callback(ch,method,properties,body):
print
(
' [x] %r'
%
body)
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
exchange type = direct:任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue上(關鍵字發送)
以前事例,發送消息時明確指定了某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據關鍵字發送到消息Exchange,Exchange根據關鍵字斷定應該將數據發送至指定隊列。
發佈者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_test'
,
type
=
'direct'
)
severity
=
'info'
#設置一個key,
message
=
'99999'
channel.basic_publish(exchange
=
'direct_test'
,
routing_key
=
severity,
body
=
message)
print
(
" [x] Sent %r:%r"
%
(severity, message))
connection.close()
|
訂閱者1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_test'
,
type
=
'direct'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
severities
=
[
'error'
,
'info'
,]
#綁定隊列,併發送關鍵字error,info
for
severity
in
severities:
channel.queue_bind(exchange
=
'direct_test'
,
queue
=
queue_name,
routing_key
=
severity)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
訂閱者2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_test'
,
type
=
'direct'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
severities
=
[
'error'
,]
for
severity
in
severities:
channel.queue_bind(exchange
=
'direct_test'
,
queue
=
queue_name,
routing_key
=
severity)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
結論:當咱們將發佈者的key設置成Error的時候兩個隊列對能夠收到Exchange的消息,當咱們將key設置成info後,只有訂閱者1能夠收到Exchange的消息。
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入"路由值"和"關鍵字"進行匹配,匹配成功,則將數據發送到指定隊列。
# :表示能夠匹配0個或多個單詞;
* :表示只能匹配一個單詞。
1
2
3
4
5
|
#發送路由值 隊列中
www.cnblogs.com www.
*
-
-
-
>
#沒法匹配
www.cnblogs.com www.
# --->#匹配成功
|
發佈者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'topic_logs'
,
type
=
'topic'
)
routing_key
=
sys.argv[
1
]
if
len
(sys.argv) >
1
else
'anonymous.info'
message
=
' '
.join(sys.argv[
2
:])
or
'Hello World!'
channel.basic_publish(exchange
=
'topic_logs'
,
routing_key
=
routing_key,
body
=
message)
print
(
" [x] Sent %r:%r"
%
(routing_key, message))
connection.close()
#執行方式:
python xxx.py name1
#name1爲routing_key
|
訂閱者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
#!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'localhost'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'topic_logs'
,
type
=
'topic'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
binding_keys
=
sys.argv[
1
:]
if
not
binding_keys:
sys.stderr.write(
"Usage: %s [binding_key]...\n"
%
sys.argv[
0
])
sys.exit(
1
)
for
binding_key
in
binding_keys:
channel.queue_bind(exchange
=
'topic_logs'
,
queue
=
queue_name,
routing_key
=
binding_key)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
#執行方式:
python xxx,py name1
|