Rabbitmq隊列

一. RabbitMQ隊列

#消息中間件 -消息隊列
  - 異步 提交的任務不須要實時獲得結果或迴應

#應用
  - 減輕服務器壓力,提升單位時間處理請求數
  - RPC
 
#消息隊列
  - Q對象
  - Redis列表
  - RabbitMQ

a. 安裝html

#Centos7 安裝

#注意/etc/hosts文件 ip和主機名對應
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
yum install epel-release -y
yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm
rabbitmq-plugins enable rabbitmq_management
cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
systemctl restart rabbitmq-server
systemctl status rabbitmq-server

#建立用戶 受權
rabbitmqctl  add_user alex alex3714
rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

b. 建立用戶 受權 python

#遠程鏈接rabbitmq server的話,須要配置權限

#建立用戶
rabbitmqctl  add_user alex alex3714
 
#同時還要配置權限,容許從外面訪問
rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

  set_permissions [-p vhost] {user} {conf} {write} {read}

  vhost
  The name of the virtual host to which to grant the user access, defaulting to /.

  user
  The name of the user to grant access to the specified virtual host.

  conf
  A regular expression matching resource names for which the user is granted configure permissions.

  write
  A regular expression matching resource names for which the user is granted write permissions.

  read
  A regular expression matching resource names for which the user is granted read permissions.
View Code

c. python rabbitMQ module 安裝mysql

pip install pika
or
easy_install pika
or
源碼
  
https://pypi.python.org/pypi/pika

二. 事例

注意: 通常申明隊列(以下代碼)只須要在服務端申明,但客戶端也能夠申明,是防止若是服務端沒有啓動,客戶端先啓動後沒有隊列會報錯
	 此時服務端若是有相同代碼,會檢查,若是有相同隊列就不建立

channel.queue_declare(queue='hello')

a. 消息隊列git

#查看隊列
    # rabbitmqctl list_queues

#客戶端再次申明隊列是由於客戶端要清楚去哪裏取數據
    channel.queue_declare(queue='hello')
import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello')      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
sender.py
import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(ch)              #上面channel = connection.channel()對象
    print(method)          #除了服務端自己的數據,還帶一些參數
    print(properties)      #屬性
    print(body)            #byte數據


channel.basic_consume(callback,                    #監聽隊列,若是隊列中有數據,執行回調函數
                      queue='hello',
                      no_ack=True)                #處理完回調函數不須要回應服務端

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()                        #開始監聽
receive.py

消息持久化之 客戶端掛掉,消息還會在服務端。github

1. no_ack=True 模擬客戶端中斷 觀察服務端隊列的數據會不會返回(不會)面試

#- 開啓一個服務端,兩個客戶端
#- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它不會報告給服務端
#- 這時隊列中爲零,另外一客戶端也不會取到值
# no_ack=True 表示客戶端處理完了不須要向服務端確認消息
import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello')      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
send.py
import pika
import time

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start process",body)
    time.sleep(10)
    print("end process...")


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

2. no_ack=False  客戶端須要向服務端迴應,若是沒有迴應或拋異常,則服務端隊列的數據不會消失,還在隊列中。sql

import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    
    拋異常

    ch.basic_ack(delivery_tag = method.delivery_tag)        #客戶端迴應服務端


channel.basic_consume(callback,                    #監聽隊列,若是隊列中有數據,執行回調函數
                      queue='hello',
                      no_ack=False)                #處理完回調函數須要迴應服務端

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()                        #開始監聽
receive.py

消息持久化  模擬客戶端中斷 觀察服務端隊列的數據會不會返回(會) express

#1. 生產者端發消息時,加參數 消息持久化
  	properties=pika.BasicProperties(
  		delivery_mode=2,  # make message persistent
  	),
#2. 消費者端,消息處理完畢時,發送確認包	 
	ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback, #取到消息後,調用callback 函數
      queue='task1',)
      #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢
#- 開啓一個服務端,兩個客戶端
#- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它會報給服務端,服務端隊列還有值
#- 這時啓動另外一客戶端還能夠取到值
import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello')      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ),
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
sender.py 
import pika
import time

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start process",body)
    time.sleep(10)
    print("end process...")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello',
                      )

print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

隊列持久化django

#隊列持久化

channel.queue_declare(queue='hello',durable=True)  # 聲明隊列持久化
systemctl restart rabbitmq-server		#重啓服務發現hello隊列還在,可是消息不在
rabbitmqctl list_queues
	#hello 


#隊列和消息持久化
channel.queue_declare(queue='hello',durable=True)

properties=pika.BasicProperties(
    delivery_mode=2,  # make message persistent
),
systemctl restart rabbitmq-server		#重啓服務發現隊列和消息都還在
rabbitmqctl list_queues
	#hello 6
import pika

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #創建socket

channel = connection.channel()            #建立rabbitmq協議通道

channel.queue_declare(queue='hello',durable=True)      #經過通道生成一個隊列

channel.basic_publish(exchange='',
                      routing_key='hello',      #隊列
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ),
                      body='Hello World!')      #內容
print(" [x] Sent 'Hello World!'")
connection.close()
sender.py

b.  發佈和訂閱 fanout 廣播服務器

#服務端:
  - 不須要申明隊列
#客戶端:
  - 每一個客戶端都須要申明一個隊列,自動設置隊列名稱,收聽廣播,當收聽完後queue刪除
  - 把隊列綁定到exchange上
#注意:客戶端先打開,服務端再打開,客戶端會收到消息
 
#應用:
  - 微博粉絲在線,博主發消息,粉絲能夠收到

#若是服務端先啓動向exchange發消息,這時客戶端沒有啓動,沒有隊列保存數據(exchange不負責保存數據)
#這時數據會丟,隊列中沒有數據
#exchange只負責轉發
import pika
import sys
import time


credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()                             #建立rabbitmq協議通道

channel.exchange_declare(exchange='logs',type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" Send %r" % message)
connection.close()
sender.py
import pika
import time

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

queue_obj = channel.queue_declare(exclusive=True)  #隨機建立一個隊列對象 exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue                #不指定queue名字,rabbit會隨機分配一個名字,

channel.queue_bind(exchange='logs',queue=queue_name)    #把queue綁定到exchange

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
receive.py

e. direct 組播

#客戶端一:
    - python3 receive1.py info 
#客戶端二:
    - python3 receive1.py  error
#客戶端三:
    - python3 receive1.py  warning
#客戶端四:
    - python3 receive1.py  warning error info
#服務端:
    - python3 receive1.py  warning
import pika
import sys
import time


credentials = pika.PlainCredentials("egon","egon123")                   #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()                  #建立rabbitmq協議通道

channel.exchange_declare(exchange='direct_logs',type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print(" Send %r:%r" % (severity, message))
connection.close()
sender.py
import pika
import time
import sys

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
receive.py

f. topic 規則傳播

#客戶端一:
    - python3 receive1.py *.django 
#客戶端二:
    - python3 receive1.py mysql.error
#客戶端三:
    - python3 receive1.py mysql.*

#服務端:
    - python3 receive1.py  #匹配相應的客戶端  
import pika
import time
import sys

credentials = pika.PlainCredentials("egon","egon123")                     #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print(sys.argv[1:])
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
receive.py
import pika
import sys
import time


credentials = pika.PlainCredentials("egon","egon123")                   #受權的帳號 密碼
connection = pika.BlockingConnection(
    pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #創建socket

channel = connection.channel()                  #建立rabbitmq協議通道

channel.exchange_declare(exchange='topic_logs',type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)

print(" [x] Sent %r:%r" % (routing_key, message))
sender.py

 

  

 

面試:

第一種 多個客戶端,服務端發送數據,多個客戶端輪番的過來取數據。至關於一萬個任務,10我的幫忙處理數據。 

第二種 發佈訂閱: 廣播 組播 關鍵字廣播 

 

  

 

  

 

  

 

  

 

  

Alex 

相關文章
相關標籤/搜索