RabbitMQ隊列
首先咱們在講rabbitMQ以前咱們要說一下python裏的queue:兩者乾的事情是同樣的,都是隊列,用於傳遞消息python
在python的queue中有兩個一個是線程queue,一個是進程queue(multiprocessing中的queue)。線程queue不可以跨進程,用於多個線程之間進行數據同步交互;進程queue只是用於父進程與子進程,或者同屬於贊成父進程下的多個子進程 進行交互。也就是說若是是兩個徹底獨立的程序,即便是python程序,也依然不可以用這個進程queue來通訊。那若是咱們有兩個獨立的python程序,分屬於兩個進程,或者是python和其餘語言windows
安裝:windows下服務器
安裝pika:負載均衡
以前安裝過了pip,直接打開cmd,運行pip install pika運維
安裝完畢以後,實現一個最簡單的隊列通訊:socket
producer:函數
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 #聲明一個管道 5 channel = connection.channel() 6 7 #聲明queue 8 channel.queue_declare(queue = 'hello') 9 #routing_key是queue的名字 10 channel.basic_publish(exchange='', 11 routing_key='hello',#queue的名字 12 body='Hello World!', 13 ) 14 print("[x] Send 'Hello World!'") 15 connection.close()
先創建一個基本的socket,而後創建一個管道,在管道中發消息,而後聲明一個queue,起個隊列的名字,以後真正的發消息(basic_publish)fetch
consumer:ui
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 3 channel = connection.channel() 4 5 channel.queue_declare(queue='hello') 6 7 8 def callback(ch, method, properties, body):#回調函數 9 print("---->",ch,method,properties) 10 print(" [x] Received %r" % body) 11 12 channel.basic_consume(callback,#若是收到消息,就調用callback來處理消息 13 queue='hello', 14 no_ack=True 15 ) 16 17 print(' [*] Waiting for messages. To exit press CTRL+C') 18 channel.start_consuming()
start_consuming()只要一啓動就一直運行下去,他不止收一條,永遠在這裏卡住。url
在上面不論是produce仍是consume,裏面都聲明瞭一個queue,這個是爲何呢?由於咱們不知道是消費者先開始運行仍是生產者先運行,這樣若是沒有聲明的話就會報錯。
下面咱們來看一下一對多,即一個生產者對應多個消費者:
首先咱們運行3個消費者,而後不斷的用produce去發送數據,咱們能夠看到消費者是經過一種輪詢的方式進行不斷的接受數據,每一個消費者消費一個。
那麼假如咱們消費者收到了消息,而後處理這個消息須要30秒鐘,在處理的過程當中,消費者斷電了宕機了,那消費者尚未處理完,咱們設這個任務咱們必須處理完,那咱們應該有一個確認的信息,說這個任務完成了或者是沒有完成,因此個人生產者要確認消費者是否把這個任務處理完了,消費者處理完以後要給這個生產者服務器端發送一個確認信息,生產者纔會把這個任務從消息隊列中刪除。若是沒有處理完,消費者宕機了,沒有給生產者發送確認信息,那就表示沒有處理完,那咱們看看rabbitMQ是怎麼處理的
咱們能夠在消費者的callback中添加一個time.sleep()進行模擬宕機。callback是一個回調函數,只要事件一觸發就會調用這個函數。函數執行完了就表明消息處理完了,若是函數沒有處理完,那就說明。。。。
咱們能夠看到在消費者代碼中的basic_consume()中有一個參數叫no_ack=True,這個意思是這條消息是否被處理完都不會發送確認信息,通常咱們不加這個參數,rabbitMQ默認就會給你設置成消息處理完了就自動發送確認,咱們如今把這個參數去掉,而且在callback中添加一句話運行:ch.basic_ack(delivery_tag=method.delivery_tag)(手動處理)
def callback(ch, method, properties, body):#回調函數 print("---->",ch,method,properties) #time.sleep(30) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag)
運行的結果就是,我先運行一次生產者,數據被消費者1接收到了,可是我把消費者1宕機,中止運行,那麼消費者2就接到了消息,即只要消費者沒有發送確認信息,生產者就不會把信息刪除。
RabbitMQ消息持久化:
咱們能夠生成好多的消息隊列,那咱們怎麼查看消息隊列的狀況呢:rabbitmqctl.bat list_queues
如今的狀況是,消息隊列中還有消息,可是服務器宕機了,那這個消息就丟了,那我就須要這個消息強制的持久化:
channel.queue_declare(queue='hello2',durable=True)
在每次聲明隊列的時候加上一個durable參數(客戶端和服務器端都要加上這個參數),
在這個狀況下,咱們把rabbitMQ服務器重啓,發現只有隊列名留下了,可是隊列中的消息沒有了,這樣咱們還須要在生產者basic_publish中添加一個參數:properties
producer:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = connection.channel() #聲明queue channel.queue_declare(queue = 'hello2',durable=True) #routing_key是queue的名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2,#make message persistent ) ) print("[x] Send 'Hello World!'") connection.close()
這樣就可使得消息持久化
如今是一個生產者對應三個消費者,很公平的收發收發,可是實際的狀況是,咱們機器的配置是不同的,有的配置是單核的有的配置是多核的,可能i7處理器處理4條消息的時候和其餘的處理器處理1條消息的時間差很少,那差的處理器那裏就會堆積消息,而好的處理器那裏就會造成閒置,在現實中作運維的,咱們會在負載均衡中設置權重,誰的配置高權重高,任務就多一點,可是在rabbitMQ中,咱們只作了一個簡單的處理就能夠實現公平的消息分發,你有多大的能力就處理多少消息
即:server端給客戶端發送消息的時候,先檢查如今還有多少消息,若是當前消息沒有處理完畢,就不會發送給這個消費者消息。若是當前的消費者沒有消息就發送
這個只須要在消費者端進行修改加代碼:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello2',durable=True) def callback(ch, method, properties, body):#回調函數 print("---->",ch,method,properties) #time.sleep(30) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,#若是收到消息,就調用callback來處理消息 queue='hello2', #no_ack=False ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
咱們在生成一個consume2,在callback中sleep20秒來模擬
我先啓動兩個produce,被consume接受,而後在啓動一個,就被consumer2接受,可是由於consumer2中sleep20秒,處理慢,因此這時候在啓動produce,就又給了consume進行處理
Publish\Subscrible(消息發佈\訂閱)
前面都是1對1的發送接收數據,那我想1對多,想廣播同樣,生產者發送一個消息,全部消費者都收到消息。那咱們怎麼作呢?這個時候咱們就要用到exchange了
exchange在一端收消息,在另外一端就把消息放進queue,exchange必須精確的知道收到的消息要幹什麼,是否應該發到一個特定的queue仍是發給許多queue,或者說把他丟棄,這些都被exchange的類型所定義
exchange在定義的時候是有類型的,以決定究竟是那些queue符合條件,能夠接受消息:
fanout:全部bind到此exchange的queue均可以接受消息
direct:經過rounroutingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey的routingKey所bind的queue能夠接受消息
headers:經過headers來決定把消息發給哪些queue
消息publisher:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='log',type = 'fanout') 9 10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!' 11 channel.basic_publish(exchange='logs',routing_key='',body=message) 12 print("[x] Send %r " % message) 13 connection.close()
這裏的exchange以前是空的,如今賦值log;在這裏也沒有聲明queue,廣播不須要寫queue
消息subscriber:
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 3 channel = connection.channel() 4 channel.exchange_declare(exchange='logs',exchange_type='fanout') 5 6 #exclusive惟一的,不指定queue名字,rabbit會隨機分配一個名字 7 #exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 8 result = channel.queue_declare(exclusive=True) 9 queue_name = result.method.queue 10 11 channel.queue_bind(exchange='logs',queue=queue_name) 12 13 print('[*] Waiting for logs,To exit press CTRL+C') 14 15 def callback(ch,method,properties,body): 16 print("[X] %r" % body) 17 channel.basic_consume(callback,queue = queue_name,no_ack=True) 18 channel.start_consuming()
在消費者這裏咱們有定義了一個queue,注意一下注釋中的內容。可是咱們在發送端沒有聲明queue,爲何發送端不須要接收端須要呢?在consume裏有一個channel.queue_bind()函數,裏面綁定了exchange轉換器上,固然裏面還須要一個queue_name
運行結果:
就至關於收音機同樣,實時廣播,打開三個消費者,生產者發送一條數據,而後3個消費者同時接收到
有選擇的接收消息(exchange_type = direct)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據關鍵字斷定應該將數據發送到指定的隊列
publisher:
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct') 7 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info' 9 message = ' '.join(sys.argv[2:]) or 'Hello World!' 10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) 11 12 print("[X] Send %r:%r" %(severity,message)) 13 connection.close()
subscriber:
import pika import sys connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connect.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:]# if not severities: sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) print('[*]Waiting for logs.To exit press CTRL+c') def callback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channel.basic_consume(callback,queue = queue_name,no_ack=True) channel.start_consuming()
更加細緻的過濾(exchange_type=topic)
publish:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='topic_logs', 8 exchange_type='topic') 9 10 result = channel.queue_declare(exclusive=True) 11 queue_name = result.method.queue 12 13 binding_keys = sys.argv[1:] 14 if not binding_keys: 15 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 16 sys.exit(1) 17 18 for binding_key in binding_keys: 19 channel.queue_bind(exchange='topic_logs', 20 queue=queue_name, 21 routing_key=binding_key) 22 23 print(' [*] Waiting for logs. To exit press CTRL+C') 24 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming()
以上都是服務器端發消息,客戶端收消息,消息流是單向的,那若是咱們想要發一條命令給遠程的客戶端去執行,而後想讓客戶端執行的結果返回,則這種模式叫作rpc
RabbitMQ RPC
rpc server:
1 import pika 2 import time 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.queue_declare(queue='rpc_queue') 7 def fib(n): 8 if n==0: 9 return 0 10 elif n==1: 11 return 1 12 else: 13 return fib(n-1)+fib(n-2) 14 15 def on_request(ch,method,props,body): 16 n = int(body) 17 print("[.] fib(%s)" %n) 18 response = fib(n) 19 20 ch.basic_publish(exchange='',routing_key=props.reply_to, 21 properties=pika.BasicProperties(correlation_id=props.correlation_id), 22 body = str(response)) 23 ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue') 26 27 print("[x] Awaiting rpc requests") 28 channel.start_consuming()
rpc client:
1 import pika 2 import uuid,time 3 class FibonacciRpcClient(object): 4 def __init__(self): 5 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 6 7 self.channel = self.connection.channel() 8 9 result = self.channel.queue_declare(exclusive=True) 10 self.callback_queue = result.method.queue 11 12 self.channel.basic_consume(self.on_response,#回調函數,只要一收到消息就調用 13 no_ack=True,queue=self.callback_queue) 14 15 def on_response(self,ch,method,props,body): 16 if self.corr_id == props.correlation_id: 17 self.response = body 18 19 def call(self,n): 20 self.response = None 21 self.corr_id = str(uuid.uuid4()) 22 self.channel.basic_publish(exchange='',routing_key='rpc_queue', 23 properties=pika.BasicProperties( 24 reply_to=self.callback_queue, 25 correlation_id=self.corr_id 26 ), 27 body=str(n),#傳的消息,必須是字符串 28 ) 29 while self.response is None: 30 self.connection.process_data_events()#非阻塞版的start_consuming 31 print("no message....") 32 time.sleep(0.5) 33 return int(self.response) 34 fibonacci_rpc = FibonacciRpcClient() 35 print("[x] Requesting fib(30)") 36 response = fibonacci_rpc.call(30) 37 print("[.] Got %r"%response)
以前的start_consuming是進入一個阻塞模式,沒有消息就等待消息,有消息就收過來
self.connection.process_data_events()是一個非阻塞版的start_consuming,就是說發了一個東西給客戶端,每過一點時間去檢查有沒有消息,若是沒有消息,能夠去幹別的事情
reply_to = self.callback_queue是用來接收反應隊列的名字
corr_id = str(uuid.uuid4()),correlation_id第一在客戶端會經過uuid4生成,第二在服務器端返回執行結果的時候也會傳過來一個,因此說若是服務器端發過來的correlation_id與本身的id相同 ,那麼服務器端發出來的結果就確定是我剛剛客戶端發過去的指令的執行結果。如今就一個服務器端一個客戶端,無所謂缺人不確認。如今客戶端是非阻塞版的,咱們能夠不讓它打印沒有消息,而是執行新的指令,這樣就兩條消息,不必定按順序完成,那咱們就須要去確認每一個返回的結果是哪一個命令的執行結果。
整體的模式是這樣的:生產者發了一個命令給消費者,不知道客戶端何時返回,仍是要去收結果的,可是它又不想進入阻塞模式,想每過一段時間看這個消息收回來沒有,若是消息收回來了,就表明收完了。
運行結果:
服務器端開啓,而後在啓動客戶端,客戶端先是等待消息的發送,而後作出反應,直到算出斐波那契