RabbitMQ

注意一下實現的都是在本地進行的測試,若是是在兩臺機器上,是須要用戶認證,還須要傳遞參數的,具體連接https://www.cnblogs.com/alex3714/articles/5248247.htmlhtml

1、RabbitMQ實現傳輸python

pika模塊服務器

發送方send實現步驟網絡

1.創建一個鏈接分佈式

2.創建一個channel函數

3.聲明一個隊列測試

4.開始發送數據fetch

5.關閉鏈接spa

import pika


connection =pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)#創建一個連接
channel = connection.channel() #創建一個管道
channel.queue_declare(queue='hello') #聲明一個隊列
channel.basic_publish(exchange='', routing_key='hello',body='you are so ugly') #發送消息
#routing_key 指定隊列 ,body 指定message
print('producer send message')
connection.close() #關閉鏈接

recieve方實現步驟code

1.創建一個鏈接

2.創建一個管道

3.聲明一個隊列(爲何還要聲明一個隊列,下面有詳述)

4.接受數據

5.建立函數處理接受的數據

6.啓動

import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會
                                     #報錯

def callback(ch ,method , properties , body):
    '''

    :param ch: 管道
    :param method:
    :param properties:
    :param body: 消息
    :return:
    '''
    print(body)

channel.basic_consume(callback, queue='hello' , no_ack=True)#no_ack參數用來當callback處理完髮送一個確認信息給producer,no_ack就是不須要確認的意思,通常此參數不用
'''
callback:用來處理接受的消息'''
channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。

  2、RabbitMQ輪詢機制

當存在多個消費者時,這些消費者會平均接收消息。同時,若是當其中一個消費者connection斷開,消息不會丟失,消息會自動進入到其餘消費者中。只有當callback運行完畢後,給producer一個確認信息後,此信息纔會在隊列中刪除。

實例以下

 
 
import pika , time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會
#報錯

def callback(ch ,method , properties , body):
'''
:param ch: 管道
:param method:
:param properties:
:param body: 消息
:return:
'''
time.sleep(20)
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag) #向producer發送一個確認消息
channel.basic_consume(callback, queue='hello' , )#n
'''
callback:用來處理接受的消息'''
channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。
 

3、queue、message 持久化。

當server端出錯時,queue和message會消失,只有經過將queue和message持久化,就算server出錯了,queue和message都會保留,記住是queue和message都要持久化

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#創建一個連接
channel = connection.channel() #創建一個管道
channel.queue_declare(queue='hello3',durable=True) #聲明一個隊列 ,後面的durable是持久化隊列
channel.basic_publish(exchange='', routing_key='hello3',body='you are so ugly',
                      properties=pika.BasicProperties(delivery_mode=2,)) #發送消息 properties是將消息進行持久化
#routing_key 指定隊列 ,body 指定message
print('producer send message')
connection.close() #關閉鏈接

c:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8\sbin>rabbitmqctl.bat list_queues  是在sbin目錄下執行此命令能夠查看存在的隊列和隊列中的消息

 

4、不平均分發消息

# -*- coding: utf-8 -*-
#@Author: Eric
import pika , time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello3' , durable=True)#數據傳輸是雙向的,不知道是那邊先開始運行,因此在聲明一個,防止接收端先運行會
                                     #報錯

def callback(ch ,method , properties , body):
    '''
    :param ch: 管道
    :param method:
    :param properties:
    :param body: 消息
    :return:
    '''
    # time.sleep(20)
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #向客戶端確認一個消息

channel.basic_qos(prefetch_count=1) #這段做用是告訴producer,當處理完一個消息在發送另一個消息
                                    #這個時候會形成,處理信息快的接受的消息多,處理的多
channel.basic_consume(callback, queue='hello3' , )#n
'''
callback:用來處理接受的消息'''
channel.start_consuming()# 開始傳輸,注意,此方法會一直運行,若是沒有消息就阻塞。

5、RabbitMQ 訂閱發佈、廣播模式

1.fanout類型的exchange

此模式是說producer發出一條消息,全部綁定的exchange的queue對應的consume都會收到一條消息。同時

這個消息是實時的,也就說當producer發送時,consumer沒有啓用接收,那麼producer以前發送的消息,consumer就不會再接收到,

只能接收到啓用後,producer發出的消息。就像廣播同樣。另外exchange(交換器存在不一樣的類型)

producer實現過程

1.建立鏈接

2.建立channel

3.聲明一個exchange(交換器)

4.發送message

5.鏈接關閉

 

import pika

connection =pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='long2' , exchange_type='fanout') #聲明一個交換器,類型是fanout

channel.basic_publish(exchange='long2', routing_key='',body='you are so ugly',)

print('producer has send message')
connection.close()

consumer實現過程

1.創建鏈接

2.建立管道

3.聲明exchange

4.聲明一個queue

5.將queue綁定在exchange

6.處理消息

7.啓動

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='long2' , exchange_type='fanout')

result = channel.queue_declare(exclusive=True)#隨機生成一個queue ,並在消息處理完後,刪除隊列

queue_name = result.method.queue #獲取隊列的名字

channel.queue_bind(exchange='long2', queue=queue_name)

def callback(ch ,method , properties , body):
    '''
    :param ch: 管道
    :param method:
    :param properties:
    :param body: 消息
    :return:
    '''

    print(body)

channel.basic_consume(callback , queue=queue_name, no_ack=True)

channel.start_consuming()

2. direct類型

這種類型是實現,經過exchange實現消息分級,queue綁定不一樣的消息級別,只顯示對應級別的消息

producer實現過程

1.創建鏈接

2.建立管道

3.聲明一個exchange

4.設置消息分級

5.推送消息

6.鏈接關閉

import pika ,sys

def producer( severity , message):

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    channel = connection.channel()

    channel.exchange_declare(exchange='666' , exchange_type='direct')

    # severity = sys.argv[1] if len(sys.argv[1])>1 else 'info'

    # message = 'you are ugly'
    channel.basic_publish(exchange='666', routing_key=severity, body=message)
    print(message)

    connection.close()

if  __name__=='__main__':
    level = sys.argv[1]
    message = sys.argv[2]
    producer(level , message)

consumer實現過程

1.創建鏈接

2.建立管道

3.聲明exchange

4.建立queue

5.消息分級

6.循環消息分級並將queue和exchange綁定

7.處理消息

8.啓動

 1 import sys , pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='666' , exchange_type='direct')
 8 
 9 result = channel.queue_declare(exclusive=True)
10 queue_name = result.method.queue
11 
12 severities = sys.argv[1:]
13 if not severities:
14     print('must input error or info or warning')
15     sys.exit()
16 for severity in severities:
17     channel.queue_bind(exchange='666',
18                           queue=queue_name,
19                           routing_key=severity)
20 
21 
22 def callback(ch, method, properties, body):
23     print(body)
24 
25 
26 channel.basic_consume(callback, queue=queue_name)
27 
28 channel.start_consuming()

3.topic 類型 是能夠更加細緻的劃分,能夠經過過濾條件來讓consumer收對應的想要的消息

producer

import sys , pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='888', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv)>1 else 'anonymous.info'
message = message = ' '.join(sys.argv[1:])


channel.basic_publish(exchange='888' ,routing_key=routing_key ,body=message)

connection.close()

consumer

 1 import sys , pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='888', exchange_type='topic')
 8 
 9 result = channel.queue_declare(exclusive=True)
10 queue_name = result.method.queue
11 
12 binding_keys = sys.argv[1:]
13 
14 if not binding_keys:
15     print('must input bing_key')
16     exit()
17 
18 for routing_key in binding_keys:
19     channel.queue_bind(exchange='888', routing_key=routing_key , queue=queue_name)
20 
21 
22 def callback(ch, method, properties, body):
23     print(body)
24 
25 
26 channel.basic_consume(callback, queue=queue_name)
27 
28 channel.start_consuming()

其實topic就是dirct的細分化,他們惟一的區別就是routing_key ,topic

是自定義的條件,二direct是固定的severity。

 

To receive all the logs run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

6、RPC模式

RPC

RPC(Remote Procedure Call)— 遠程過程調用,它是一種經過 網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。 RPC協議假定某些 傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI 網絡通訊模型中,RPC跨越了 傳輸層應用層。RPC使得開發包括網絡 分佈式多程序在內的應用程序更加容易。
RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆 信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。
相關文章
相關標籤/搜索