https://www.cnblogs.com/wupeiqi/articles/5132791.htmlhtml
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。python
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。服務器
1. rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 2. rpm -ivh https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm 3. yum -y install erlang 4. yum -y install rabbitmq-server 5. service rabbitmq-server start 6. 用戶名和密碼配置: sudo rabbitmqctl add_user wupeiqi 123 7. 設置用戶爲administrator角色 sudo rabbitmqctl set_user_tags wupeiqi administrator 8. 設置權限 sudo rabbitmqctl set_permissions -p "/" wupeiqi ".*" ".*" ".*"
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
使用API操做RabbitMQide
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
生產者fetch
import pika credentials = pika.PlainCredentials("wupeiq","123") connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209")) #定義一個隊列 channel =connection.channel() channel.queue_declare(queue="hello") channel.basic_publish(exchange="",routing_key="hello", body="HELLO, WORLD!!!") #要向指定隊列中放入數據. print("[x] Send ‘hello world’") connection.close()
消費者spa
import pika # credentials = pika.PlainCredentials("wupeiq","123") connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209")) channel =connection.channel() #定義一個隊列 channel.queue_declare(queue="hello") def callback(ch,method,properties,body): print("[x]Received %r" %body) #對名字叫作hello的隊列進行消費(獲取隊列中的數據) channel.basic_consume(callback,queue='hello',no_ack=True) print("[*]Waiting for Message. to Exit press CTRL + C") channel.start_consuming()
no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。code
生產者server
import pika credentials = pika.PlainCredentials("wupeiq","123") connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209")) #定義一個隊列 channel =connection.channel() channel.queue_declare(queue="s2") channel.basic_publish(exchange="",routing_key="s2", body="6666!!") #要向指定隊列中放入數據. print("[x] Send ‘hello world’") connection.close()
消費者htm
import pika # credentials = pika.PlainCredentials("wupeiq","123") connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209")) channel =connection.channel() #定義一個隊列 channel.queue_declare(queue="hello") def callback(ch,method,properties,body): print("[x]Received %r" %body) ch.basic_ack(delivery_tag=method.delivery_tag) #對名字叫作hello的隊列進行消費(獲取隊列中的數據) channel.basic_consume(callback,queue='s2',no_ack=False) print("[*]Waiting for Message. to Exit press CTRL + C") channel.start_consuming()
生產者對象
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() # make message persistent channel.queue_declare(queue='s3', durable=True) channel.basic_publish(exchange='', routing_key='s3', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
消費者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() # make message persistent channel.queue_declare(queue='s3', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='s3', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列
生產者:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() # make message persistent channel.queue_declare(queue='s4', durable=True) channel.basic_publish(exchange='', routing_key='s4', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
消費者1
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() # make message persistent channel.queue_declare(queue='s4', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='s4', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
生產者2
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() # make message persistent channel.queue_declare(queue='s4', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='s4', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
以上例子是官方文檔中的前兩個例子.即:
https://www.rabbitmq.com/getstarted.html
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。
exchange type = fanout
發佈者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex1',exchange_type="fanout") message = "HELLO WORLD" channel.basic_publish(exchange='ex1',routing_key='',body=message,) #route_key 爲隊列名爲空. print(" [x] Sent 'Hello World!'") connection.close()
訂閱者1:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex1', exchange_type="fanout") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex1",queue=queue_name) def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
訂閱者2:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex1',exchange_type="fanout") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex1",queue=queue_name) def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
exchange type = direct
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
生產者:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex2',exchange_type="direct") message = "HELLO WORLD" channel.basic_publish(exchange='ex2',routing_key='error',body=message,) #route_key 爲隊列名爲空. connection.close()
消費者1:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex2', exchange_type="direct") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="info") channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="error") channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="warning") def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
消費者2:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex2', exchange_type="direct") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="info") def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
exchange type = topic
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。
1
2
3
|
發送者路由值 隊列中
old.boy.python old.
*
-
-
不匹配
old.boy.python old.
# -- 匹配
|
生產者:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex3',exchange_type="topic") message = "HELLO WORLD" channel.basic_publish(exchange='ex3',routing_key='old.boy.python',body=message,) #route_key 爲隊列名爲空. connection.close()
消費者1:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex3', exchange_type="topic") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex3",queue=queue_name,routing_key="old.*") # 「*」匹配一個單詞 def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
消費者2:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='ex3', exchange_type="topic") #動態生成一個隊列 result =channel.queue_declare(exclusive=True) queue_name =result.method.queue print(queue_name) #exchange 和隊列進行綁定 channel.queue_bind(exchange="ex3",queue=queue_name,routing_key="old.#") # "#" 匹配全部單詞 def callback(ch,method,properties,body): print("[x] %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()