RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml
RabbitMQ-從基礎到實戰(2)— 防止消息丟失java
RabbitMQ-從基礎到實戰(3)— 消息的交換(上)編程
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)post
RabbitMQ-從基礎到實戰(6)— 與Spring集成學習
本章節和官方教程類似度較高,英文好的能夠移步官方教程ui
在上一章的例子中,咱們建立了一個消費者,生產日誌消息,廣播給兩個消費者,對消息進行不一樣的處理。這一節,咱們將對它進行擴展,實現一些更加高級的功能,例如:使消費者A只接受error級別的日誌保存到硬盤,消費者B接收全部級別的消息進行打印。編碼
本文中涉及到的全部概念(包括前面幾章),都將摒棄我的經驗,以官方文檔爲基礎進行講解,在書寫本文的同時,也是我對RabbitMQ的從新學習。spa
回顧一下上一章的隊列綁定代碼線程
// 把剛剛獲取的隊列綁定到logs這個交換中心上,
channel.queueBind(queueName, "logs", "");3d
這段代碼在消費者中,爲何生產者沒有?由於在RabbitMQ中消息是發送到交換中心(exchange)的,這在上一張已經重點強調過。
上述代碼能夠理解成,queueName這個隊列對logs這個exchange中的消息感興趣,routingKey是""
在發送消息的basicPublish方法中,也有一個參數叫作routingKey,沒錯,他們是有關聯的,下面會介紹
在不一樣的exchange類型中,routingKey扮演的角色也相應的不一樣,好比上一章咱們使用的fanout(扇出,多貼切的名字,想象一下WOW中盜賊的刀扇)將忽略routingKey,全部綁定在fanout類型的exchange上的隊列,都將接收到該exchange上的全部消息。
fanout類型的exchange沒有給咱們太多的靈活性,direct類型的echange很是簡單,會匹配消息發佈時的routingKey和queue的routingKey,徹底相等則把消息放入該隊列。
如上圖,Q1綁定了orange,Q2綁定了black和green,就能夠實現不一樣級別的日誌用不一樣的消費者進行處理
咱們看到Q2綁定了兩個routingKey,難道第二次綁定不會把第一次綁定覆蓋掉嗎?
實踐出真正,咱們來試一下
改造一下發送方法,輪流發送info和error信息
給Consummer隊列綁定兩個routingKey
激動人心的時刻到來了,跑一把
Duang,報錯了
報錯信息:
inequivalent arg 'type' for exchange 'logs' in vhost '/': received 'direct' but current is 'fanout', class-id=40, method-id=10)
大意就是logs exchange已經被聲明稱fanout了,不能再聲明成direct類型,RabbitMQ的隊列聲明方法和exchange聲明方法都是冪等的,若是沒有,就建立,若是有,參數相同,就無論,若是有了還用不一樣的參數從新聲明,就報錯
進入RabbitMQ控制檯把logs刪除,從新執行
逆襲成功,消費一下看看
成功了,一個隊列能夠綁定多個routingKey,這裏注意先啓動消費者,由於前面的代碼裏咱們用的是臨時隊列,斷開鏈接後,隊列就刪除了,若是先啓動生產者,exchange接到消息後發現沒有隊列對它感興趣,就職性的把消息給丟掉了。
一個隊列能夠綁定多個routingKey,反之,一個routingKey也能夠綁定多個隊列,以下圖,感興趣的朋友能夠本身試一下
若是綁定在一個direct類型的exchange上的隊列都使用同一個routingKey,那它就是一個fanout
要實現本章的需求,即Q1只接收error級別的日誌寫到硬盤上,Q2接收error和info級別的日誌打印出來
用direct類型的exchange來實現這個需求很是簡單,Q1綁定error,Q2綁定error和info便可,缺點是Q2須要綁定N個routingKey,N=日誌級別數量,咱們能夠用一些編程的技巧來規避它
Sender的代碼上面已經改好了,把exchange換爲direct,注意刪除原exchange,再也不贅述
Q2綁定全部日誌級別,咱們用一個Enum來規避手動綁定
定義一個Enum
1 public enum LogType{ 2 error,info; 3 }
用foreach語法糖進行循環綁定
1 //綁定全部類型 2 for(LogType logType: LogType.values()){ 3 channel.queueBind(queueName, "logs", logType.name()); 4 }
foreach是單線程的,這裏也能夠裝個逼用一下JAVA8的lambda,因爲lambda是並行處理,因此外圍的try catch無效,須要在內層從新抓取異常,並且不能拋出,反而顯得代碼很不美好,裝逼失敗
1 IntStream.range(0, LogType.values().length).forEach(n->{ 2 try { 3 channel.queueBind(queueName, "logs", LogType.values()[n].name()); 4 } catch (IOException e) { 5 e.printStackTrace(); 6 } 7 });
把另一個Consumer改爲只綁定error隊列
1 channel.queueBind(queueName, "logs", LogType.error.name());
而後,改造一下發送消息的地方,一開始咱們用了一個while還有一組if else,看起來比較挫,別人看你代碼的時候,就不會以爲你很厲害,這樣和你不寫博客同樣,對你的工做是沒有好處的,咱們把它改的高端一點
1 while(true){ 2 boolean info = ++i%2==0; 3 String type = info?LogType.info.name():LogType.error.name(); 4 sender.sendMessage(type +" message: "+i, type); 5 Thread.sleep(1000); 6 }
對比一下
是否是以爲本身厲害了不少?這就是編碼的藝術(得意臉)
好了,這一章沒有太多內容,跑一下看看結果
左邊的Consumer1,消費了info和error級別的日誌,右邊的Consumer2,只消費了error級別的日誌
這一章主要是介紹了RabbitMQ中direct類型的exchange,下一章將跟着官方教程的進度繼續介紹topic類型的exchange,以及下下章介紹用RabbitMQ實現RPC調用。以後則會介紹RabbitMQ與Spring的集成等與真實開發環境更相關的技術。