消息隊列&Celery&RabbitMQ&zeromq

1、消息隊列java

什麼是消息隊列?python

「消息隊列」是在消息的傳輸過程當中保存消息的容器。
「消息」是在兩臺計算機間傳送的數據單位。消息能夠很是簡單,例如只包含文本字符串;也能夠更復雜,可能包含嵌入對象。
消息被髮送到隊列中。「消息隊列」是在消息的傳輸過程當中保存消息的容器。消息隊列管理器是消息從它的源傳輸到它的目標時充當中間人。隊列的主要目的是提供路由並保證消息的傳遞;若是發送消息時接收者不可用,消息隊列會保留消息,直到能夠成功地傳遞它。redis

 

爲何使用消息隊列?數據庫

主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達MySQL,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。後端

總結:消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等緩存

  

消息隊列特色:服務器

  • 採用異步處理模式:消息發送者能夠發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道隊列)上,消息接收者訂閱監聽該通道。一條信息可能最終轉發給一個或多個消息接收者,這些接收者都無需對消息發送者作出同步迴應。整個過程都是異步的網絡

  • 應用系統之間解耦合:數據結構

    • 發送者和接受者沒必要了解對方、只須要 確認消息
    • 發送者和接受者 沒必要同時在線

  好比在線交易系統爲了保證數據的最終一致,在支付系統處理完成後會把支付結果放到消息中間件裏,通知訂單系統修改訂單支付狀態。兩個系統是經過消息中間件解耦的。架構

 

應用場景: 

  • 異步處理,舉個栗子:現有用戶註冊模塊,須要同時完成寫入註冊數據至數據庫、發送激活郵件、發送短信驗證碼。實現包括:串行方式、並行方式
    • 串行方式:先將註冊信息寫入數據庫成功後,再發送激活郵件,最後發送短信驗證碼。以上三個任務依次所有完成後,返回給客戶。
    • 並行方式:先將註冊信息寫入數據庫成功後,發送註冊郵件的同時,一塊兒發送註冊短信。以上三個任務完成後,返回給客戶端。與串行的差異是,並行的方式能夠提升處理的時間。
  • 應用解耦,舉個栗子:現有用戶下單模塊,當用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,在訂單系統調用庫存系統的接口。假如庫存系統沒法訪問,則訂單系統減庫存將失敗,從而致使訂單失敗,訂單系統與庫存系統耦合度太高。
    • 訂單系統:用戶下單後,在訂單系統中將調用庫存系統接口的操做放入到消息隊列,訂單系統中再也不阻塞等待庫存系統的返回結果。並將訂單下單成功返回給用戶。

    • 庫存系統:在消息隊列中訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。

    • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統與庫存系統的應用解耦。

  • 流量削鋒,通常在秒殺或團搶活動中使用普遍。舉個栗子:現有一個秒殺活動,通常會由於流量過大、暴增而致使應用掛掉。爲解決這個問題,通常須要將用戶請求加入消息隊列,達到控制活動的人數,能夠緩解短期內高流量壓垮應用。消息通信
    • 服務器接收用戶請求後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。
    • 秒殺業務根據消息隊列中的請求信息,再作後續處理。

  • 消息通信
    • 點對點通信:客戶端A和客戶端B使用同一隊列,進行消息通信;

 

 

  • 聊天室通信:客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發佈和接收。實現相似聊天室效果。

 

 

消息隊列的缺點:

系統可用性下降:系統引入的外部依賴越多,越容易掛掉,假如BCD系統如今都要調用系統A,爲了使應用之間解耦合,使用了消息隊列MQ,可是有一個問題是若是MQ掛掉,整個系統就都不能使用了。

系統複雜性提升:硬生生加個MQ進來,沒法保證消息沒有被重複消費,也沒法自動解決消息丟失的狀況,消息傳遞的順序性也沒辦法保證。

一致性問題:因爲後臺任務是按期對消息隊列中的消息進行處理,於是觸發的時機是不可預測的。生產者將任務發佈於消息隊列中,而後再由消費者訂閱任務進行處理,但生產者將任務存放後再也不關係其執行結果是否成功,而是直接返回成功,若是執消費者處理任務失敗了,就形成了數據的不一致。

 

消息隊列的傳輸模型

  • 點對點模型:用於消息生產者消息消費者之間點到點的通訊。消息生產者將消息發送到由某個名字標識的特定消費者。這個名字實際上對於消費服務中的一個隊列Queue),在消息傳遞給消費者以前它被存儲在這個隊列中。隊列消息能夠放在內存中也能夠持久化,以保證在消息服務出現故障時仍然可以傳遞消息。

    傳統的點對點消息中間件一般由 消息隊列服務消息傳遞服務消息隊列消息應用程序接口 API 組成,其典型的結構以下圖所示。



    • 特色:每一個消息只用一個消費者;發送者和接受者沒有時間依賴;接受者確認消息接受和處理成功。

 

  • 發佈/訂閱模型:支持向一個特定的消息主題生產消息。0多個訂閱者 可能對接收來自 特定消息主題 的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方,就比如是匿名公告板。這種模式被概況爲:多個消費者能夠得到消息,發佈者訂閱者 之間存在 時間依賴性。發佈者須要創建一個 訂閱subscription),以便可以消費者訂閱。訂閱者 必須保持 持續的活動狀態接收消息

  在這種狀況下,在訂閱者 未鏈接時,發佈的消息將在訂閱者 從新鏈接從新發布,以下圖所示:


    • 特色:每一個消息能夠有多個訂閱者;客戶端只有訂閱後才能接收到消息;持久訂閱和非持久訂閱。

注意:

  1. 發佈者和訂閱者有時間依賴:接受者和發佈者只有創建訂閱關係才能收到消息;
  2. 持久訂閱:訂閱關係創建後,消息就不會消失,無論訂閱者是否都在線;
  3. 非持久訂閱:訂閱者爲了接受消息,必須一直在線。 當只有一個訂閱者時約等於點對點模式

 

 

消息丟失的解決:

生產者弄丟了數據(producer)生產者將數據發送到 RabbitMQ 的時候,可能數據在半路就丟失了,由於大併發寫入隊列致使消息丟失,網絡問題啥的等等緣由。 

  • RabbitMQ:1. 開啓RabbitMQ提供的事務功能,在生產者發送數據以前開啓 RabbitMQ 事務channel.txSelect,而後再發送消息,若是消息沒有成功被 RabbitMQ 接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務channel.txRollback,再重試發送消息;若是收到了消息,就能夠提交事務channel.txCommit但開啓事務太耗費系統性能不推薦。
    # 開啓事務
    channel.txSelect()
    try# 發送消息到消息隊列中
    except Exception as e:
        # 事務回滾
        channel.txRollback()
    else:
        # 提交事務
        channel.txCommit()

     

  • RabbitMQ:2. 開啓 confirm 模式,生產者設置開啓 confirm 模式以後,當每次寫入消息時都會分配一個惟一的 id,若是消息成功寫入了 RabbitMQ 中,RabbitMQ 會回傳一個 ack 消息,提示這個消息寫入成功。若是 RabbitMQ 沒能處理這個消息,會回調生產者的一個 nack 接口,提示這個消息寫入失敗,能夠再次重試。並且能夠結合這個機制本身在內存裏維護每一個消息 id 的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。

  :事務機制和 cnofirm 機制的區別:事務機制是同步的,提交一個事務以後會一直阻塞,可是 confirm 機制是異步的,發送完這個消息以後能夠繼續發送下一個消息,第一個寫入的那個消息 RabbitMQ 接收以後,會異步回調你的一個接口通知你這個消息接收到了。因此通常在生產者這塊避免數據丟失,都是用 confirm 機制的。

 

  • Redis:RedisPUSH/POP機制,利用redis的列表數據結構。比較好的使用模式是,生產者lpush消息,消費者brpop消息,並設定超時時間,能夠減小redis的壓力。這種方案相對於第一種方案數據可靠性是提升了,只有在redis宕機且數據沒有持久化的狀況下丟失數據,能夠根據業務經過AOF和縮短持久化時間間隔來保證很高的可靠性,並且也能夠經過多個client來提升消費速度。但相對於專業的消息隊列來講,這種方案消息的狀態過於簡單(沒有狀態),沒有ack機制,消息取出後消費失敗依賴於client記錄日誌或者從新push到隊列裏面。

    注:Redis相較於rabbitMQ沒有ack機制,也不能保證消息的順序性,不適應用做於消息隊列來使用。能夠考慮消息中間件:Redis做者開源的Disque、阿里開源RocketMQ,以及基於Golang的nsq等,Redis更適用於存儲數據。

    • 每一個消費者(單線程)都有本身的單獨的隊列,這個隊列是用rpoplpush命令從公共隊列生成。而後:若是消費成功了,用rpop命令把本身的隊列銷燬,進行下一次循環;若是消費失敗,由於是消費者私有的隊列,能夠自由選擇如何處理,不用擔憂衝突;若是消費者崩潰了,在消費者啓動時會檢查本身的隊列,把本身隊列裏的元素放回公共隊列。在不考慮redis掛掉的狀況下,這種機制保證了任務至少被消費一次。


消息隊列(Queue)弄丟了數據:  消息隊列本身弄丟了數據,這此時必須開啓持久化功能,就是消息寫入以後會持久化到磁盤,哪怕是消息隊列本身宕機了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。

RabbitMQ 設置持久化有兩個步驟

  • 建立 queue 時設置爲持久化:這樣就能夠保證 RabbitMQ 持久化 queue 的元數據,可是它是不會持久化 queue 裏的數據的。
channel.queue_declare(queue='shuaigaogao', durable=True) # durable=True 持久化 
  • 發送消息時設置持久化,將消息的 deliveryMode 設置爲 2,就是將消息設置爲持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。
channel.basic_publish(exchange="",
                      routing_key="shuaigaogao",   # queue的名字
                      body="hello world",          # body是要發送的內容
                      properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性
                      )

  注意:必需要同時設置這兩個持久化,這樣 RabbitMQ 哪怕是掛了,再次重啓,也會從磁盤上重啓恢復 queue,恢復這個 queue 裏的數據。另外一種狀況,已經給 RabbitMQ 開啓了持久化機制,生產者發送了消息到消息隊列中,而且也將其寫入了 RabbitMQ 中,可是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會致使內存裏的一點點數據丟失。因此,持久化能夠跟生產者那邊的 confirm 機制配合起來,只有當消息被持久化到磁盤以後,纔會通知生產者 ack 了,因此哪怕是在持久化到磁盤以前,RabbitMQ 掛了,數據丟了,生產者收不到 ack你也是能夠本身重發的。


Redis 設置持久化有兩種方式:RDBAOF
 

消費端弄丟了數據:消費者從消息隊列中訂閱消息時,可能剛消費到還沒處理完,進程就掛了,好比宕機、死鎖、網絡抖動等等,可是消息隊列認爲你都消費了那麼這數據就丟了。

  • RabbitMQ啓用手動確認模式便可
    • ①自動確認模式,消費者掛掉,待ack的消息迴歸到隊列中。消費者拋出異常,消息會不斷的被重發,直處處理成功。不會丟失消息,即使服務掛掉,沒有處理完成的消息會重回隊列,可是異常會讓消息不斷重試。
    • ②手動確認模式,若是消費者來不及處理就死掉時,沒有響應ack時會重複發送一條信息給其餘消費者;若是監聽程序處理異常了,且未對異常進行捕獲,會一直重複接收消息,而後一直拋異常;若是對異常進行了捕獲,可是沒有在finallyack,也會一直重複發送消息(重試機制)。
    • ③不確認模式,acknowledge="none" 不使用確認機制,只要消息發送完成會當即在隊列移除,不管客戶端異常仍是斷開,只要發送完就移除,不會重發。
    • 消費者業務邏輯異常,可是未手動執行noack致使:經過noack方式來從新放入隊列

     注 :消息隊列經過判斷consumer鏈接狀況來判斷消息是否被從新放入隊列

 

2、celery

a、概念 celery是基於python實現的一個分佈式任務隊列框架,主要用於管理分佈式任務隊列、處理耗時的任務,支持使用任務隊列的方式在分佈的 機器/進程/線程上 執行任務調度。可讓任務的執行徹底脫離主程序,甚至能夠被分配到其餘主機上運行,一般使用它實現異步任務&定時任務

 

b、組件 

  • 任務(tasks)-- 用戶定義的函數,用於實現應用功能,好比執行一個發送短信的耗時任務。
  • 消息中間件(Broker)-- 用於存放tasks的地方,代指任務隊列自己,這個中間人須要解決的一個問題,就是可能須要存放很是很是多的tasks,並且要保證Worker可以從這裏拿取,常見的有broker有Redis
  • 任務執行單元(Worker)-- 用於執行tasks,從broker中取出tasks,調用執行任務函數。

 

c、具體功能 :

 任務模塊 Task:包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往任務隊列,而定時任務由 Celery Beat 進程週期性地將任務發往任務隊列。

 消息中間件 Broker:Broker,即爲任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 自己不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

 任務執行單元 Worker:Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

 任務結果存儲 Backend:Backend 用於存儲任務的執行結果,以供查詢。同消息中間件同樣,存儲也可以使用 RabbitMQ, Redis 和 MongoDB 等。

 

d、底層原理

celery架構由三個模塊組成:消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件(Broker): 消息中間人,是任務調度隊列,是一個獨立的服務,是一個生產者消費之模式,生產者把任務放入隊列中,消費者(worker)從任務隊列中取出任務執行,任務的執行能夠按照順序依次執行也能夠按照計劃時間進行。可是Broker自己不提供隊列服務,因此要集成第三方隊列,推薦使用RatbbitMQ或Redis.

任務執行單元(worker):即執行任務的程序,能夠有多個併發。它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

任務執行結果存儲(task result store):因爲任務的執行同主程序分開,若是主程序想獲取任務執行的結果,就必須經過中間件存儲。同消息中間人同樣,存儲也可使用RabbitMQ、Redis;另外,假如不須要保存執行的結果也能夠不配置這個模塊。

 

e、實現步驟 :

  • 建立一個 Celery 實例 
  • 啓動 Celery Worker 
  • 應用程序中調用異步任務

 

 

3、RabbitMQ 

a、概念

RabbitMQ 是一個由 Erlang 語言開發的,而且基於 AMQPAdvanced Message Queuing Protocol)高級消息隊列協議的消息隊列服務。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。默認端口5672。

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

 

b、特色: 

  • 可靠性(Reliability):RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。

  • 靈活的路由(Flexible Routing):在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange

  • 消息集羣(Clustering):多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker

  • 高可用(Highly Available Queues):隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。

  • 多種協議(Multi-protocol):RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。

  • 多語言客戶端(Many Clients):RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。

  • 管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。

  • 跟蹤機制(Tracing):若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。

  • 插件機制(Plugin System):RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

 
c、組件:

RabbitMQ是一個消息代理:它接受和轉發消息。能夠將其視爲郵局:服務器將(生產者)將要發佈的郵件放在郵箱中,RabbitMQ最終會將郵件發送給對應API(消費者)。 在這個類比中,RabbitMQ是一個郵箱,郵局和郵遞員。RabbitMQ和郵局之間的主要區別在於它不處理紙張,而是接受,存儲和轉發二進制blob數據 - 消息。在RabbitMQ中郵箱就是一個隊列,消息存儲在隊列當中,隊列只受主機的內存和磁盤限制的約束,它本質上是一個大的消息緩衝區。許多生產者能夠發送到同一個消息隊列,反之也容許不少消費者從同一個隊列中接收數據。

生產者是發送消息的程序,消費者是等待接收消息的程序。生產者,消費者和代理(消息隊列)沒必要駐留在同一主機上;

 

  • Broker:也稱Broker/RabbitMQ Server,一種傳輸服務,維護一條從ProducerConsumer的路線,保證數據可以按照指定的方式進行傳輸。

  • Producer:消息生產者,數據的發送方。一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload:傳輸的數據。labelexchange的名字或者說是一個tag,它描述了payload,並且RabbitMQ也是經過這個label來決定把這個Message發給哪一個ConsumerAMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

  • Consumer:消息消費者,數據的接收方。把queue比做是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer, 也可能會把同一個Message發送給不少的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來講,它是不知道誰發送的這個信息的,就是協議自己不支持。若是Producer發送的payload包含了Producer的信息就另當別論。
  • Exchange:消息交換器,用於接受、分配消息;它指定消息按什麼規則、路由到哪一個隊列,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

  • Queue:隊列,用於存儲生產者的消息;保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。

  • Binding:綁定,決定交換器的消息應該發送到那個隊列。它的做用就是把交換器(exchange)和消息隊列(queue)按照路由規則綁定起來,是基於路由鍵將交換器和消息隊列鏈接起來的路由規則成爲一個綁定,因此能夠將交換器理解成一個由綁定構成的路由表。

  • Routing Key:路由鍵,用於把生成者的數據分配到交換器上;exchange根據這個關鍵字進行消息投遞

  • BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;
  • VHost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhostAMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。

  • Connection: 網絡鏈接,好比一個TCP鏈接。ProducerConsumer都是經過TCP鏈接到RabbitMQ Server的,程序的起始處就是創建這個TCP鏈接。
  • Channel(信道):消息通道,消息推送使用的通道;在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務;多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。

  • ConnectionFactory(鏈接管理器)應用程序與Rabbit之間創建鏈接的管理器,程序代碼中使用;
  • Message:  消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。

Exchange、Queue、RoutingKey三個才能決定一個從ExchangeQueue的惟一的線路。

 

爲何使用Channel,而不是直接使用TCP鏈接

對於OS來講,創建和關閉TCP鏈接是有代價的,頻繁的創建關閉TCP鏈接對於系統的性能有很大的影響,並且TCP的鏈接數也有限制,這也限制了系統處理高併發的能力。可是,在TCP鏈接中創建Channel是沒有上述代價的。對於Producer或者Consumer來講,能夠併發的使用多個Channel進行Publish或者Receive。有實驗代表,1s的數據能夠Publish10K的數據包。固然對於不一樣的硬件環境,不一樣的數據包大小這個數據確定不同,可是我只想說明,對於普通的Consumer或者Producer來講,這已經足夠了。若是不夠用,你考慮的應該是如何細化SPLIT你的設計。

 

Exchange的分發策略:

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到。

  • 消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。
  • fanout每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。
  • topic:topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「」。匹配0個或多個單詞匹配很少很多一個單詞。

 

4、zeromq

a、概念

zeromq 是一個爲可伸縮的分佈式或併發應用程序設計的高性能異步消息庫,提供一個消息隊列, 可是與面向消息的中間件不一樣,ZeroMQ的運行不須要專門的消息代理(message broker)該庫設計成常見的套接字風格的API。zeromq 並非相似rabbitmq消息列隊,它實際上只一個消息列隊組件,一個庫。

請求響應模式(Request-Reply用於未來自ZMQ_REQ客戶端的請求發送到一個或多個ZMQ_REP服務,並接收對發送的每一個請求的後續回覆,是一種遠程過程調用和任務分發模式。客戶端在請求後,服務端必須迴響應

發佈/訂閱模式Publish-Subscribe): 從單個發佈者到多個訂閱者的一對多數據分發。廣播全部client,沒有隊列緩存,斷開鏈接數據將永遠丟失。client能夠進行數據過濾。

管道模式Parallel Pipeline)以 push/pull 模式鏈接節點,能夠有多個步驟、循環。這是一種並行的任務分發和收集模式。由三部分組成,push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別於Publish-Subscribe存在一個數據緩存和處理負載。當鏈接被斷開,數據不會丟失,重連後數據繼續發送到對端。

 

b、代碼實現

  • 請求響應模式
    # server端:
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        message = socket.recv()
        print("Received: %s" % message)
        socket.send("I am OK!")
    
    
    # client端:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    socket.send('Are you OK?')
    response = socket.recv();
    print("response: %s" % response)
    
    
    # 輸出:
    $ python app/server.py 
    Received: Are you OK?
    
    $ python app/client1.py 
    response: I am OK!

     

 

  • 發佈訂閱模式
    # server端:
    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    
    while True:
        print('發送消息')
        socket.send("消息羣發")
        time.sleep(1)    
    
    
    # client端1:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE,'')  # 消息過濾
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    #client端2:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE,'') 
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    # 輸出:
    $ python app/server.py 
    發送消息
    發送消息
    發送消息
    
    $ python app/client2.py 
    response: 消息羣發
    response: 消息羣發
    response: 消息羣發
    
    $ python app/client1.py 
    response: 消息羣發
    response: 消息羣發
    response: 消息羣發

     

  • 管道模式
    # server端:
    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:5557")
    
    while True:
        socket.send("測試消息")
        print "已發送"    
        time.sleep(1)    
    
    
    # work端:
    import zmq
    
    context = zmq.Context()
    
    recive = context.socket(zmq.PULL)
    recive.connect('tcp://127.0.0.1:5557')
    
    sender = context.socket(zmq.PUSH)
    sender.connect('tcp://127.0.0.1:5558')
    
    while True:
        data = recive.recv()
        print "正在轉發..."
        sender.send(data)
    
    
    # client端:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.bind("tcp://*:5558")
    
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    # 輸出結果:
    $ python app/server.py 
    已發送
    已發送
    已發送
    
    $ python app/work.py 
    正在轉發...
    正在轉發...
    正在轉發...
    
    $ python app/client1.py
    response: 測試消息
    response: 測試消息
    response: 測試消息

     

 

 

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

單機吞吐量

萬級,吞吐量比RocketMQ和Kafka要低了一個數量級

萬級,吞吐量比RocketMQ和Kafka要低了一個數量級

10萬級,RocketMQ也是能夠支撐高吞吐的一種MQ

10萬級別,這是kafka最大的優勢,就是吞吐量高。

 

通常配合大數據類的系統來進行實時數據計算、日誌採集等場景

topic數量對吞吐量的影響

 

 

topic能夠達到幾百,幾千個的級別,吞吐量會有較小幅度的降低

 

這是RocketMQ的一大優點,在同等機器下,能夠支撐大量的topic

topic從幾十個到幾百個的時候,吞吐量會大幅度降低

 

因此在同等機器下,kafka儘可能保證topic數量不要過多。若是要支撐大規模topic,須要增長更多的機器資源

時效性

ms級

微秒級,這是rabbitmq的一大特色,延遲是最低的

ms級

延遲在ms級之內

可用性

高,基於主從架構實現高可用性

高,基於主從架構實現高可用性

很是高,分佈式架構

很是高,kafka是分佈式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會致使不可用

消息可靠性

有較低的機率丟失數據

 

通過參數優化配置,能夠作到0丟失

通過參數優化配置,消息能夠作到0丟失

功能支持

MQ領域的功能極其完備

基於erlang開發,因此併發能力很強,性能極其好,延時很低

MQ功能較爲完善,仍是分佈式的,擴展性好

功能較爲簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用,是事實上的標準

優劣勢總結

很是成熟,功能強大,在業內大量的公司以及項目中都有應用

 

偶爾會有較低機率丟失消息

 

並且如今社區以及國內應用都愈來愈少,官方社區如今對ActiveMQ 5.x維護愈來愈少,幾個月才發佈一個版本

 

並且確實主要是基於解耦和異步來用的,較少在大規模吞吐的場景中使用

 

erlang語言開發,性能極其好,延時很低;

 

吞吐量到萬級,MQ功能比較完備

 

並且開源提供的管理界面很是棒,用起來很好用

 

社區相對比較活躍,幾乎每月都發布幾個版本分

 

在國內一些互聯網公司近幾年用rabbitmq也比較多一些

 

可是問題也是顯而易見的,RabbitMQ確實吞吐量會低一些,這是由於他作的實現機制比較重。

 

並且erlang開發,國內有幾個公司有實力作erlang源碼級別的研究和定製?若是說你沒這個實力的話,確實偶爾會有一些問題,你很難去看懂源碼,你公司對這個東西的掌控很弱,基本職能依賴於開源社區的快速維護和修復bug。

 

並且rabbitmq集羣動態擴展會很麻煩,不過這個我以爲還好。其實主要是erlang語言自己帶來的問題。很難讀源碼,很難定製和掌控。

接口簡單易用,並且畢竟在阿里大規模應用過,有阿里品牌保障

 

日處理消息上百億之多,能夠作到大規模吞吐,性能也很是好,分佈式擴展也很方便,社區維護還能夠,可靠性和可用性都是ok的,還能夠支撐大規模的topic數量,支持複雜MQ業務場景

 

並且一個很大的優點在於,阿里出品都是java系的,咱們能夠本身閱讀源碼,定製本身公司的MQ,能夠掌控

 

社區活躍度相對較爲通常,不過也還能夠,文檔相對來講簡單一些,而後接口這塊不是按照標準JMS規範走的有些系統要遷移須要修改大量代碼

 

還有就是阿里出臺的技術,你得作好這個技術萬一被拋棄,社區黃掉的風險,那若是大家公司有技術實力我以爲用RocketMQ挺好的

kafka的特色其實很明顯,就是僅僅提供較少的核心功能,可是提供超高的吞吐量,ms級的延遲,極高的可用性以及可靠性,並且分佈式能夠任意擴展

 

同時kafka最好是支撐較少的topic數量便可,保證其超高吞吐量

 

並且kafka惟一的一點劣勢是有可能消息重複消費,那麼對數據準確性會形成極其輕微的影響,在大數據領域中以及日誌採集中,這點輕微影響能夠忽略

 

這個特性自然適合大數據實時計算以及日誌收集

 

RocketMQ,適用於可靠性要求高的場景,尤爲是電商裏面的訂單扣款,以及業務削峯,在大量交易涌入時,後端可能沒法及時處理的狀況。

RabbitMQ,erlang語言自己的併發優點,性能較好,社區活躍度也比較高,可是不利於作二次開發和維護,適用於數據量沒有那麼大的場景。

Kafka,是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸,適合產生大量數據的互聯網服務的數據收集業務,有日誌採集功能。

 

 

----------------------------------------------------

舉個栗子:「P」是咱們的生產者,「C」是咱們的消費者。中間的框是一個隊列 -  RabbitMQ表明消費者保留的消息緩衝區。

 

 

生產者將消息發送到「hello」隊列。使用者從該隊列接收消息。

-------------------------------------------------------------------------------

相關文章
相關標籤/搜索