#消息中間件 -消息隊列 - 異步 提交的任務不須要實時獲得結果或迴應 #應用 - 減輕服務器壓力,提升單位時間處理請求數 - RPC #消息隊列 - Q對象 - Redis列表 - RabbitMQ
a. 安裝html
#Centos7 安裝 #注意/etc/hosts文件 ip和主機名對應 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm yum install epel-release -y yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm rabbitmq-plugins enable rabbitmq_management cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config systemctl restart rabbitmq-server systemctl status rabbitmq-server #建立用戶 受權 rabbitmqctl add_user alex alex3714 rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
b. 建立用戶 受權 python
#遠程鏈接rabbitmq server的話,須要配置權限 #建立用戶 rabbitmqctl add_user alex alex3714 #同時還要配置權限,容許從外面訪問 rabbitmqctl set_permissions -p / alex ".*" ".*" ".*" set_permissions [-p vhost] {user} {conf} {write} {read} vhost The name of the virtual host to which to grant the user access, defaulting to /. user The name of the user to grant access to the specified virtual host. conf A regular expression matching resource names for which the user is granted configure permissions. write A regular expression matching resource names for which the user is granted write permissions. read A regular expression matching resource names for which the user is granted read permissions.
c. python rabbitMQ module 安裝mysql
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
注意: 通常申明隊列(以下代碼)只須要在服務端申明,但客戶端也能夠申明,是防止若是服務端沒有啓動,客戶端先啓動後沒有隊列會報錯 此時服務端若是有相同代碼,會檢查,若是有相同隊列就不建立 channel.queue_declare(queue='hello')
a. 消息隊列git
#查看隊列 # rabbitmqctl list_queues #客戶端再次申明隊列是由於客戶端要清楚去哪裏取數據 channel.queue_declare(queue='hello')
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello') #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(ch) #上面channel = connection.channel()對象 print(method) #除了服務端自己的數據,還帶一些參數 print(properties) #屬性 print(body) #byte數據 channel.basic_consume(callback, #監聽隊列,若是隊列中有數據,執行回調函數 queue='hello', no_ack=True) #處理完回調函數不須要回應服務端 print(' Waiting for messages. To exit press CTRL+C') channel.start_consuming() #開始監聽
消息持久化之 客戶端掛掉,消息還會在服務端。github
1. no_ack=True 模擬客戶端中斷 觀察服務端隊列的數據會不會返回(不會)面試
#- 開啓一個服務端,兩個客戶端 #- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它不會報告給服務端 #- 這時隊列中爲零,另外一客戶端也不會取到值 # no_ack=True 表示客戶端處理完了不須要向服務端確認消息
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello') #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
import pika import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("received msg...start process",body) time.sleep(10) print("end process...") channel.basic_consume(callback, queue='hello', no_ack=True) print(' Waiting for messages. To exit press CTRL+C') channel.start_consuming()
2. no_ack=False 客戶端須要向服務端迴應,若是沒有迴應或拋異常,則服務端隊列的數據不會消失,還在隊列中。sql
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): 拋異常 ch.basic_ack(delivery_tag = method.delivery_tag) #客戶端迴應服務端 channel.basic_consume(callback, #監聽隊列,若是隊列中有數據,執行回調函數 queue='hello', no_ack=False) #處理完回調函數須要迴應服務端 print(' Waiting for messages. To exit press CTRL+C') channel.start_consuming() #開始監聽
消息持久化 模擬客戶端中斷 觀察服務端隊列的數據會不會返回(會) express
#1. 生產者端發消息時,加參數 消息持久化 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), #2. 消費者端,消息處理完畢時,發送確認包 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, #取到消息後,調用callback 函數 queue='task1',) #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢 #- 開啓一個服務端,兩個客戶端 #- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它會報給服務端,服務端隊列還有值 #- 這時啓動另外一客戶端還能夠取到值
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello') #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
import pika import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("received msg...start process",body) time.sleep(10) print("end process...") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='hello', ) print(' Waiting for messages. To exit press CTRL+C') channel.start_consuming()
隊列持久化django
#隊列持久化 channel.queue_declare(queue='hello',durable=True) # 聲明隊列持久化 systemctl restart rabbitmq-server #重啓服務發現hello隊列還在,可是消息不在 rabbitmqctl list_queues #hello #隊列和消息持久化 channel.queue_declare(queue='hello',durable=True) properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), systemctl restart rabbitmq-server #重啓服務發現隊列和消息都還在 rabbitmqctl list_queues #hello 6
import pika credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.11.106',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello',durable=True) #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
b. 發佈和訂閱 fanout 廣播服務器
#服務端: - 不須要申明隊列 #客戶端: - 每一個客戶端都須要申明一個隊列,自動設置隊列名稱,收聽廣播,當收聽完後queue刪除 - 把隊列綁定到exchange上 #注意:客戶端先打開,服務端再打開,客戶端會收到消息 #應用: - 微博粉絲在線,博主發消息,粉絲能夠收到 #若是服務端先啓動向exchange發消息,這時客戶端沒有啓動,沒有隊列保存數據(exchange不負責保存數據) #這時數據會丟,隊列中沒有數據 #exchange只負責轉發
import pika import sys import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.exchange_declare(exchange='logs',type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" Send %r" % message) connection.close()
import pika import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') queue_obj = channel.queue_declare(exclusive=True) #隨機建立一個隊列對象 exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue #不指定queue名字,rabbit會隨機分配一個名字, channel.queue_bind(exchange='logs',queue=queue_name) #把queue綁定到exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
e. direct 組播
#客戶端一: - python3 receive1.py info #客戶端二: - python3 receive1.py error #客戶端三: - python3 receive1.py warning #客戶端四: - python3 receive1.py warning error info #服務端: - python3 receive1.py warning
import pika import sys import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.exchange_declare(exchange='direct_logs',type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" Send %r:%r" % (severity, message)) connection.close()
import pika import time import sys credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
f. topic 規則傳播
#客戶端一: - python3 receive1.py *.django #客戶端二: - python3 receive1.py mysql.error #客戶端三: - python3 receive1.py mysql.* #服務端: - python3 receive1.py #匹配相應的客戶端
import pika import time import sys credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() channel.exchange_declare(exchange='topic_logs',type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print(sys.argv[1:]) sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
import pika import sys import time credentials = pika.PlainCredentials("egon","egon123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('172.16.42.128',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.exchange_declare(exchange='topic_logs',type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message))
面試:
第一種 多個客戶端,服務端發送數據,多個客戶端輪番的過來取數據。至關於一萬個任務,10我的幫忙處理數據。
第二種 發佈訂閱: 廣播 組播 關鍵字廣播