rabbitmq

1、安裝

linux(centos7)
    yum install erlang
    yum install rabbitmq-server* 

    可參考:https://www.cnblogs.com/web424/p/6761153.html
linux
問題1:
Failed to start LSB: Enable AMQP service provided by RabbitMQ broker.

# vi /etc/rabbitmq/rabbitmq-env.conf    
NODENAME=rabbit@localhost  
搞定!
(參考:http://blog.csdn.net/testcs_dn/article/details/52514199)
linux安裝問題
    windows
    安裝erlang,官網http://www.erlang.org/downloads
    安裝RabbitMQ,官網http://www.rabbitmq.com/download.html


    可參考:https://www.cnblogs.com/ericli-ericli/p/5902270.html
windows
問題1:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin>rabbitmqctl.bat status
Status of node rabbit@fat39 ...
Error: unable to perform an operation on node 'rabbit@fat39'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@fat39

DIAGNOSTICS
===========

attempted to contact: [rabbit@fat39]

rabbit@fat39:
  * connected to epmd (port 4369) on fat39
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


Current node details:
 * node name: rabbitmqcli84@fat39
 * effective user's home directory: C:\Users\fat39
 * Erlang cookie hash: E6ohUpM/NQ9szEKtdnLQnQ==





用搜索工具對電腦文件系統進行檢索,找到C:\Windows\System32\config\systemprofile下有個.erlang.cookie文件,內容與C:\User\lujie\.erlang.cookie不同,後來修改其中一個文件的內容,使兩個文件內容同樣。 再次執行命令rabbitmqctl status,成功
windows安裝問題

 補充:linux測試有效參考:https://blog.csdn.net/lilin0800/article/details/80690752html

 2、使用

0、使用前準備

linux
# 新增用戶
sudo rabbitmqctl add_user admin admin123
# 受權
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

windows
# 新增用戶
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin\rabbitmqctl.bat add_user admin admin123
# 受權
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin\rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
建立用戶及受權
rabbitmqctl.bat add_user admin admin123  # 建立帳號密碼
rabbitmqctl.bat list_users  # 查看帳號
rabbitmqctl.bat set_permissions admin .* .* .*  # 分發權限
rabbitmqctl.bat list_permissions  # 查看權限
rabbitmqctl.bat list_queues  # 查看隊列、數量

./rabbitmqctl add_user admin admin     
./rabbitmqctl set_user_tags admin administrator  # 管理員

 

 

參數說明:node

1、隊列持久化durable
    說明:
        該queue保存在rabbitmq數據庫中,若配置了True,則在rabbitmq重啓後該queue還在;若False,則丟失。
        此配置與消息持久化delivery_mode配合使用
    配置:
        在producer端:
        channel.queue_declare(queue='myqueue',durable=True)



2、消息持久化delivery_mode
    說明:
        隊列持久化的前提下配置消息持久化delivery_mode=2,則消息不會丟失;不然丟失。
        rabbitmq重啓後的集中狀況:
            隊列不持久,消息不持久:均丟失;
            隊列持久、消息持久:均不丟失;
            隊列持久、消息不持久:隊列不丟失、消息丟失;
            
    配置:
        在producer端:
        channel.basic_publish(exchange='',  # exchange與訂閱有關
                              routing_key='myqueue',  # 發往的隊列
                              body=msg,  # 消息內容
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # 消息持久化
                              ),
                              )

3、公平分發
    說明:
        默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。
        channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列
    配置:
        在consumer端:
        channel.basic_qos(prefetch_count=1)  # 公平分發

4、消息確認basic_ack、no_ack
    說明:
        未確認:
            若是consumer從queue中提取消息的途中出錯,致使消息沒被處理,但消息已從producer取走,消息丟失。
        配置確認:
            consumer在處理callback函數的最後發出ack,告訴producer已處理完畢。若是處理消息途中出錯,producer未收到ack,則producer從新把消息放回queue。
            
        no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。

    配置:
        在consumer端:
        def mycallback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            time.sleep(6)
            print('msg handle done...', body)
            ch.basic_ack(delivery_tag=method.delivery_tag)  # 消息接收後,給rabbitmq返回確認消息

        channel.basic_consume(
            consumer_callback=mycallback, #取到消息後,調用callback 函數
            queue="myqueue",  # 目標隊列
            # no_ack=True,  # 不返回確認消息
        )

 

一、簡單使用

配置流程python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello1',durable=True)  # durable隊列持久化

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 消息持久化
                      ),
                      )
print(" [x] Sent 'Hello World!'")
connection.close()
producer
__author__ = 'Administrator'


import pika


# credentials = pika.PlainCredentials('admin', 'admin123')
#
# parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
# connection = pika.BlockingConnection(parameters)
#
# channel = connection.channel() #隊列鏈接通道

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

channel.basic_qos(prefetch_count=1)  # 公平分發

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    #time.sleep(15)
    print('msg handle done...',body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, #取到消息後,調用callback 函數
                      queue='hello1',)
                      #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #阻塞模式
consumer

 

二、訂閱

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,linux

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息web

fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout shell

headers: 經過headers 來決定把消息發給哪些queue數據庫

 

(1)fanout模式

配置流程:windows

# -*- coding:utf-8 -*-
import sys
import pika
# 認證消息
credentials = pika.PlainCredentials('admin', 'admin123')
# 鏈接的參數,ip,端口,認證
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
# 鏈接rabbitmq
connection = pika.BlockingConnection(parameters)
# 向rabbitmq聲明通道,即建立通道
channel = connection.channel()
# 綁定通道和隊列
channel.exchange_declare(exchange="myexchange",exchange_type="fanout")

msg = "...."
msg = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(
    exchange="myexchange",
    routing_key='',
    body=msg,
)

connection.close()
producer
# -*- coding:utf-8 -*-

import pika
# 認證消息
credentials = pika.PlainCredentials('admin', 'admin123')
# 鏈接的參數,ip,端口,認證
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
# 鏈接rabbitmq
connection = pika.BlockingConnection(parameters)
# 向rabbitmq聲明通道,即建立通道
channel = connection.channel()
# 綁定通道和頻道
channel.exchange_declare(exchange="myexchange",exchange_type="fanout")  # 訂閱頻道爲myexchange的消息,模式是廣播fanout

# 自動生成一個惟一的隊列
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue
channel.queue_bind(queue=queue_name,exchange="myexchange")  # 隊列和頻道綁定在一塊


print(' [*] Waiting for myexchange. To exit press CTRL+C')

def mycallback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    consumer_callback=mycallback,
    queue=queue_name,
    no_ack=True,
)

channel.start_consuming()
consumer

 

(2)direct模式

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。centos

配置流程:服務器

__author__ = 'Administrator'
import pika
import sys

credentials = pika.PlainCredentials('admin', 'admin123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

channel.exchange_declare(exchange='direct_log',exchange_type='direct')

log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange='direct_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
producer
__author__ = 'Administrator'
import pika,sys
credentials = pika.PlainCredentials('admin', 'admin123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level) #綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()
consumer

consumer1:python consumer-4-direct.py info error

consuumer2: python consumer-4-direct.py info alert

producer:

  一、python  producer-4-direct.py info information

  二、python  producer-4-direct.py error errorinformation

  三、python  producer-4-direct.py alert alertinformat

c1:

c2:

p:

 

(3)topic模式

 

配置流程:

__author__ = 'Administrator'
import pika
import sys

credentials = pika.PlainCredentials('admin', 'admin123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

channel.exchange_declare(exchange='topic_log',exchange_type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
producer
__author__ = 'Administrator'
import pika,sys
credentials = pika.PlainCredentials('admin', 'admin123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = queue_obj.method.queue


log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()
consumer
接收全部logs
"#"
python receive_logs_topic.py "#"

接收以xx開頭、xx中間、xx結尾
以"*"爲佔位符
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "kern.*" "abc.*.critical"
python receive_logs_topic.py "kern.*" "*.critical"
參數說明

 

c1:偵聽 *.abc.*  info.*  err.*

c2:偵聽 abc.*

p:

 

 

三、RPC模式

clinet和server須要對話時,使用rpc模式。

 

__author__ = 'Administrator'

# 1.聲明一個隊列,做爲reply_to返回消息結果的隊列
# 2.  發消息到隊列,消息裏帶一個惟一標識符uid,reply_to
# 3.  監聽reply_to 的隊列,直到有結果
import queue

import pika
import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('admin', 'admin123')
        parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        queue_obj = self.channel.queue_declare(exclusive=True)
        self.callback_queue = queue_obj.method.queue #命令的執行結果的queue

        #聲明要監聽callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服務器端命令結果後執行這個函數
        :param ch:
        :param method:
        :param props:
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把執行結果賦值給Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #惟一標識符號
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))


        while self.response is None:
            self.connection.process_data_events()  #檢測監聽的隊列裏有沒有新消息,若是有,收,若是沒有,返回None
            #檢測有沒有要發送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)
client
__author__ = 'Administrator'

#1 。 定義fib函數
#2. 聲明接收指令的隊列名rpc_queue
#3. 開始監聽隊列,收到消息後 調用fib函數
#4 把fib執行結果,發送回客戶端指定的reply_to 隊列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('admin', 'admin123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

channel.queue_declare(queue='rpc_queue2')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result


def on_request(ch, method, props, body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #隊列
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
server
相關文章
相關標籤/搜索