rabbitmq基本管理命令:node
一步啓動Erlang node和Rabbit應用:sudo rabbitmq-serverpython
在後臺啓動Rabbit node:sudo rabbitmq-server -detached安全
關閉整個節點(包括應用):sudo rabbitmqctl stop函數
add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]測試
Demo:fetch
producer.pyui
1 #!/usr/bin/env python 2 # -*- coding: utf_8 -*- 3 # Date: 2015年11月30日 4 # Author:蔚藍行 5 # 博客 http://www.cnblogs.com/duanv/ 6 7 import pika 8 import sys 9 10 #建立鏈接connection到localhost 11 con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 12 #建立虛擬鏈接channel 13 cha = con.channel() 14 #建立隊列anheng,durable參數爲真時,隊列將持久化;exclusive爲真時,創建臨時隊列 15 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False) 16 #建立名爲yanfa,類型爲fanout的exchange,其餘類型還有direct和topic,若是指定durable爲真,exchange將持久化 17 cha.exchange_declare(durable=False, 18 exchange='yanfa', 19 type='direct',) 20 #綁定exchange和queue,result.method.queue獲取的是隊列名稱 21 cha.queue_bind(exchange='yanfa', 22 queue=result.method.queue, 23 routing_key='',) 24 #公平分發,使每一個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message 25 cha.basic_qos(prefetch_count=1) 26 #發送信息到隊列‘anheng’ 27 message = ' '.join(sys.argv[1:]) 28 #消息持久化指定delivery_mode=2; 29 cha.basic_publish(exchange='', 30 routing_key='anheng', 31 body=message, 32 properties=pika.BasicProperties( 33 delivery_mode = 2, 34 )) 35 print '[x] Sent %r' % (message,) 36 #關閉鏈接 37 con.close()
consumer.pyspa
1 #!/usr/bin/env python 2 # -*- coding: utf_8 -*- 3 # Date: 2015年11月30日 4 # Author:蔚藍行 5 # 博客 http://www.cnblogs.com/duanv/ 6 import pika 7 8 #創建鏈接connection到localhost 9 con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 10 #建立虛擬鏈接channel 11 cha = con.channel() 12 #建立隊列anheng 13 result=cha.queue_declare(queue='anheng',durable=True) 14 #建立名爲yanfa,類型爲fanout的交換機,其餘類型還有direct和topic 15 cha.exchange_declare(durable=False, 16 exchange='yanfa', 17 type='direct',) 18 #綁定exchange和queue,result.method.queue獲取的是隊列名稱 19 cha.queue_bind(exchange='yanfa', 20 queue=result.method.queue, 21 routing_key='',) 22 #公平分發,使每一個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message 23 cha.basic_qos(prefetch_count=1) 24 print ' [*] Waiting for messages. To exit press CTRL+C' 25 #定義回調函數 26 def callback(ch, method, properties, body): 27 print " [x] Received %r" % (body,) 28 ch.basic_ack(delivery_tag = method.delivery_tag) 29 30 cha.basic_consume(callback, 31 queue='anheng', 32 no_ack=False,) 33 34 cha.start_consuming()
1、概念:調試
Connection: 一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的。程序的起始處就是創建這個TCP鏈接。code
Channels: 虛擬鏈接。創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。
2、隊列:
首先創建一個Connection,而後創建Channels,在channel上創建隊列
創建時指定durable參數爲真,隊列將持久化;指定exclusive爲真,隊列爲臨時隊列,關閉consumer後該隊列將再也不存在,通常狀況下創建臨時隊列並不指定隊列名稱,rabbitmq將隨機起名,經過result.method.queue來獲取隊列名:
result = channel.queue_declare(exclusive=True)
result.method.queue
區別:durable是隊列持久化與否,若是爲真,隊列將在rabbitmq服務重啓後仍存在,若是爲假,rabbitmq服務重啓前不會消失,與consumer關閉與否無關;
而exclusive是創建臨時隊列,當consumer關閉後,該隊列就會被刪除
3、exchange和bind
Exchange中durable參數指定exchange是否持久化,exchange參數指定exchange名稱,type指定exchange類型。Exchange類型有direct,fanout和topic。
Bind是將exchange與queue進行關聯,exchange參數和queue參數分別指定要進行bind的exchange和queue,routing_key爲可選參數。
Exchange的三種模式:
Direct:
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue
1.通常狀況可使用rabbitMQ自帶的Exchange:」」(該Exchange的名字爲空字符串);
2.這種模式下不須要將Exchange進行任何綁定(bind)操做;
3.消息傳遞時須要一個「routing_key」,能夠簡單的理解爲要發送到的隊列名字;
4.若是vhost中不存在routing_key中指定的隊列名,則該消息會被拋棄。
Demo中雖然聲明瞭一個exchange=’yanfa’和queue=’anheng’的bind,可是在後面發送消息時並無使用該exchange和bind,而是採用了direct的模式,沒有指定exchange,而是指定了routing_key的名稱爲隊列名,消息將發送到指定隊列。
若是一個exchange 聲明爲direct,而且bind中指定了routing_key,那麼發送消息時須要同時指明該exchange和routing_key.
Fanout:
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上
1.能夠理解爲路由表的模式
2.這種模式不須要routing_key
3.這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。
4.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
Demo中建立了一個將一個exchange和一個queue進行fanout類型的bind.可是發送信息時沒有用到它,若是要用到它,只要在發送消息時指定該exchange的名稱便可,該exchange就會將消息發送到全部和它bind的隊列中。在fanout模式下,指定的routing_key是無效的 。
Topic:
任何發送到Topic Exchange的消息都會被轉發到全部關心routing_key中指定話題的Queue上
1.這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(routing_key),Exchange會將消息轉發到全部關注主題能與routing_key模糊匹配的隊列。
2.這種模式須要routing_key,也許要提早綁定Exchange與Queue。
3.在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個routing_key爲」MQ.log.error」的消息會被轉發到該隊列)。
4.「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。
5.一樣,若是Exchange沒有發現可以與routing_key匹配的Queue,則會拋棄此消息。
4、任務分發
1.Rabbitmq的任務是循環分發的,若是開啓兩個consumer,producer發送的信息是輪流發送到兩個consume的。
2.在producer端使用cha.basic_publish()來發送消息,其中body參數就是要發送的消息,properties=pika.BasicProperties(delivery_mode = 2,)啓用消息持久化,能夠防止RabbitMQ Server 重啓或者crash引發的數據丟失。
3.在接收端使用cha.basic_consume()無限循環監聽,若是設置no-ack參數爲真,每次Consumer接到數據後,而不論是否處理完成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。
在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。
這裏並無用到超時機制。RabbitMQ僅僅經過Consumer的鏈接中斷來確認該Message並無被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來作數據處理。
Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告訴rabbitmq消息已經正確處理。若是沒有這條代碼,Consumer退出時,Message會從新分發。而後RabbitMQ會佔用愈來愈多的內存,因爲RabbitMQ會長時間運行,所以這個「內存泄漏」是致命的。去調試這種錯誤,能夠經過一下命令打印un-acked Messages:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
4.公平分發:設置cha.basic_qos(prefetch_count=1),這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
5、注意:
生產者和消費者都應該聲明創建隊列,網上教程上說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是queue的屬性並不會被修改。
可能由於版本問題,在個人測試中若是第二次聲明創建的隊列屬性和第一次不徹底相同,將報相似這種錯406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"
若是是exchange第二次建立屬性不一樣,將報這種錯406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"
若是第一次聲明創建隊列也出現這個錯誤,說明以前存在名字相同的隊列且本次聲明的某些屬性和以前聲明不一樣,可經過命令sudo rabbitmqctl list_queues查看當前有哪些隊列。解決方法是聲明創建另外一名稱的隊列或刪除原有隊列,若是原有隊列是非持久化的,可經過重啓rabbitmq服務刪除原有隊列,若是原有隊列是持久化的,只能刪除它所在的vhost,而後再重建vhost,再設置vhost的權限(先確認該vhost中沒有其餘有用隊列)。
sudo rabbitmqctl delete_vhost /sudo rabbitmqctl add_vhost /sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'