注意一下實現的都是在本地進行的測試,若是是在兩臺機器上,是須要用戶認證,還須要傳遞參數的,具體連接https://www.cnblogs.com/alex3714/articles/5248247.htmlhtml
1、RabbitMQ實現傳輸python
pika模塊服務器
發送方send實現步驟網絡
1.創建一個鏈接分佈式
2.創建一個channel函數
3.聲明一個隊列測試
4.開始發送數據fetch
5.關閉鏈接spa
import pika connection =pika.BlockingConnection( pika.ConnectionParameters('localhost') )#創建一個連接 channel = connection.channel() #創建一個管道 channel.queue_declare(queue='hello') #聲明一個隊列 channel.basic_publish(exchange='', routing_key='hello',body='you are so ugly') #發送消息 #routing_key 指定隊列 ,body 指定message print('producer send message') connection.close() #關閉鏈接
recieve方實現步驟code
1.創建一個鏈接
2.創建一個管道
3.聲明一個隊列(爲何還要聲明一個隊列,下面有詳述)
4.接受數據
5.建立函數處理接受的數據
6.啓動
import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.queue_declare(queue='hello')#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會 #報錯 def callback(ch ,method , properties , body): ''' :param ch: 管道 :param method: :param properties: :param body: 消息 :return: ''' print(body) channel.basic_consume(callback, queue='hello' , no_ack=True)#no_ack參數用來當callback處理完髮送一個確認信息給producer,no_ack就是不須要確認的意思,通常此參數不用 ''' callback:用來處理接受的消息''' channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。
2、RabbitMQ輪詢機制
當存在多個消費者時,這些消費者會平均接收消息。同時,若是當其中一個消費者connection斷開,消息不會丟失,消息會自動進入到其餘消費者中。只有當callback運行完畢後,給producer一個確認信息後,此信息纔會在隊列中刪除。
實例以下
import pika , time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會
#報錯
def callback(ch ,method , properties , body):
'''
:param ch: 管道
:param method:
:param properties:
:param body: 消息
:return:
'''
time.sleep(20)
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag) #向producer發送一個確認消息
channel.basic_consume(callback, queue='hello' , )#n
'''
callback:用來處理接受的消息'''
channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。
3、queue、message 持久化。
當server端出錯時,queue和message會消失,只有經過將queue和message持久化,就算server出錯了,queue和message都會保留,記住是queue和message都要持久化
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#創建一個連接 channel = connection.channel() #創建一個管道 channel.queue_declare(queue='hello3',durable=True) #聲明一個隊列 ,後面的durable是持久化隊列 channel.basic_publish(exchange='', routing_key='hello3',body='you are so ugly', properties=pika.BasicProperties(delivery_mode=2,)) #發送消息 properties是將消息進行持久化 #routing_key 指定隊列 ,body 指定message print('producer send message') connection.close() #關閉鏈接
c:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8\sbin>rabbitmqctl.bat list_queues 是在sbin目錄下執行此命令能夠查看存在的隊列和隊列中的消息
4、不平均分發消息
# -*- coding: utf-8 -*- #@Author: Eric import pika , time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello3' , durable=True)#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會 #報錯 def callback(ch ,method , properties , body): ''' :param ch: 管道 :param method: :param properties: :param body: 消息 :return: ''' # time.sleep(20) print(body) ch.basic_ack(delivery_tag=method.delivery_tag) #向客戶端確認一個消息 channel.basic_qos(prefetch_count=1) #這段做用是告訴producer,當處理完一個消息在發送另一個消息 #這個時候會形成,處理信息快的接受的消息多,處理的多 channel.basic_consume(callback, queue='hello3' , )#n ''' callback:用來處理接受的消息''' channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。
5、RabbitMQ 訂閱發佈、廣播模式
1.fanout類型的exchange
此模式是說producer發出一條消息,全部綁定的exchange的queue對應的consume都會收到一條消息。同時
這個消息是實時的,也就說當producer發送時,consumer沒有啓用接收,那麼producer以前發送的消息,consumer就不會再接收到,
只能接收到啓用後,producer發出的消息。就像廣播同樣。另外exchange(交換器存在不一樣的類型)
producer實現過程
1.建立鏈接
2.建立channel
3.聲明一個exchange(交換器)
4.發送message
5.鏈接關閉
import pika connection =pika.BlockingConnection(pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.exchange_declare(exchange='long2' , exchange_type='fanout') #聲明一個交換器,類型是fanout channel.basic_publish(exchange='long2', routing_key='',body='you are so ugly',) print('producer has send message') connection.close()
consumer實現過程
1.創建鏈接
2.建立管道
3.聲明exchange
4.聲明一個queue
5.將queue綁定在exchange
6.處理消息
7.啓動
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='long2' , exchange_type='fanout') result = channel.queue_declare(exclusive=True)#隨機生成一個queue ,並在消息處理完後,刪除隊列 queue_name = result.method.queue #獲取隊列的名字 channel.queue_bind(exchange='long2', queue=queue_name) def callback(ch ,method , properties , body): ''' :param ch: 管道 :param method: :param properties: :param body: 消息 :return: ''' print(body) channel.basic_consume(callback , queue=queue_name, no_ack=True) channel.start_consuming()
2. direct類型
這種類型是實現,經過exchange實現消息分級,queue綁定不一樣的消息級別,只顯示對應級別的消息
producer實現過程
1.創建鏈接
2.建立管道
3.聲明一個exchange
4.設置消息分級
5.推送消息
6.鏈接關閉
import pika ,sys def producer( severity , message): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='666' , exchange_type='direct') # severity = sys.argv[1] if len(sys.argv[1])>1 else 'info' # message = 'you are ugly' channel.basic_publish(exchange='666', routing_key=severity, body=message) print(message) connection.close() if __name__=='__main__': level = sys.argv[1] message = sys.argv[2] producer(level , message)
consumer實現過程
1.創建鏈接
2.建立管道
3.聲明exchange
4.建立queue
5.消息分級
6.循環消息分級並將queue和exchange綁定
7.處理消息
8.啓動
1 import sys , pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='666' , exchange_type='direct') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 severities = sys.argv[1:] 13 if not severities: 14 print('must input error or info or warning') 15 sys.exit() 16 for severity in severities: 17 channel.queue_bind(exchange='666', 18 queue=queue_name, 19 routing_key=severity) 20 21 22 def callback(ch, method, properties, body): 23 print(body) 24 25 26 channel.basic_consume(callback, queue=queue_name) 27 28 channel.start_consuming()
3.topic 類型 是能夠更加細緻的劃分,能夠經過過濾條件來讓consumer收對應的想要的消息
producer
import sys , pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='888', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv)>1 else 'anonymous.info' message = message = ' '.join(sys.argv[1:]) channel.basic_publish(exchange='888' ,routing_key=routing_key ,body=message) connection.close()
consumer
1 import sys , pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='888', exchange_type='topic') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 binding_keys = sys.argv[1:] 13 14 if not binding_keys: 15 print('must input bing_key') 16 exit() 17 18 for routing_key in binding_keys: 19 channel.queue_bind(exchange='888', routing_key=routing_key , queue=queue_name) 20 21 22 def callback(ch, method, properties, body): 23 print(body) 24 25 26 channel.basic_consume(callback, queue=queue_name) 27 28 channel.start_consuming()
其實topic就是dirct的細分化,他們惟一的區別就是routing_key ,topic
是自定義的條件,二direct是固定的severity。
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
6、RPC模式
RPC