本示例程序所有來自rabbitmq官方示例程序,rabbitmq-demo;
官方共有6個demo,針對不一樣的語言(如 C#,Java,Spring-AMQP等),都有不一樣的示例程序;
本示例程序主要是Spring-AMQP的參考示例,若是須要其餘語言的參考示例,能夠參考官網;html
4種不一樣的Exchange,對routingKey的解釋都不相同;
對routingKey的不一樣解釋,決定了Exchange路由Message到Queue的不一樣方案;java
aaa.bbb.xxx
、*.ccc.dd.#
,相似正則表達式匹配;<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.profiles.active=hello-world, sender, receiver
Work queues官方示例
application.properties配置github
spring.profiles.active=work-queues, sender, receiver #spring.profiles.active=work-queues, sender #spring.profiles.active=work-queues, receiver
詳細描述參見:單生產者-多消費者詳細正則表達式
Publish/Subscribe官方示例
spring
application.properties配置緩存
spring.profiles.active=pub-sub, receiver , sender
詳細描述參見:發佈/訂閱詳細架構
Routing官方示例app
a message goes to the queues whose binding key
exactly matches the routing key
of the message;(相等時才路由)異步
兩個Queue使用相同的BingingKey(black) ==> 效果相似於:發佈/訂閱模式(demo3);
application.properties配置
pring.profiles.active=routing, receiver , sender
詳細描述參見:發佈/訂閱詳細
*.aaa.bbb.#
;*
: 能夠匹配一個word;#
: 能夠匹配0個或多個words;application.properties配置
pring.profiles.active=topics, receiver , sender
詳細描述參見:Topics
application.properties配置
spring.profiles.active=rpc,server #spring.profiles.active=rpc,client
詳細描述參見:RPC
消費者註冊後,rabbitmq將消息交付給消費者時,都會帶有一個「Delivery Tags」,這個是惟一的ID標識,id以整數的遞增的方式實現。
缺點:
批量確認
;delete
,不支持批量確認
;假設:在Channel(ch)上有5,6,7,8這4個delivery tags未確認;
delivery_tag=8 & multiple=true
: 則5,6,7,8這4個tags都將被確認;delivery_tag=8 & multiple=false
:則只有8被確認,而5,6,7將不會被被確認;異步
完成的,手動確認也是異步
的;但願控制未被確認消息的size,防止無界的緩存
;prefetch count
:使用basic.qos
方法設置該值能夠控制未被確認消息的max size;basic.qos
方法有效,對basic.get
方法無效;假設:在Channel(Ch)上有5,6,7,8共4個未被確認
的消息,且ch的prefetch count=4
;
結果:rabbitmq將不會再交付任何消息到該Channel上,除非有消息被確認;
prefetch
:提升向消費者傳遞消息的速度;應避免:
自動確認模式
;手動確認模式
+ 無限制的prefetch
;結論:
狀況1
和狀況2
均可能致使交付但將來得及處理
的Message增長,增大RAM的消耗;推薦值:
prefetch
: 100~300,能夠有效提升吞吐量,並避免RAM消耗過多的風險;當消息發送給消費端後,若是出現以下狀況,則消息會從新reQueue
,會被再次發送;
消費端沒法對同一個消息確認超過一次,當超過一次以後,將拋出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX
delivery tag
;每一個消息單獨確認
和批量消息確認
;prefetchCount
:能夠控制消息端的吞吐量,避免消費端消費過慢,產生RAM大量消耗;TCP鏈接中斷
或消費端掛掉
,都會引發消息從新入隊列,從新消費(手動消息確認時);確認
,不然會拋出異常;在 AMQP 0-9-1中,保證消息不丟失的惟一方法,就是使用事務;
confirm.select
: 應用於Channel時,表示使用確認模式
;事務
和確認模式
沒法共存:兩者只能選擇其一;confirm.select
;broker
發送basic.ack
來確認Message已被處理;delivery-tag
: 消息序列,具備惟一性;multiple=true
: 用於設置批量消息確認
;confirmed(OK)
,要麼被nack(fail)
,且only once;Java示例:(發送端發送大量messages,使用確認模式)
程序-確認模式
異常狀況時,服務端沒法處理消息,則broker
發送basic.nack
來進行否認確認
;
basic.ack
應答;異步處理應答
、批量發送消息
;當使用異步發送和持久化消息時,broker對消息的確認順序
可能和發送者的消息發送順序
不一致;
Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.