生產者:python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'localhost'
))
channel
=
connection.channel()
# 聲明一個管道,在管道里發消息
# 聲明queue
channel.queue_declare(queue
=
'hello'
, durable
=
True
)
# 在管道里還得聲明一個隊列
# durable只是把隊列持久化,消息不持久化
channel.basic_publish(exchange
=
'',
routing_key
=
'hello'
,
# 就是列隊queue名字
body
=
'Hello World'
,
# 消息內容
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
#消息持久化若是隊列沒有設置durable=True的話消息是沒有辦法持久化的
)
)
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
# 不用關閉管道,關閉鏈接就行
|
消費者:服務器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import
pika
# 創建到達RabbitMQ Server的connection
# 此處RabbitMQ Server位於本機-localhost
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'localhost'
))
channel
=
connection.channel()
# 聲明queue,確認要從中接收message的queue
# queue_declare函數是冪等的,可運行屢次,但只會建立一次
# 若能夠確信queue是已存在的,則此處可省略該聲明,如producer已經生成了該queue
# 但在producer和consumer中重複聲明queue是一個好的習慣
channel.queue_declare(queue
=
'hello'
,durable
=
True
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.basic_qos(prefetch_count
=
1
)
#若是有一個消息,服務器就不發,沒消息就發
# 定義回調函數
# 一旦從queue中接收到一個message回調函數將被調用
# ch:channel
# method:
# properties:
# body:message
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
#執行完後確認,client執行完後給rabbitmq返回的一個標識,收到這個標識後rabbitmq認爲這個消息處理完了,不會在重複發送給其餘client繼續執行
# 從queue接收message的參數設置
# 包括從哪一個queue接收message,用於處理message的callback,是否要確認message
# 默認狀況下是要對消息進行確認的,以防止消息丟失。
# 此處將no_ack明確指明爲True,不對消息進行確認。
channel.basic_consume(callback,
queue
=
"hello"
,
#no_ack=True#不對消息確認
)
# 開始循環從queue中接收message並使用callback進行處理
channel.start_consuming()
|