歡迎關注我的公衆號:石杉的架構筆記(ID:shishan100)
面試
週一至週五早8點半!精品技術文章準時送上!算法
上一篇文章億級流量系統架構之如何在上萬併發場景下設計可擴展架構(中)?分析了一下如何利用消息中間件對系統進行解耦處理。數據庫
同時,咱們也提到了使用消息中間件還有利於一份數據被多個系統同時訂閱,供多個系統來使用於不一樣的目的。緩存
目前的一個架構以下圖所示。性能優化
在這個圖裏,咱們能夠清晰的看到,實時計算平臺發佈的一份數據到消息中間件裏,接着,會進行以下步驟:bash
所以上述場景中,使用消息中間件一來能夠解耦,二來還能夠實現消息「Pub/Sub」模型,實現消息的發佈與訂閱。架構
這篇文章,我們就來看看,假如說基於RabbitMQ做爲消息中間件,如何實現一份數據被多個系統同時訂閱的「Pub/Sub」模型。併發
上面那個圖,其實就是採用的RabbitMQ最基本的隊列消費模型的支持。
分佈式
也就是說,你能夠理解爲RabbitMQ內部有一個隊列,生產者不斷的發送數據到隊列裏,消息按照前後順序進入隊列中排隊。微服務
接着,假設隊列裏有4條數據,而後咱們有2個消費者一塊兒消費這個隊列的數據。
此時每一個消費者會均勻的被分配到2條數據,也就是說4條數據會均勻的分配給各個消費者,每一個消費者只不過是處理一部分數據罷了,這個就是典型的隊列消費模型。
若是有同窗對這塊基於RabbitMQ如何實現有點不太清楚的話,能夠參考以前的一些文章:
以前這幾篇文章,基本給出了上述那個最基本的隊列消費模型的RabbitMQ代碼實現,以及如何保證消費者宕機時數據不丟失,如何讓RabbitMQ集羣對queue和message都進行持久化。基本上總體代碼實現都比較完整,你們能夠去參考一下。
可是消息中間件還能夠實現一種「Pub/Sub」模型,也就是「發佈/訂閱」模型,Pub就是Publish,Sub就是Subscribe。
這種模型是能夠支持多個系統同時消費一份數據的。也就是說,你發佈出去的每條數據,都會廣播給每一個系統。
給你們來一張圖,一塊兒來感覺一下。
如上圖所示。也就是說,咱們想要實現的上圖的效果,實時計算平臺發佈一系列的數據到消息中間件裏。
而後數據查詢平臺、數據質量監控系統、數據鏈路追蹤系統,都會訂閱數據,都會消費到同一份完整的數據,每一個系統均可以根據本身的須要使用數據。
這,就是所謂的「Pub/Sub」模型,一個系統發佈一份數據出去,多個系統訂閱和消費到如出一轍的一份數據。
那若是要實現上述的效果,基於RabbitMQ應該怎麼來處理呢?
實際上來講,在RabbitMQ裏面是不容許生產者直接投遞消息到某個queue(隊列)裏的,而是隻能讓生產者投遞消息給RabbitMQ內部的一個特殊組件,叫作「exchange」。
關於這個exchange,大概你能夠把這個組件理解爲一種消息路由的組件。
也就是說,實時計算平臺發送出去的message到RabbitMQ中都是由一個exchange來接收的。
而後這個exchange會根據必定的規則決定要將這個message路由轉發到哪一個queue裏去,這個實際上就是RabbitMQ中的一個核心的消息模型。
你們看下面的圖,一塊兒來理解一下。
在以前的文章裏,咱們投遞消息到RabbitMQ的時候,也沒有用什麼exchange,可是爲何就仍是把消息投遞到了queue裏去呢?
那是由於咱們用了默認的exchange,他會直接把消息路由到你指定的那個queue裏去,因此若是簡單用隊列消費模型,不就省去了exchange的概念了嗎。
上面這段就是以前咱們給你們展現的,讓消息持久化的一種投遞消息的方式。
你們注意裏面的第一個參數,是一個空的字符串,這個空字符串的意思,就是說投遞消息到默認的exchange裏去,而後他就會路由消息到咱們指定的queue裏去。
在RabbitMQ裏,exchange這種組件有不少種類型,好比說:direct、topic、headers以及fanout。這裏我們就來看看最後一種,fanout這種類型的exchange組件。
這種exchange組件其實很是的簡單,你能夠建立一個fanout類型的exchange,而後給這個exchange綁定多個queue。
接着只要你投遞一條消息到這個exchange,他就會把消息路由給他綁定的全部queue。
使用下面的代碼就能夠建立一個exchange,好比說在實時計算平臺(生產者)的代碼裏,能夠加入下面的一段,建立一個fanout類型的exchange。第一個參數咱們叫作「rt_compute_data」,這個就是exchange的名字,rt就是「RealTime」的縮寫,意思就是實時計算系統的計算結果數據。
第二個參數就是定義了這個exchange的類型是「fanout」。
channel.exchangeDeclare(
"rt_compute_data",
"fanout");
複製代碼
接着咱們就採用下面的代碼來投遞消息到咱們建立好的exchange組件裏去:
你們會注意到,此時消息就是投遞到指定的exchange裏去了,可是路由到哪一個queue裏去呢?此時咱們暫時還沒肯定,要讓消費者本身來把本身的queue綁定到這個exchange上去才能夠。
咱們對消費者的代碼也進行修改,以前咱們在這裏關閉了autoAck機制,而後每次都是本身手動ack。
上面的代碼裏,每一個消費者系統,都會有一些不同,就是每一個消費者都須要定義本身的隊列,而後綁定到exchange上去。好比:
這樣,每一個訂閱這份數據的系統其實都有一個屬於本身的隊列,而後隊列裏被會被exchange路由進去實時計算平臺生產的全部數據。
並且由於是多個隊列的模式,每一個系統均可以部署消費者集羣來進行數據的消費和處理,很是的方便。
最後,給你們來一張大圖,咱們再跟着圖,來捋一捋整個流程。
如上圖所示,首先,實時計算平臺會投遞消息到「rt_compute_data」這個「exchange」裏去,可是他沒指定這個exchange要路由消息到哪一個隊列,由於這個他自己是不知道的。
接着數據查詢平臺、數據質量監控系統、數據鏈路追蹤系統,就能夠聲明本身的隊列,都綁定到exchange上去。
由於queue和exchange的綁定,在這裏是要由訂閱數據的平臺本身指定的。並且由於這個exchange是fanout類型的,他只要接收到了數據,就會路由數據到全部綁定到他的隊列裏去,這樣每一個隊列裏都有一樣的一份數據,供對應的平臺來消費。
並且針對每一個平臺本身的隊列,本身還能夠部署消費服務集羣來消費本身的一個隊列,本身的隊列裏的數據仍是會均勻分發給各個消費服務實例來處理,每一個消費服務實例會獲取到一部分的數據。
你們思考一下,這樣是否是就實現了不一樣的系統訂閱一份數據的「Pub/Sub」的模型?
固然,其實RabbitMQ還支持各類不一樣類型的exchange,能夠實現各類複雜的功能。
後續咱們將會給你們經過實際的線上系統架構案例,來闡述消息中間件技術的各類用法。
END
若有收穫,請幫忙轉發,您的鼓勵是做者最大的動力,謝謝!
一大波微服務、分佈式、高併發、高可用的原創系列文章正在路上
歡迎掃描下方二維碼,持續關注:
石杉的架構筆記(id:shishan100)
十餘年BAT架構經驗傾囊相授
推薦閱讀:二、【雙11狂歡的背後】微服務註冊中心如何承載大型系統的千萬級訪問?
三、【性能優化之道】每秒上萬併發下的Spring Cloud參數優化實戰
六、大規模集羣下Hadoop NameNode如何承載每秒上千次的高併發訪問
七、【性能優化的祕密】Hadoop如何將TB級大文件的上傳性能優化上百倍
八、拜託,面試請不要再問我TCC分佈式事務的實現原理坑爹呀!
九、【坑爹呀!】最終一致性分佈式事務如何保障實際生產中99.99%高可用?
十一、【眼前一亮!】看Hadoop底層算法如何優雅的將大規模集羣性能提高10倍以上?
1六、億級流量系統架構之如何設計全鏈路99.99%高可用架構
1八、大白話聊聊Java併發面試問題之volatile究竟是什麼?
1九、大白話聊聊Java併發面試問題之Java 8如何優化CAS性能?
20、大白話聊聊Java併發面試問題之談談你對AQS的理解?
2一、大白話聊聊Java併發面試問題之公平鎖與非公平鎖是啥?
2二、大白話聊聊Java併發面試問題之微服務註冊中心的讀寫鎖優化
2三、互聯網公司的面試官是如何360°無死角考察候選人的?(上篇)
2四、互聯網公司面試官是如何360°無死角考察候選人的?(下篇)
2五、Java進階面試系列之一:哥們,大家的系統架構中爲何要引入消息中間件?
2六、【Java進階面試系列之二】:哥們,那你說說系統架構引入消息中間件有什麼缺點?
2七、【行走的Offer收割機】記一位朋友斬獲BAT技術專家Offer的面試經歷
2八、【Java進階面試系列之三】哥們,消息中間件在大家項目裏是如何落地的?
2九、【Java進階面試系列之四】扎心!線上服務宕機時,如何保證數據100%不丟失?
30、一次JVM FullGC的背後,竟隱藏着驚心動魄的線上生產事故!
3一、【高併發優化實踐】10倍請求壓力來襲,你的系統會被擊垮嗎?
3二、【Java進階面試系列之五】消息中間件集羣崩潰,如何保證百萬生產數據不丟失?
做者:石杉的架構筆記 連接:https://juejin.im/post/5c23901a51882565986a1909 來源:掘金 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。