linux(centos7) yum install erlang yum install rabbitmq-server* 可參考:https://www.cnblogs.com/web424/p/6761153.html
問題1: Failed to start LSB: Enable AMQP service provided by RabbitMQ broker. # vi /etc/rabbitmq/rabbitmq-env.conf NODENAME=rabbit@localhost 搞定! (參考:http://blog.csdn.net/testcs_dn/article/details/52514199)
windows 安裝erlang,官網http://www.erlang.org/downloads 安裝RabbitMQ,官網http://www.rabbitmq.com/download.html 可參考:https://www.cnblogs.com/ericli-ericli/p/5902270.html
問題1: C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin>rabbitmqctl.bat status Status of node rabbit@fat39 ... Error: unable to perform an operation on node 'rabbit@fat39'. Please see diagnostics information and suggestions below. Most common reasons for this are: * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues) * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server) * Target node is not running In addition to the diagnostics info below: * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more * Consult server logs on node rabbit@fat39 DIAGNOSTICS =========== attempted to contact: [rabbit@fat39] rabbit@fat39: * connected to epmd (port 4369) on fat39 * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic * TCP connection succeeded but Erlang distribution failed * Authentication failed (rejected by the remote node), please check the Erlang cookie Current node details: * node name: rabbitmqcli84@fat39 * effective user's home directory: C:\Users\fat39 * Erlang cookie hash: E6ohUpM/NQ9szEKtdnLQnQ== 用搜索工具對電腦文件系統進行檢索,找到C:\Windows\System32\config\systemprofile下有個.erlang.cookie文件,內容與C:\User\lujie\.erlang.cookie不同,後來修改其中一個文件的內容,使兩個文件內容同樣。 再次執行命令rabbitmqctl status,成功
補充:linux測試有效參考:https://blog.csdn.net/lilin0800/article/details/80690752html
linux # 新增用戶 sudo rabbitmqctl add_user admin admin123 # 受權 sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" windows # 新增用戶 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin\rabbitmqctl.bat add_user admin admin123 # 受權 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin\rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl.bat add_user admin admin123 # 建立帳號密碼 rabbitmqctl.bat list_users # 查看帳號 rabbitmqctl.bat set_permissions admin .* .* .* # 分發權限 rabbitmqctl.bat list_permissions # 查看權限 rabbitmqctl.bat list_queues # 查看隊列、數量 ./rabbitmqctl add_user admin admin ./rabbitmqctl set_user_tags admin administrator # 管理員
參數說明:node
1、隊列持久化durable 說明: 該queue保存在rabbitmq數據庫中,若配置了True,則在rabbitmq重啓後該queue還在;若False,則丟失。 此配置與消息持久化delivery_mode配合使用 配置: 在producer端: channel.queue_declare(queue='myqueue',durable=True) 2、消息持久化delivery_mode 說明: 隊列持久化的前提下配置消息持久化delivery_mode=2,則消息不會丟失;不然丟失。 rabbitmq重啓後的集中狀況: 隊列不持久,消息不持久:均丟失; 隊列持久、消息持久:均不丟失; 隊列持久、消息不持久:隊列不丟失、消息丟失; 配置: 在producer端: channel.basic_publish(exchange='', # exchange與訂閱有關 routing_key='myqueue', # 發往的隊列 body=msg, # 消息內容 properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ), ) 3、公平分發 說明: 默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。 channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列 配置: 在consumer端: channel.basic_qos(prefetch_count=1) # 公平分發 4、消息確認basic_ack、no_ack 說明: 未確認: 若是consumer從queue中提取消息的途中出錯,致使消息沒被處理,但消息已從producer取走,消息丟失。 配置確認: consumer在處理callback函數的最後發出ack,告訴producer已處理完畢。若是處理消息途中出錯,producer未收到ack,則producer從新把消息放回queue。 no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。 配置: 在consumer端: def mycallback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(6) print('msg handle done...', body) ch.basic_ack(delivery_tag=method.delivery_tag) # 消息接收後,給rabbitmq返回確認消息 channel.basic_consume( consumer_callback=mycallback, #取到消息後,調用callback 函數 queue="myqueue", # 目標隊列 # no_ack=True, # 不返回確認消息 )
配置流程python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1',durable=True) # durable隊列持久化 channel.basic_publish(exchange='', routing_key='hello1', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ), ) print(" [x] Sent 'Hello World!'") connection.close()
__author__ = 'Administrator' import pika # credentials = pika.PlainCredentials('admin', 'admin123') # # parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) # connection = pika.BlockingConnection(parameters) # # channel = connection.channel() #隊列鏈接通道 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.basic_qos(prefetch_count=1) # 公平分發 def callback(ch, method, properties, body): print(" [x] Received %r" % body) #time.sleep(15) print('msg handle done...',body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, #取到消息後,調用callback 函數 queue='hello1',) #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #阻塞模式
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,linux
Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息web
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout shell
headers: 經過headers 來決定把消息發給哪些queue數據庫
配置流程:windows
# -*- coding:utf-8 -*- import sys import pika # 認證消息 credentials = pika.PlainCredentials('admin', 'admin123') # 鏈接的參數,ip,端口,認證 parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) # 鏈接rabbitmq connection = pika.BlockingConnection(parameters) # 向rabbitmq聲明通道,即建立通道 channel = connection.channel() # 綁定通道和隊列 channel.exchange_declare(exchange="myexchange",exchange_type="fanout") msg = "...." msg = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish( exchange="myexchange", routing_key='', body=msg, ) connection.close()
# -*- coding:utf-8 -*- import pika # 認證消息 credentials = pika.PlainCredentials('admin', 'admin123') # 鏈接的參數,ip,端口,認證 parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) # 鏈接rabbitmq connection = pika.BlockingConnection(parameters) # 向rabbitmq聲明通道,即建立通道 channel = connection.channel() # 綁定通道和頻道 channel.exchange_declare(exchange="myexchange",exchange_type="fanout") # 訂閱頻道爲myexchange的消息,模式是廣播fanout # 自動生成一個惟一的隊列 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue channel.queue_bind(queue=queue_name,exchange="myexchange") # 隊列和頻道綁定在一塊 print(' [*] Waiting for myexchange. To exit press CTRL+C') def mycallback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( consumer_callback=mycallback, queue=queue_name, no_ack=True, ) channel.start_consuming()
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。centos
配置流程:服務器
__author__ = 'Administrator' import pika import sys credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='direct_log',exchange_type='direct') log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='direct_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()
__author__ = 'Administrator' import pika,sys credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue print('queue name',queue_name,queue_obj) log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='direct_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
consumer1:python consumer-4-direct.py info error
consuumer2: python consumer-4-direct.py info alert
producer:
一、python producer-4-direct.py info information
二、python producer-4-direct.py error errorinformation
三、python producer-4-direct.py alert alertinformat
c1:
c2:
p:
配置流程:
__author__ = 'Administrator' import pika import sys credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='topic_log',exchange_type='topic') #log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info' message = ' '.join(sys.argv[1:]) or "all.info: Hello World!" channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()
__author__ = 'Administrator' import pika,sys credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
接收全部logs "#" python receive_logs_topic.py "#" 接收以xx開頭、xx中間、xx結尾 以"*"爲佔位符 python receive_logs_topic.py "kern.*" python receive_logs_topic.py "kern.*" "abc.*.critical" python receive_logs_topic.py "kern.*" "*.critical"
c1:偵聽 *.abc.* info.* err.*
c2:偵聽 abc.*
p:
clinet和server須要對話時,使用rpc模式。
__author__ = 'Administrator' # 1.聲明一個隊列,做爲reply_to返回消息結果的隊列 # 2. 發消息到隊列,消息裏帶一個惟一標識符uid,reply_to # 3. 監聽reply_to 的隊列,直到有結果 import queue import pika import uuid class CMDRpcClient(object): def __init__(self): credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost', credentials=credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() queue_obj = self.channel.queue_declare(exclusive=True) self.callback_queue = queue_obj.method.queue #命令的執行結果的queue #聲明要監聽callback_queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): """ 收到服務器端命令結果後執行這個函數 :param ch: :param method: :param props: :param body: :return: """ if self.corr_id == props.correlation_id: self.response = body.decode("gbk") #把執行結果賦值給Response def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #惟一標識符號 self.channel.basic_publish(exchange='', routing_key='rpc_queue2', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() #檢測監聽的隊列裏有沒有新消息,若是有,收,若是沒有,返回None #檢測有沒有要發送的新指令 return self.response cmd_rpc = CMDRpcClient() print(" [x] Requesting fib(30)") response = cmd_rpc.call('ipconfig') print(response)
__author__ = 'Administrator' #1 。 定義fib函數 #2. 聲明接收指令的隊列名rpc_queue #3. 開始監聽隊列,收到消息後 調用fib函數 #4 把fib執行結果,發送回客戶端指定的reply_to 隊列 import subprocess import pika import time credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.queue_declare(queue='rpc_queue2') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def run_cmd(cmd): cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) result = cmd_obj.stdout.read() + cmd_obj.stderr.read() return result def on_request(ch, method, props, body): cmd = body.decode("utf-8") print(" [.] run (%s)" % cmd) response = run_cmd(cmd) ch.basic_publish(exchange='', routing_key=props.reply_to, #隊列 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=response) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(on_request, queue='rpc_queue2') print(" [x] Awaiting RPC requests") channel.start_consuming()