若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。python
消費者端添加服務器
channel.basic_qos(prefetch_count=1)
帶消息持久化+公平分發的完整代碼socket
生產者函數
import pika #至關於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = conn.channel() #聲明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, ) ) # routing_key 消息的key body 消息的內容 print('Sent "hello world"') conn.close()
消費者fetch
import pika #至關於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = conn.channel() #聲明queue 這裏能夠不用聲明,可是若是消費者先運行,又不但願出錯,就要消費者先運行 channel.queue_declare(queue='hello',durable=True) def callback(ch,method,properties,body): print('[x] Received %r' % body ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello' ) #消費消息 若是收到消息就調用CALLBACK函數處理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #開始收消息
消費者2blog
import pika,time #至關於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = conn.channel() #聲明queue 這裏能夠不用聲明,可是若是消費者先運行,又不但願出錯,就要消費者先運行 channel.queue_declare(queue='hello1',durable=True) def callback(ch,method,properties,body): time.sleep(30) print('[x] Received %r' % body ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1' ) #消費消息 若是收到消息就調用CALLBACK函數處理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #開始收消息
消費者2 CALLBACK中sleep30s 在這個過程當中 將不會接收到消息,那麼消息會發送到消費者1隊列
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,it
exchange是一個簡單的東西,一端接收消息,另外一端將消息推送到隊列。exchange類型將決定這條消息是放到一個隊列,仍是不少隊列,仍是被刪除。exchange 就像轉發器。io
exchange的類型:class
fanout: 全部bind到此exchange的queue均可以就收消息。 純廣播 ,只要綁定exchange就能夠接收到。
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息。
topic 全部符合routingkey(此時能夠是一個表達式)的routingkey所bind的queue能夠接收消息。
headers 經過headers 來決定把消息發給哪些queue。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #發佈方不須要聲明queue 只須要有個exchange就能夠了 , exchange_type 類型是 fanout channel.exchange_declare(exchange='logs',exchange_type='fanout') #message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange='logs', # 發佈廣播的時候 exchange定義要一致 routing_key='', #必需要寫成空 body=message) #發送的消息主體 print(" [x] Sent %r" % message) connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') #exchange 轉發器,exchange_type=fanout 綁定到這個轉發器上的消費者都能接收到消息 result = channel.queue_declare(exclusive=True) # 隨機queue 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue #獲取queueu的名字 channel.queue_bind(exchange='logs', #綁定到這個轉發器上,只能從這個轉發器上接收 queue=queue_name) #指定queue的名字 轉發器把消息發送到這個queue上,消費者從這個queue上接收消息 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
發送接收過程:
消費者要先在線,生產者發送消息,消費者才能接收到。
生產者
消費者1在線
消費者2在線
消費者3中斷從新鏈接
廣播並不會存下來,不在線就接收不到了。
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
服務器端
消費者端
綁定哪一個就能接收的哪一個的消息。
上面的過濾條件是寫死的,更細緻的過濾條件就是在上面的基礎上,對過濾參數的匹配。相似與正則匹配。
publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()