rabbitmq消息隊列

1 rabbitmq 消息隊列概念

rabbitmq 消息隊列
解耦:一個程序間 把兩個耦合度下降

異步:天生解決耦合
優勢:解決排隊問題 
缺點:不能保證任務被及時的執行
應用場景:去哪兒網 12306網站

同步
優勢:保證任務及時執行
缺點:不能解決排隊問題,致使時間被浪費python


大併發 Web Linux上 近幾年使用的是nginx 內部epoll異步 承載10000-20000併發
pv=page visit 頁面訪問量 天天上億 10server web cluster集羣
uv=user visit 用戶訪問量 天天600多萬
qps=每秒查詢率QPS是對一個特定的查詢服務器在規定時間內所處理流量多少的衡量標準。linux

 

隊列的做用
一、存儲消息、數據
二、保證消息順序
三、保證數據的交付nginx

python消息隊列中:
只能同個進程多個線程訪問使用git

 

爲何使用rabbitmq instead of python queuegithub

python消息隊列中:web

只能同個進程多個線程訪問使用,且不能跨進程express

2  rabbitmq安裝

2.1 linux上安裝

#Centos7 安裝
  
#注意/etc/hosts文件 ip和主機名對應
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
yum install epel-release -y
yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm
rabbitmq-plugins enable rabbitmq_management
cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
systemctl restart rabbitmq-server
systemctl status rabbitmq-server
  
#建立用戶 受權
rabbitmqctl  add_user alex alex3714
rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
linux上安裝rabbitmq

2.2 rabbitmq受權詳解

 1 #遠程鏈接rabbitmq server的話,須要配置權限
 2 
 3 #建立用戶
 4 rabbitmqctl  add_user alex alex3714
 5  
 6 #同時還要配置權限,容許從外面訪問
 7 rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
 8 
 9   set_permissions [-p vhost] {user} {conf} {write} {read}
10 
11   vhost
12   The name of the virtual host to which to grant the user access, defaulting to /.
13 
14   user
15   The name of the user to grant access to the specified virtual host.
16 
17   conf
18   A regular expression matching resource names for which the user is granted configure permissions.
19 
20   write
21   A regular expression matching resource names for which the user is granted write permissions.
22 
23   read
24   A regular expression matching resource names for which the user is granted read permissions.
rabbitmq受權詳解

2.3 python rabbitMQ module 安裝

pip install pika
or
easy_install pika
or
源碼
https://pypi.python.org/pypi/pika

2.4 linux上rabbitmq命令

#rabbitmqctl list_queues #顯示隊列列表及消息個數

 

3 事例

消息隊列就是生產者消費者模型服務器

3.1 服務端和客戶端一對一

一個P向queue發送一個message,一個C從該queue接收message並打印。併發

 send.py   生產者,客戶端
producer,鏈接至RabbitMQ Server,聲明隊列,發送message,關閉鏈接,退出。 異步

import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello')      #經過通道生成一個隊列

#message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange  
#使用默認exchange時容許經過routing_key明確指定message將被髮送給哪一個queue  
#body參數指定了要發送的message內容  
channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close() #關閉與RabbitMq Server間的鏈接  
生產者

receive.py  消費者,服務器

consumer,鏈接至RabbitMQ Server,聲明隊列,接收消息並進行處理這裏爲打印出消息,退出。

import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()

#聲明queue,確認要從中接收message的queue  
#queue_declare函數是冪等的,可運行屢次,但只會建立一次  
#若能夠確信queue是已存在的,則此處可省略該聲明,如producer已經生成了該queue  
#但在producer和consumer中重複聲明queue是一個好的習慣  
channel.queue_declare(queue='hello')

#定義回調函數  
#一旦從queue中接收到一個message回調函數將被調用  

def callback(ch, method, properties, body):
    print(ch)              #上面channel = connection.channel()對象
    print(method)          #除了服務端自己的數據,還帶一些參數
    print(properties)      #屬性
    print(body)            #byte數據


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

#從queue接收message的參數設置  
#包括從哪一個queue接收message,用於處理message的callback,是否要確認message  
#默認狀況下是要對消息進行確認的,以防止消息丟失。  
#此處將no_ack明確指明爲True,不對消息進行確認。  

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

3.2  隊列和消息持久化

(1)acknowledgment 消息不丟失

模擬客戶端中斷 觀察服務端隊列的數據會不會返回(會)

#- 開啓一個服務端,兩個客戶端
 
#- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它不會報告給服務端
#- 這時隊列中爲零,另外一客戶端也不會取到值
# no_ack=True 表示客戶端處理完了不須要向服務端確認消息,默認爲True
# no_ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。

send.py

import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello')      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
生產者

receive.py 

import pika
import time

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start process",body)
    time.sleep(10)
    print("end process...")


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

(2)durable   消息不丟失

模擬服務端重啓 觀察服務端隊列的數據會不會消失(不會) 

#1. 生產者端發消息時,加參數delivery_mode 消息持久化
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ),
    # 生產者端發消息時,加參數durable 隊列持久化 
    channel.queue_declare(queue='hello',durable=True)  
#2. 消費者端,消息處理完畢時,發送確認包 
    ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_consume(callback, #取到消息後,調用callback 函數
      queue='task1',)
      #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢
#
- 開啓一個服務端,兩個客戶端 #- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它會報給服務端,服務端隊列還有值 #- 這時啓動另外一客戶端還能夠取到值 

 send.py 

import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello',durable=True)      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ),
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
生產者

recevied.py

import pika
import time

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start process",body)
    time.sleep(10)
    print("end process...")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello',
                      )

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

 (3)消息的獲取順序

消息的公平分發,默認是公平分發 。
能夠在各個消費者端,配置perfetch=1,意思是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了,就會分配給其餘消費者端。
channel.basic_qos(prefetch_count
=1) 表示誰來誰取,再也不按照奇偶數排列
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
生產者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

(4) 發佈訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

 exchange type = fanout

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
發佈者
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
訂閱者

 (5)關鍵字發送

 

exchange type = direct

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
生產者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消費者

(6)模糊匹配

 exchange type = topic

在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示能夠匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞
發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消費者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者
相關文章
相關標籤/搜索