send端fetch
import pika credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() #聲明queue channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive端this
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
消費一個消息可能會花好幾秒。你可能會問,若是若是一個消費者啓動了一個長的任務,消息發了一半而後死了。按咱們如今的代碼,一旦rabbitmq傳遞了一個消息到給消費者,而後rabbitmq就迅速的將這個消息從內存裏刪除。在這種狀況下,若是你kill了一個消費者,咱們就會失去了這個正在傳遞的消息。url
可是咱們不想失去這個消息。若是一個消費者死了,我想把這個任務傳給另外一個消費者。spa
爲了確保一個消息從不丟失,rabbitmq支持消息確認。消費者處理完信息後會向rabbitmq發送一個消息確認,這樣rabbitmq就能夠從內存裏刪除這個消息了。code
若是一個消費者死掉了,沒有發ack,rabbitmq就會知道這個消息沒有傳遞成功,就會把這個消息從新存到queue裏。blog
若是此時有其它的消費者在線,rabbitmq就會迅速的將這個消息傳遞給其它的的消費者。這樣就確保了消息沒有丟失,即便這個消費者是偶爾掛了。rabbitmq
根本就沒有超時時間一說,當那個消費者掛了,rabbitmq會從新轉發那條消息。即便這個消息處理了很長時間都不要緊。隊列
rabbitmq消息確認默認是開啓的(no_ack=Fault)。在以前的例子裏咱們顯然用的no_ack=True。內存
生產者ssl
import pika credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() # 聲明queue,durable=True(隊列持久化) channel.queue_declare(queue='alex3',durable=True) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='alex3', #send msg to this queue body='Hello World!23', properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ) ) print(" [x] Sent 'Hello World!2'") connection.close()
消費者
import pika import time credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='alex3',durable=True)#隊列持久化 def callback(ch, method, properties, body): print(ch, method, properties) print(" [x] Received %r" % body) time.sleep(1) channel.basic_consume(callback, queue='alex3', no_ack=True ) channel.basic_qos(prefetch_count=1) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
生產者
import pika,sys credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者
import pika,time credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
當要採用廣播發消息的時候就要用到exchange了。
exchange是一個很簡單的東西,一端從生產者裏接收消息,另外一端把消息推送到隊列裏。exchange必須清除的知道它從生產者裏接收的消息要發送給誰。接收到的消息是應該被追加到指定的queue裏?仍是應該追加到不少個queue裏?或者是扔掉?這個規則都是根據exchange的類型來定義的。
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout
headers: 經過headers 來決定把消息發給哪些queue
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
recv端
import pika credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs',type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, ) channel.start_consuming()
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, #error body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
recv端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: #[error info warning] channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,queue=queue_name,) channel.start_consuming()
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
recv端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,queue=queue_name) channel.start_consuming()