http://www.rabbitmq.com/getstarted.html官網html
最近業務須要使用Rabbitmq工做隊列實現任務的負載分發緩存
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息。服務器
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。它從生產者接收消息並遞送給消費者,在這個過程當中,根據規則進行路由,緩存與持久化。併發
而在AMQP中主要有兩個組件:Exchange 和 Queue (在 AMQP 1.0 裏還會有變更),以下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這二者都在 Server 端,又稱做 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,一般有 Producer 和 Consumer 兩種類型:分佈式
安裝配置過程,這裏不贅述。函數
RabbitMQ有六種應用場景,這裏作簡單介紹ui
應用場景1-「Hello Word」.net
一個P向queue發送一個message,一個C從該queue接收message並打印。設計
send.py
producer,鏈接至RabbitMQ Server,聲明隊列,發送message,關閉鏈接,退出。日誌
應用場景2-work queues
將耗時的消息處理經過隊列分配給多個consumer來處理,咱們稱此處的consumer爲worker,咱們將此處的queue稱爲Task Queue,其目的是爲了不資源密集型的task的同步處理,也即當即處理task並等待完成。相反,調度task使其稍後被處理。也即把task封裝進message併發送到task queue,worker進程在後臺運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。
new_task.py
創建鏈接,聲明隊列,發送能夠模擬耗時任務的message,斷開鏈接、退出。
worker.py
創建鏈接,聲明隊列,不斷的接收message,處理任務,進行確認。
應用場景3-Publish/Subscribe
在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。如今咱們設法將一個message傳遞給多個consumer。這種模式被稱爲publish/subscribe。此處以一個簡單的日誌系統爲例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息能夠被全部運行的log接收者接收。所以,咱們能夠運行一個log接收者直接在屏幕上顯示log,同時運行另外一個log接收者將log寫入磁盤文件。
receive_logs.py
日誌消息接收者:創建鏈接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。
emit_log.py
日誌消息發送者:創建鏈接,聲明fanout類型的exchange,經過exchage向queue發送日誌消息,消息被廣播給全部接收者,關閉鏈接,退出。
應用場景4-Routing
應用場景3中構建了簡單的log系統,能夠將log message廣播至多個receiver。如今咱們將考慮只把指定的message類型發送給其subscriber,好比,只把error message寫到log file而將全部log message顯示在控制檯。
應用場景5-topic
應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不一樣的log message發送給不一樣的subscriber(也即分別經過不一樣的routing_key將queue綁定到exchange,這樣exchange即可將不一樣的message根據message內容路由至不一樣的queue)。但仍然存在限制,不能根據多個規則路由消息,好比接收者要麼只能收error類型的log message要麼只能收info類型的message。若是咱們不只想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。爲了達到此目的,須要topic類型的exchange。topic類型的exchange中routing_key中能夠包含兩個特殊字符:「*」用於替代一個詞,「#」用於0個或多個詞。
receive_logs_topic.py
log message接收者:創建鏈接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。
emit_log_topic.py
log message發送者:創建鏈接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉鏈接,退出。
應用場景6-PRC
在應用場景2中描述瞭如何使用work queue將耗時的task分配到不一樣的worker中。可是,若是咱們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個徹底不一樣的故事。這一模式被稱爲遠程過程調用。如今,咱們將構建一個RPC系統,包含一個client和可擴展的RPC server,經過返回斐波那契數來模擬RPC service。
rpc_server.py
RPC server:創建鏈接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求後調用本身的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。
rpc_client.py
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的鏈接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應後的處理函數on_response根據響應關聯的correlation_id屬性做出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30爲參數發起遠程過程調用。
你能夠根據本身想須要進行應用,具體代碼能夠去文章頂部的官網查看
本文參考:http://blog.csdn.net/zyz511919766/article/details/41946521