RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml
RabbitMQ-從基礎到實戰(3)— 消息的交換(上)java
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)apache
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)app
RabbitMQ-從基礎到實戰(6)— 與Spring集成maven
RabbitMQ中,消息丟失能夠簡單的分爲兩種:客戶端丟失和服務端丟失。針對這兩種消息丟失,RabbitMQ都給出了相應的解決方案。post
如圖,生產者P向隊列中生產消息,C1和C2消費隊列中的消息,默認狀況下,RabbitMQ會平均的分發消費給C1C2(Round-robin dispatching),假設一個任務的執行時間很是長,在執行過程當中,客戶端掛了(鏈接斷開),那麼,該客戶端正在處理且未完成的消息,以及分配給它還沒來得及執行的消息,都將丟失。由於默認狀況下,RabbitMQ分發完消息後,就會從內存中把消息刪除掉。ui
爲了解決上述問題,RabbitMQ引入了消息確認機制,當消息處理完成後,給Server端發送一個確認消息,來告訴服務端能夠刪除該消息了,若是鏈接斷開的時候,Server端沒有收到消費者發出的確認信息,則會把消息轉發給其餘保持在線的消費者。spa
首先,咱們驗證上述問題(客戶端丟失消息)是否真的存在,對Consumer進行以下改造。3d
先生產兩條消息code
啓動消費者,在消費者接收到消息,還沒處理完成的時候,強制關掉
這時,觀察控制檯,發現兩條消息都沒有了,1條是在執行中丟失的,還有1條,已經分配給這個Consumer,還沒來得及處理,也丟失了
這證實了上述問題是真的存在的,若是發生在生產環境,將產生難以預料的後果
爲了方便觀察,咱們用CMD來運行Consumer,要經過maven打成可執行的JAR包,須要在pom.xml中增長以下配置
<build> <finalName>Consumer</finalName> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.liyang.ticktock.rabbitmq.App</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
上述配置描述了最終打包名字、入口類路徑、帶上依賴包、使用1.8版本的JDK進行打包,配置完後,就能夠經過maven的install方法,在target目錄生成可執行的jar包,若是包大小很小,應檢查配置,是否是沒有帶上依賴包
再次改造Consummer類
install成可執行jar包,經過cmd開啓兩個consumer
經過Sender發送一條消息,而後用Ctrl+C結束先收到消息的Consumer,發現另一個Consumer接收到了未處理完的消息
問題獲得瞭解決,如今消費者在執行過程當中死掉也不會丟失消息了
看一下發送確認的方法
1 /** 2 * Acknowledge one or several received 3 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} 4 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method 5 * containing the received message being acknowledged. 6 * @see com.rabbitmq.client.AMQP.Basic.Ack 7 * @param deliveryTag the tag from the received 這個是RabbitMQ用來區分消息的,文檔在這 8 * @param multiple true to acknowledge all messages up to and 爲true的話,確認全部消息,爲false只確認當前消息 9 * including the supplied delivery tag; false to acknowledge just 10 * the supplied delivery tag. 11 * @throws java.io.IOException if an error is encountered 12 */ 13 void basicAck(long deliveryTag, boolean multiple) throws IOException;
在官方文檔中,這樣描述deliveryTag
簡單來講,就是RabbitMQ內部用來區分消息的一個標籤,從envelope中獲取就好了
RabbitMQ只有在收到消費者確認後,纔會從內存中刪除消息,若是消費者忘了確認(更多狀況是由於代碼問題沒有執行到確認的代碼),將會致使內存泄漏
驗證一下
註釋掉Consumer中的確認代碼
運行Sender和Consumer,不停的生產消費消息,發現消費者在正常的消費消息
查看控制檯,發現已經被吃掉了43KB的內存,因此,在試用過程當中,必定要保證消息確認在任何狀況下均可以發出,不然即便消費者處理完成,RabbitMQ也不會把消息在內存中清除,在該消費者斷開鏈接以後,還會把消息轉發給其餘消費者從新處理,將引起難以預計的問題
如今,消費者宕機已經沒法影響到咱們的消息了,但若是RabbitMQ重啓了,消息依然會丟失。所幸的是,RabbitMQ提供了持久化的機制,將內存中的消息持久化到硬盤上,即便重啓RabbitMQ,消息也不會丟失。可是,仍然有一個很是短暫的時間窗口(RabbitMQ收到消息還沒來得及存到硬盤上)會致使消息丟失,若是須要嚴格的控制,能夠參考官方文檔
要使用RabbitMQ的消息持久化,在聲明隊列時設置一個參數便可
注意,RabbitMQ不容許對一個已經存在的隊列用不一樣的參數從新聲明,對於試圖這麼作的程序,會報錯,因此,改動以前代碼以前,要在控制檯中把原來的隊列刪除
從新聲明隊列後,發現Durable爲true
重啓RabbitMQ
隊列的消息沒有丟失
這一章介紹了RabbitMQ消息的確認和持久化,後面將會繼續深刻介紹RabbitMQ的其餘特性