rabbitMQ消息隊列

 

https://www.cnblogs.com/wupeiqi/articles/5132791.htmlhtml

 

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。python

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。服務器

 

1.rabbitMQ安裝步驟

服務端安裝

           1. rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
            2. rpm -ivh https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm
            3. yum -y install erlang
            4. yum -y install rabbitmq-server
            5. service rabbitmq-server start

            6. 用戶名和密碼配置:
               sudo rabbitmqctl add_user wupeiqi 123
            7. 設置用戶爲administrator角色
               sudo rabbitmqctl set_user_tags wupeiqi administrator
            8. 設置權限
               sudo rabbitmqctl set_permissions -p "/" wupeiqi ".*" ".*" ".*"

客戶端安裝

 

pip install pika
or
easy_install pika
or
源碼
 
https://pypi.python.org/pypi/pika

 

使用API操做RabbitMQide

1. 基於Queue實現生產者消費者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
View Code

 

1、RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,服務器上的RabbitMQ Server實現的消息隊列。

生產者fetch

import pika
credentials = pika.PlainCredentials("wupeiq","123")

connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209"))

#定義一個隊列
channel =connection.channel()

channel.queue_declare(queue="hello")

channel.basic_publish(exchange="",routing_key="hello", body="HELLO, WORLD!!!")  #要向指定隊列中放入數據.

print("[x] Send ‘hello world’")

connection.close()

消費者spa

import pika

# credentials = pika.PlainCredentials("wupeiq","123")

connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209"))

channel =connection.channel()

#定義一個隊列
channel.queue_declare(queue="hello")


def callback(ch,method,properties,body):
    print("[x]Received %r" %body)


#對名字叫作hello的隊列進行消費(獲取隊列中的數據)
channel.basic_consume(callback,queue='hello',no_ack=True)


print("[*]Waiting for Message. to Exit press CTRL + C")

channel.start_consuming()

 

2、acknowledgment 消息不丟失

no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。code

 

生產者server

import pika
credentials = pika.PlainCredentials("wupeiq","123")

connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209"))

#定義一個隊列
channel =connection.channel()

channel.queue_declare(queue="s2")

channel.basic_publish(exchange="",routing_key="s2", body="6666!!")  #要向指定隊列中放入數據.

print("[x] Send ‘hello world’")

connection.close()

消費者htm

import pika

# credentials = pika.PlainCredentials("wupeiq","123")

connection =pika.BlockingConnection(pika.ConnectionParameters(host="132.232.55.209"))

channel =connection.channel()

#定義一個隊列
channel.queue_declare(queue="hello")


def callback(ch,method,properties,body):
    print("[x]Received %r" %body)
  ch.basic_ack(delivery_tag=method.delivery_tag)

#對名字叫作hello的隊列進行消費(獲取隊列中的數據)
channel.basic_consume(callback,queue='s2',no_ack=False)


print("[*]Waiting for Message. to Exit press CTRL + C")

channel.start_consuming()

 

3、durable   消息不丟失

生產者對象

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='s3', durable=True)

channel.basic_publish(exchange='',
                      routing_key='s3',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()

消費者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='s3', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='s3',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

4、消息獲取順序

 

默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列

 

生產者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='s4', durable=True)

channel.basic_publish(exchange='',
                      routing_key='s4',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()

 

消費者1

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='s4', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1) 
channel.basic_consume(callback,
                      queue='s4',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

生產者2

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='s4', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1) 
channel.basic_consume(callback,
                      queue='s4',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

以上例子是官方文檔中的前兩個例子.即:

https://www.rabbitmq.com/getstarted.html

 

 

5、發佈訂閱

 

 

 

 

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

 exchange type = fanout

 

發佈者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex1',exchange_type="fanout")
message = "HELLO WORLD"

channel.basic_publish(exchange='ex1',routing_key='',body=message,) #route_key 爲隊列名爲空.
print(" [x] Sent 'Hello World!'")
connection.close()

 

訂閱者1:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex1', exchange_type="fanout")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex1",queue=queue_name)

def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

訂閱者2:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex1',exchange_type="fanout")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex1",queue=queue_name)

def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

 

6、關鍵字發送

 

 

 

 

 exchange type = direct

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

生產者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex2',exchange_type="direct")

message = "HELLO WORLD"

channel.basic_publish(exchange='ex2',routing_key='error',body=message,) #route_key 爲隊列名爲空.

connection.close()

消費者1:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex2', exchange_type="direct")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="info")
channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="error")
channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="warning")

def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

消費者2:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex2', exchange_type="direct")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex2",queue=queue_name,routing_key="info")


def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

 

7、模糊匹配

 

 

 

 exchange type = topic

在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示能夠匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞
1
2
3
發送者路由值              隊列中
old.boy.python          old. *   - -  不匹配
old.boy.python          old. #  -- 匹配

 

生產者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex3',exchange_type="topic")

message = "HELLO WORLD"

channel.basic_publish(exchange='ex3',routing_key='old.boy.python',body=message,) #route_key 爲隊列名爲空.

connection.close()

消費者1:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex3', exchange_type="topic")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex3",queue=queue_name,routing_key="old.*") # 「*」匹配一個單詞 
def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

 

消費者2:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='132.232.55.209'))
channel = connection.channel()


#定義exchange
channel.exchange_declare(exchange='ex3', exchange_type="topic")

#動態生成一個隊列
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
print(queue_name)

#exchange 和隊列進行綁定
channel.queue_bind(exchange="ex3",queue=queue_name,routing_key="old.#") # "#" 匹配全部單詞 
def callback(ch,method,properties,body):
    print("[x] %r" %body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()
相關文章
相關標籤/搜索