環境 CentOS7 + Python3.5 yum -y install epel-release erlang socat cd /usr/local/src wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm 插件列表查詢: rabbitmq-plugins list 開啓管理功能: rabbitmq-plugins enable rabbitmq_management 增長訪問用戶,默認用戶guest只能本地訪問。 rabbitmqctl add_user admin 123456 設置用戶角色: rabbitmqctl set_user_tags admin administrator 權限設置:rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read} conf: 一個正則表達式match哪些配置資源可以被該用戶訪問。 write: 一個正則表達式match哪些配置資源可以被該用戶讀。 read: 一個正則表達式match哪些配置資源可以被該用戶訪問。 權限設置示例: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 列出全部用戶: rabbitmqctl list_users 刪除用戶: rabbitmqctl delete_user {username} 修改用戶密碼: rabbitmqctl change_password {username} {newpassword} 啓動: rabbitmq-server -detached 關閉: rabbitmqctl stop 服務器狀態: rabbitmqctl status 集羣狀態: rabbitmqctl cluster_status 添加vhost: rabbitmqctl add_vhost {name} 刪除vhost: rabbitmqctl delete_vhost {name}
生產者文件 producer.py正則表達式
import pika, sys credentials = pika.PlainCredentials("admin", "123456") conn_params = pika.ConnectionParameters("192.168.205.128", virtual_host="/", credentials=credentials) conn_broker = pika.BlockingConnection(conn_params) channel = conn_broker.channel() channel.confirm_delivery() # 將信道設置爲confirm模式 msg = sys.argv[1] # 將第一個參數做爲消息內容 msg_props = pika.BasicProperties() msg_props.content_type = "text/plain" ack = channel.basic_publish(body=msg, exchange="hello-exchange", properties=msg_props, routing_key="hola") # 發佈信息 if ack is True: print ("put message to rabbitmq successed!") else: print ("put message to rabbitmq failed") channel.close()
消費者文件 consumer.py服務器
import pika credentials = pika.PlainCredentials("admin", "123456") conn_params = pika.ConnectionParameters("192.168.205.128", virtual_host="/", credentials=credentials) conn_broker = pika.BlockingConnection(conn_params) channel = conn_broker.channel() # 獲取信道 channel.exchange_declare("hello-exchange", "direct", passive=False, durable=True, auto_delete=False) # 聲明交換器 channel.queue_declare(queue="hello-queue") #聲明隊列 channel.queue_bind(queue="hello-queue", exchange="hello-exchange", routing_key="hola") # 使用路由鍵綁定隊列和交換器 def msg_consumer(channel, method, header, body): # 處理消息 channel.basic_ack(delivery_tag=method.delivery_tag) # 消息確認 print ("%d : '%s' " %(method.delivery_tag, body) ) if body == b'quit': # 這裏的 body 是 bytes 類型 channel.basic_cancel(consumer_tag="hello-consumer") # 中止消費並退出 channel.stop_consuming() return # 訂閱消費者 channel.basic_consume( msg_consumer, queue="hello-queue", consumer_tag="hello-consumer") channel.start_consuming() # 開始消費