RabbitMQ隊列

1、RabbitMQ隊列安裝

1.安裝python

#Centos7 安裝
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
yum install epel-release -y
yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm
rabbitmq-plugins enable rabbitmq_management
cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

#在這裏有可能會出現一個小問題,就是ip不對,須要去
vim /etc/hosts #修改一下ip就行了



systemctl restart rabbitmq-server   #重啓服務
systemctl status rabbitmq-server
 
 
rabbitmqctl add_user admin  123456                #建立用戶,密碼
rabbitmqctl set_user_tags admin administrator     #給admin用戶分配administrator角色

sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*" #給用戶設定權限

rabbitmqctl list_queues #查看消息隊列 輪詢

 

2.安裝pika模塊linux

pip install pika
or
easy_install pika
or
源碼
  
https://pypi.python.org/pypi/pika

 

 

2、RabbitMQ隊列

實現最簡單的隊列通訊git

 client端github

#客戶端
import pika

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道

#聲明queue
channel.queue_declare(queue='hello1',durable=True)#queue:隊列的名稱,

#發消息
channel.basic_publish(
    exchange='',
    routing_key='hello1', #隊列
    body='HELLO WORLD',#內容
)
print("[x] Sent 'Hello World!'")

connection.close()#關閉通訊

  

server端shell

#服務端

import pika

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)

channel = connection.channel()#隊列鏈接通道

#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue='hello1',#隊列名
)

print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#阻塞模式,即數據傳輸

 

work queues

 

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少django

 生產者vim

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道

#聲明queue,隊列持續化
channel.queue_declare(queue='hello1',durable=True)#queue:隊列的名稱,durable:隊列持續化

#消息持續化
channel.basic_publish(
    exchange='',
    routing_key='hello1', #隊列
    properties=pika.BasicProperties(
        delivery_mode=2,#使消息持續,即當消息沒有安全被接收執行,那麼此消息仍是會回到隊列中,等待其餘的消費者接收,直到接收完成。才消失
    ),
    body="hello world3",
)
#當linux重啓,那麼會使你的隊列以及消息全都存在,關鍵字段durable=True,delivery_mode=2
print("[x] Sent 'Hello World1!'")

connection.close()#關閉通訊

#同一個隊列中能夠存放多條信息,等待服務端獲取數據纔會少
#若是開啓了多個服務端。那麼他們會一人接收一條數據的方式去執行。

  

消費者 安全

#!/usr/bin/python
# -*- coding: UTF-8 -*-

#服務端

import pika,time

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)

channel = connection.channel()#隊列鏈接通道

#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)
    time.sleep(20)#sleep。進行一個阻斷的效果,當數據接收後,沒徹底接收,那麼還會在隊列中,等待其餘人接收
    print('ok')
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag) #手動確認,消息持久性

channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue='hello1',#隊列名
    # no_ack= True  #默認是false
)

print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#阻塞模式,即數據傳輸

  

消息持久化

發送端服務器

#!/usr/bin/python
# -*- coding: UTF-8 -*-


import pika,sys

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道


#聲明queue,隊列持續化
channel.exchange_declare(exchange='logs',type='fanout')

message = ''.join(sys.argv[1:]) or "info:Hello World!"

#消息持續化
channel.basic_publish(
    exchange='logs',
    routing_key='', #隊列
    body=message,
)
print(" [x] Sent %r" % message)

connection.close()#關閉通訊

#廣播,P端發送一條消息,同時在線的C端都能收到消息,

  

接收端負載均衡

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,time
#鏈接
credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)


channel = connection.channel()#隊列鏈接通道
channel.exchange_declare(exchange='logs',type='fanout')

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_obj = channel.queue_declare(exclusive=True)

queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

channel.queue_bind(exchange='logs',queue=queue_name)#綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)

channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue=queue_name,
    no_ack= True  #默認是false
)

channel.start_consuming()#阻塞模式,即數據傳輸

#在線的C端,可以實時的接收到P端發送來的信息

 

消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

 

 

公平的接收端

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika
import time

credentials = pika.PlainCredentials('root', '123456')

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(1)
    print('msg handle done...',body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, #取到消息後,調用callback 函數
                      queue='hello1',)
                      #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #阻塞模式


#公平的消費者。         生產者使用的是producer-2
#誰先完成訂單,就給誰先派單,
#測試的模式的是,sleep,這樣經過完成的時間快慢,就能夠很明顯的區分出來!

  

 

Publish\Subscribe(消息發佈\訂閱) 

 以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,

fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 

headers: 經過headers 來決定把消息發給哪些queue

 

 fanout:在線的用戶可實時接收訂閱的數據

接收端

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,time
#鏈接
credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)


channel = connection.channel()#隊列鏈接通道
channel.exchange_declare(exchange='logs',type='fanout')

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_obj = channel.queue_declare(exclusive=True)

queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

channel.queue_bind(exchange='logs',queue=queue_name)#綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)

channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue=queue_name,
    no_ack= True  #默認是false
)

channel.start_consuming()#阻塞模式,即數據傳輸

#在線的C端,可以實時的接收到P端發送來的信息

  

 發送端

#!/usr/bin/python
# -*- coding: UTF-8 -*-


import pika,sys

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道


#聲明queue,隊列持續化
channel.exchange_declare(exchange='logs',type='fanout')

message = ''.join(sys.argv[1:]) or "info:Hello World!"

#消息持續化
channel.basic_publish(
    exchange='logs',
    routing_key='', #隊列
    body=message,
)
print(" [x] Sent %r" % message)

connection.close()#關閉通訊

#廣播,P端發送一條消息,同時在線的C端都能收到消息,

  

direct:選擇性的接收消息(info warning error)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

發送端

 

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,sys

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道

# type='direct':組播類型
channel.exchange_declare(exchange='direct_log',type='direct')

log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'  #嚴重程度


message = ''.join(sys.argv[1:]) or "info:Hello World!"

#消息持續化
channel.basic_publish(
    exchange='direct_log',
    routing_key=log_level, #隊列
    body=message,
)
print(" [x] Sent %r" % message)

connection.close()#關閉通訊


"""
啓動三臺客戶端,運行不一樣的狀態,info  warning error
服務端運行其中的任意一個命令執行。

"""

  

 

接收端

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,time,sys
#鏈接
credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)


channel = connection.channel()#隊列鏈接通道

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_obj = channel.queue_declare(exclusive=True)
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

logs_levels = sys.argv[1:]#組播的狀態,info warning error

if not logs_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for level in logs_levels:
    channel.queue_bind(
        exchange='direct_log',
        queue=queue_name,
        routing_key=level, #綁定隊列到Exchange
                       )

print(' [*] Waiting for logs. To exit press CTRL+C')


#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)


#發送消息
channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue=queue_name,
    no_ack= True  #默認是false
)



channel.start_consuming()#阻塞模式,即數據傳輸

"""
啓動三臺客戶端,運行不一樣的狀態,info  warning error
服務端運行其中的任意一個命令執行。

"""

topic:更加詳細的過濾

 

#:表明全部
*.:表明.前面匹配任意,且只能表明前面一個.的

 

發送端

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,sys

credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials) #鏈接的參數設置
connection = pika.BlockingConnection(parameters)#閉塞鏈接

channel = connection.channel() #隊列鏈接通道

# type='topic':組播中的更詳細的細分過濾
channel.exchange_declare(exchange='topic_log',type='topic')

log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'  #嚴重程度


message = ''.join(sys.argv[1:]) or "info:Hello World!"

#消息持續化
channel.basic_publish(
    exchange='topic_log',
    routing_key=log_level, #隊列
    body=message,
)
print(" [x] Sent %r" % message)

connection.close()#關閉通訊

"""
能夠細分的過濾組件。
客戶端:經過 *.django.*:任意.django.任意  的字段去匹配
服務端:經過客戶端的任意匹配規則去查詢。

"""

  

接收端

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import pika,time,sys
#鏈接
credentials = pika.PlainCredentials('root','123456')#鏈接的用戶

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)#鏈接的配置

connection = pika.BlockingConnection(parameters)


channel = connection.channel()#隊列鏈接通道

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_obj = channel.queue_declare(exclusive=True)
queue_name = queue_obj.method.queue

logs_levels = sys.argv[1:]#狀態,info warning error

if not logs_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for level in logs_levels:
    channel.queue_bind(
        exchange='topic_log',
        queue=queue_name,
        routing_key=level, #綁定隊列到Exchange
                       )

print(' [*] Waiting for logs. To exit press CTRL+C')


#ch:就是通道的實例,channel
def callback(ch,method,properties,body):
    print("[X] Received %r"% body)


#發送消息
channel.basic_consume(
    callback,#取到消息後,調用callback函數
    queue=queue_name,
    no_ack= True  #默認是false
)



channel.start_consuming()#阻塞模式,即數據傳輸

"""
能夠細分的過濾組件。
客戶端:經過 *.django.*:任意.django.任意  的字段去匹配
服務端:經過客戶端的任意匹配規則去查詢。

"""

  

Remote procedure call (RPC)

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

ps.使用惟一標識來進行約定。返回的數據

 rpc_client

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""
1.聲明一隊列,做爲reply_to返回纖細結構的隊列
2.發消息到隊列,消息裏帶一個惟一標識符uid,reply_to
3.監聽reply_to 的隊列,直到有結果
"""

import queue

import pika

import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('root', '123456')
        parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue #命令的執行結果的queue

        #聲明要監聽callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服務器端命令結果後執行這個函數
        :param ch:
        :param method:
        :param props:
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把執行結果賦值給Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #惟一標識符號
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))


        while self.response is None:
            self.connection.process_data_events()  #檢測監聽的隊列裏有沒有新消息,若是有,收,若是沒有,返回None
            #檢測有沒有要發送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)

  

 rpc_server

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""
1.定義fib函數
2.聲明接收指令的對列名rpc_queue
3.開始監聽隊列,收到消息後,調用fib函數
4.把fib執行結果,發送回客戶端指定的reply_to隊列
"""
import subprocess
import pika
import time

credentials = pika.PlainCredentials('root', '123456')

parameters = pika.ConnectionParameters(host='192.168.100.129',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

channel.queue_declare(queue='rpc_queue2')

#執行客戶端發送的值
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1)+fib(n-2)

def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result

def on_request(ch,method,props,body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #隊列
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
相關文章
相關標籤/搜索