前言
消息隊列在現今數據量超大,併發量超高的系統中是十分經常使用的。本文將會對現時最經常使用到的幾款消息隊列框架ActiveMQ, RabbitMQ、卡夫卡進行分析對比。
詳細介紹RabbitMQ在春天框架下的結構及實現原理,從生產端的事務,回調函數(ConfirmCallback / ReturnCallback)到消費者端的MessageListenerContainer信息接收容器進行詳細的分析。經過對RabbitTemplate, SimpleMessageListenerContainer, DirectMessageListenerContainer等經常使用類型介紹,深刻剖析在消息處理各個傳輸環節中的原理及注意事項。
一,RabbitMQ與AMQP的關係
1.1 AMQP簡介
AMQP(高級消息隊列協議高級消息隊列協議)是一個消息隊列協議,它支持符合條件的客戶端和消息代理中間件(消息代理中間件)進行通信.RabbitMQ則是AMQP協議的實現者,主要用於在分佈式系統中信息的存儲發送與接收,RabbitMQ的服務器端用Erlang語言編寫,客戶端支持多種開發語言:Python,。 淨、Java、Ruby C, PHP, ActionScript, XMPP,踩等。
1.2 ActiveMQ, RabbitMQ,卡夫卡對比
如今在市場上有ActiveMQ, RabbitMQ,卡夫卡等多個經常使用的消息隊列框架,與其餘框架對比起來,RabbitMQ在易用性,擴展性,高可用性,多協議,支持多語言客戶端等方面都有不俗表現。
1.2.1 AcitveMQ特色
ActiveMQ是Apache以Java語言開發的消息模型,它完美地支持JMS (Java消息服務)消息服務,客戶端支持Java, C, c++, c#, Ruby、Perl、Python、PHP等多種開主發語言,支持OpenWire,跺腳,休息,XMPP, AMQP等多種協議.ActiveMQ採用異步消息傳遞方式,在設計上保證了多主機集羣,客戶端——服務器、點對點等模式的有效通訊。從開始它就是按照JMS 1.1和J2EE 1.4規範進行開發,實現了消息持久化,XA,事務支撐等功能。經歷多年的升級完善,現今已成爲Java應用開發中主流的消息解決方案。但相比起RabbitMQ,卡夫卡它的主要缺點表現爲資源消耗比較大,吞吐量較低,在高併發的狀況下系統支撐能力較弱。若是系統全程使用Java開發,其併發量在可控範圍內,或系統須要支持多種不一樣的協議,使用ActiveMQ可更輕便地搭建起消息隊列服務。
1.2.2卡夫卡特色
卡夫卡天生是面向分佈式系統開發的消息隊列,它具備高性能,容災性,可動態擴容等特色.Kafka與生俱來的特色在於它會把每一個分區的數據都備份到不一樣的服務器當中,並與飼養員配合,當某個經紀人故障失效時,飼養員服務就會將通知生產者和消費者,從備份服務器進行數據恢復。在性能上卡夫卡也大大超越了傳統的ActiveMQ, RabbitMQ,因爲卡夫卡集羣可支持動態擴容,在負載量到達峯值時可動態增長新的服務器進集羣而無需重啓服務。但因爲卡夫卡屬於分佈式系統,因此它只能在同一分區內實現消息有序,沒法實現全局消息有序。並且它內部的監控機制不夠完善,須要安裝插件,依賴飼養員進行元數據管理。若是系統屬於分佈式管理機制,數據量較大且併發量難以預估的狀況下,建議使用卡夫卡隊列。
1.2.3 RabbitMQ對比
因爲ActiveMQ過於依賴JMS的規範而限制了它的發展,因此RabbitMQ在性能和吞吐量上明顯會優於ActiveMQ。 因爲上市時間較長,在可用性、穩定性、可靠性上RabbitMq會比卡夫卡技術成熟,並且RabbitMq使用Erlang開發,因此天生具有高併發高可用的特色。而卡夫卡屬於分佈式系統,它的性能,吞吐量,TPS都會比RabbitMq要強。
二,RabbitMQ的實現原理
2.1生產者(生產者),消費者(消費者),服務中心(代理)之間的關係
首先簡單介紹RabbitMQ的運行原理,在RabbitMQ使用時,系統會先安裝並啓動代理服務器,也就是RabbitMQ的服務中心。不管是生產者(生產者),消費者(消費者)都會經過鏈接池(鏈接)使用TCP / IP協議(默認)來與BrokerServer進行鏈接,而後生產者會把交換/隊列的綁定信息發送到代理服務器,代理服務器根據交換的類型邏輯選擇對應隊列,最後把信息發送到與隊列關聯的對應的消費者。
2.2交換器(交換),隊列(隊列),信道(頻道),綁定(綁定)的概念
2.2.1交換器交換
製片人創建鏈接後,並不是直接將消息投遞到隊列隊列中,而是把消息發送到交換器交易所,由交易所根據不一樣邏輯把消息發送到一個或多個對應的隊列當中。目前交換提供了四種不一樣的經常使用類型:扇出,直接,話題,頭。
此類型是最爲常見的交換器,它會將消息轉發給全部與之綁定的隊列上,好比,有N個隊列與扇出交換器綁定,當產生一條消息時,交易所會將該消息的N個副本分別發給每一個隊列,相似於廣播機制。
此類型的交換會把消息發送到Routing_Key徹底相等的隊列當中。多個Cousumer可使用相同的關鍵字進行綁定,相似於數據庫的一對多關係。好比,生產者以直接類型的交換推送Routing_Key爲直接。 key1的隊列,系統再指定多個Cousumer綁定直接。 key1。如此,消息就會被分發至多個不一樣的Cousumer當中。
此類型是最靈活的一種方式配置方式,它可使用模糊匹配,根據Routing_Key綁定到包含該關鍵字的不一樣隊列中,好比,生產商使用主題類型的交換分別推送Routing_Key設置爲topic.guangdong。 廣州,topic.guangdong。 深圳的不一樣隊列,Cousumer只須要把Routing_Key設置爲topic.guangdong。 #,就能夠把全部消息接收處理。
該類型的交換器與前面介紹的稍有不一樣,它再也不是基於關鍵字Routing_Key進行路由,而是基於多個屬性進行路由的,這些屬性比路由關鍵字更容易表示爲消息的頭。也就是說,用於路由的屬性是取自於消息頭屬性,當消息頭的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。
2.2.2隊列隊列
隊列隊列是消息的載體,每一個消息都會被投入到隊列當中,它包含名稱、耐用、參數等多個屬性,名字用於定義它的名稱,當持久(持久化)爲真時,隊列將會持久化保存到硬盤上。反之爲假時,一旦代理服務器被重啓,對應的隊列就會消失,後面還會有例子做詳細介紹。
2.2.3通道通道
當代理服務器使用鏈接鏈接生產者/ Cousumer時會使用到信道(頻道),一個鏈接上能夠創建多個通道,每一個頻道都有一個會話任務,能夠理解爲邏輯上的鏈接。主要用做管理相關的參數定義,發送消息,獲取消息,事務處理等。
2.2.4綁定綁定
綁定的主要用於綁定交換器交換與隊列隊列之間的對應關係,並記錄路由的Routing-Key.Binding信息會保存到系統當中,用於代理服務器信息的分發依據。
三,RabbitMQ應用實例
3.1兔經常使用類說明
3.1.1 RabbitTemplate類
春天框架已經封裝了RabbitTemplate對RabbitMQ的綁定,隊列發送、接收進行簡化管理
3.2初探RabbitMQ
3.2.1生產者端開發
先在pom中添加RabbitMQ的依賴,並在應用程序。 yml中加入RabbitMQ賬號密碼等信息。此例子,咱們嘗試使用直接交換器把隊列發送到不一樣的消費者。
**********************pom *************************
<project>
.............
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
</project>
**************** application.yml ****************
spring:
application:
name: rabbitMqProducer
rabbitmq:
host: localhost
port: 5672
username: admin
password: 12345678
virtual-host: /LeslieHost複製代碼
首先使用CachingConnectionFactory創建連接,經過BindingBuilder綁定交換,隊列,RoutingKey之間的關係。
而後經過空虛convertAndSend(字符串,字符串routingKey、對象對象CorrelationData數據)方法把信息發送到破碎的服務器
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME,true,true);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send() {
for(int n=0;n<100;n++){
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData());
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
此時,打開RabbitMQ管理界面,可看到生產商已經向破碎的服務器的直接。 第一/直接。 第二個兩個隊列分別發送100個消息
3.2.2消費者端開發
分別創建兩個不一樣的消費者,一個綁定直接。 先別一個綁定直接。 第二,而後經過註解@RabbitListener監聽不一樣的隊列,當接到到製片人推送隊列時,顯示隊列信息。
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Configuration
@RabbitListener(queues="direct.first")
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}複製代碼
運行後能夠觀察到不一樣的消費者會收到不一樣隊列的消息
若是以爲使用綁定代碼綁定過於繁瑣,還能夠直接在監聽類RabbitMqListener中使用@QueueBinding註解綁定
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.second"),
key="directKey2"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}複製代碼
4、製片人端的消息發送與監
前面一節已經介紹了RabbitMQ的基本使用方法,這一節將從更深刻的層面講述生產商的應用。 試想一下這種的情形,若是因RabbitTemplate發送時交換名稱綁定錯誤,或打破服務器因網絡問題或服務負荷過大引起異常,製片人發送的隊列丟失,系統沒法正常工做。此時,開發人員應該進行一系列應對措施進行監測,確保每一個數據都能正常推送到破碎的服務器。有見及此,RabbitMQ專門爲你們提供了兩種解決方案,一是使用傳統的事務模式,二是使用回調函數、下面爲你們做詳介紹。
4.1生產端的事務管理
在須要使用事務時,能夠經過兩種方法 第一能夠調用通道類的方法以傳統模式進行管理、事務開始時調用通道。 txSelect()、信息發送後進行確認通道。 txCommit(),一旦捕捉到異常進行回滾channel.txRollback(),最後關閉事務。
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send1(HttpServletResponse response)
throws InterruptedException, IOException, TimeoutException{
Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
.......
try{
channel.txSelect();
channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
channel.txCommit();
}catch(Exception e){
channel.txRollback();
}finally{
channel.close();
}
......
......
......
}
}複製代碼
第二還能夠直接經過RabbitTemplate的配置方法無效setChannelTransacted (bool isTransacted)直接開啓事務
public class ProducerController {
@Autowired
private ConnectionConfig connection;
@Autowired
@Bean
private RabbitTemplate template(){
RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
template.setChannelTransacted(true);
return template;
}
@RequestMapping("/send")
@Transactional(rollbackFor=Exception.class)
public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
..........
..........
..........
}
}複製代碼
4.2利用ConfirmCallback回調確認消息是否成功發送到交易所
使用事務模式消耗的系統資源比較大,系統每每會處理長期等待的狀態,在併發量較高的時候也有可能形成死鎖的隱患。有見及此,系統提供了輕量級的回調函數方式進行異步處理。 當須要確認消息是否成功發送到交換的時候,可使用ConfirmCallback回調函數。使用該函數,系統推送消息後,該線程便會獲得釋放,等交換接收到消息後系統便會異步調用ConfirmCallback綁定的方法進行處理.ConfirmCallback只包含一個方法無效確認(CorrelationData CorrelationData,布爾ack,字符串緣由),此方法會把每條數據發送到交換時候的ack狀態(成功/失敗),致使成敗緣由,及對應的CorrelationData (CorrelationData只包含一個屬性id,是綁定發送對象的惟一標識符)返還到生產商,讓生產者進行相應處理。
注意:在綁定ConfirmCallback回調函數前,請先把publisher-confirms屬性設置爲真實的
spring:
application:
name: rabbitmqproducer
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 12345678
virtual-host: /LeslieHost複製代碼
例如:下面的例子,特地將RabbitTemplate發送時所綁定的交換名稱填寫爲錯誤名稱「ErrorExchange」,形成發送失敗,而後在回調函數中檢查失敗的緣由。
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Component
public class MyConfirmCallback implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO 自動生成的方法存根
// TODO 自動生成的方法存根
if(ack){
System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
}else
System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyConfirmCallback confirmCallback;
@RequestMapping("/send")
public void send() {
template.setConfirmCallback(confirmCallback);
for(int n=0;n<2;n++){
template.convertAndSend("ErrorExchange",
BindingConfig.RoutingKey1,"I'm the first queue! "
+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}複製代碼
4.3綁定CorrelationData與發送對象的關係
上面的例子當中,CorrelationData只是用一個隨機的UUID做爲CorrelationID,而在現實的應用場景中,因爲ConfirmCallback只反回標識值CorrelationData,而沒有把隊列裏的對象值也一同返回,因此,在推送隊列時能夠先用鍵-值保存CorrelationID與所發送信息的關係,這樣當ConfirmCallback回調時,就可根據CorrelationID找回對象,做進一步處理。
下面例子,咱們把要發送的對象放在虛擬數據數據源類中,用DataRelation記錄CorrelationID與發送對象OrderID的關係,而後在回調函數ConfirmCallback中根據CorrelationID查找對應的OrderEntity,若是發送成功,則刪除綁定。若是發送失敗,能夠從新發送或根據狀況再做處理。
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
//Exchange 使用 direct 模式
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Data
public class OrderEntity implements Serializable{
private String id;
private String goods;
private Double price;
private Integer count;
public OrderEntity(String id,String goods,Double price,Integer count){
this.id=id;
this.goods=goods;
this.price=price;
this.count=count;
}
public OrderEntity(){}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoods() {
return goods;
}
public void setGoodsId(String goods) {
this.goods = goods;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@Component
public class DataSource {
//加入虛擬數據
private static List<OrderEntity> list=new ArrayList<OrderEntity>(
Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
new OrderEntity("002","Huwei P30 Plus",5400.00,1),
..........));
public DataSource(){
}
public List<OrderEntity> getOrderList(){
return list;
}
//根據Id獲取對應order
public OrderEntity getOrder(String id){
for(OrderEntity order:list){
if(order.getId()==id)
return order;
}
return null;
}
}
public class DataRelation {
public static Map map=new HashMap();
//綁定關係
public static void add(String key,String value){
if(!map.containsKey(key))
map.put(key,value);
}
//返回orderId
public static Object get(String key){
if(map.containsKey(key))
return map.get(key);
else
return null;
}
//根據 orderId 刪除綁定關係
public static void del(String key){
if(map.containsKey(key))
map.remove(key);
}
}
@Component
public class MyConfirmCallback implements ConfirmCallback {
@Autowired
private DataSource datasource;
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String correlationId=correlationData.getId();
//根據 correclationId取回對應的orderId
String orderId=DataRelation.get(correlationId).toString();
//在datasource中找回對應的order
OrderEntity order=datasource.getOrder(orderId);
if(ack){
System.out.println("--------------------ConfirmCallback-------------------\n"
+" order's ack is true!\nId:"+order.getId()+" Goods:"+order.getGoods()
+" Count:"+order.getCount().toString()+" Price:"+order.getPrice());
DataRelation.del(correlationId); //操做完成刪除對應綁定
}else {
System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
//可在記錄日誌後把Order推送到隊列進行從新發送
.......
}
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyConfirmCallback confirmCallback;
@Autowired
private DataSource dataSource;
@RequestMapping("/send")
public void send() throws InterruptedException, IOException{
//綁定 ConfirmCallback 回調函數
template.setConfirmCallback(confirmCallback);
for(OrderEntity order:dataSource.getOrderList()){
CorrelationData correlationData=getCorrelationData();
//保存 CorrelationId 與 orderId關係
DataRelation.add(correlationData.getId(), order.getId());
//把 order 插入隊列
template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}複製代碼
4.4利用ReturnCallback處理隊列隊列錯誤
使用ConfirmCallback函數只能判斷消息是否成功發送到交易所,但並不能保證消息已經成功進行隊列隊列。因此,系統預備了另外一個回調函數ReturnCallback來監聽隊列隊列處理的成敗。若是隊列錯誤綁定不存在的隊列,或者打破服務器瞬間出現問題末能找到對應的隊列,系統就會激發生產者端ReturnCallback的回調函數來進行錯誤處理。ReturnCallback回調接口只包含一個方法無效returnedMessage (replyText消息消息、int replyCode字符串,字符串,字符串routingKey),它會把出錯的replyCode, replyText,交換,routingKey等值都一塊兒返還。與ConfirmCallback不一樣的是,returnedMessage會把隊列中的對象保存到消息的正文屬性中並返還到回調函數。
注意:在綁定ReturnCallback回調函數前,請先把publisher-returns及強制性屬性設置爲真的。強制參數默認爲假,用於判斷了服務器是否把錯誤的對象返還到製片人。如末進行設置,系統將把錯誤的消息丟棄。
下面例子咱們在調用convertAndSend方法時特地把routingKey設置爲ErrorKey,觸發ReturnCallback回調,而後在ReturenCallback的回調方法顯示replyCode, replyText,交換,routingKey等值,並把隊列中對象屬性一併顯示。
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Data
public class OrderEntity implements Serializable{
private String id;
private String goods;
private Double price;
private Integer count;
public OrderEntity(String id,String goods,Double price,Integer count){
this.id=id;
this.goods=goods;
this.price=price;
this.count=count;
}
public OrderEntity(){}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoods() {
return goods;
}
public void setGoodsId(String goods) {
this.goods = goods;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@Component
public class DataSource {
//虛擬數據
private static List<OrderEntity> list=new ArrayList<OrderEntity>(
Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
new OrderEntity("002","Huwei P30 Plus",5400.00,1),
......));
public DataSource(){
}
public List<OrderEntity> getOrderList(){
return list;
}
//根據Id獲取對應order
public OrderEntity getOrder(String id){
for(OrderEntity order:list){
if(order.getId()==id)
return order;
}
return null;
}
}
@Component
public class MyReturnCallback implements ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey){
//把messageBody反序列化爲 OrderEntity對象
OrderEntity order=convertToOrder(message.getBody());
//顯示錯誤緣由
System.out.println("-------------ReturnCallback!------------\n"
+" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
+" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
+" Goods:"+order.getGoods()+" Count:"+order.getCount().toString()
+" Price:"+order.getPrice()+" ");
}
//把byte[]反序列化爲 OrderEntity對象
private OrderEntity convertToOrder(byte[] bytes){
OrderEntity order=null;
ByteArrayInputStream bis = new ByteArrayInputStream (bytes);
ObjectInputStream ois;
try {
ois = new ObjectInputStream (bis);
Object obj = ois.readObject();
order=(OrderEntity)obj;
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
return order;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyReturnCallback returnCallback;
@Autowired
private DataSource dataSource;
@RequestMapping("/send")
public void send() throws InterruptedException, IOException{
//把 mandatory 屬性設定爲true
template.setMandatory(true);
//綁定 ReturnCallback 回調函數
template.setReturnCallback(returnCallback);
for(OrderEntity order:dataSource.getOrderList()){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","ErrorKey",order,correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}複製代碼
5、消費者消息接收管控
在第四節主要介紹了生產者端的隊列發送與監控,它只能管理生產商與代理服務器之間的通訊,但並不能確認消費者是否能成功接收到隊列,在這節內容將介紹消費者端的隊列接收與監聽。前面幾節裏,消費者端都是簡單地直接使用RabbitListener對隊列進行監聽,其實RabbitMQ已經爲用戶準備了功能更強大的MessageListenerContainer容器用於管理消息,下面將爲你們介紹。
5.1 AbstractMessageListenerContainer介紹
AbstractMeessageListenerContainer虛擬類是RabbitMQ封裝好的一個容器,自己並無對消息進行處理,而是把消息的處理方式交給了MessageListener。而它的主要功能是實現MessageListener的綁定,ApplicationContext上下文的綁定,ErrorHandler的錯誤處理方法的綁定,對消息消費的開始,結束等等默認參數進行配置,讓開發人員能夠在容器中對消費者實現統一管理.SimpleMessageListenerContainer, DirectMessageLinstenerCoontainer都是它的子類,分別應用於不一樣的場景,在下面會再做詳細介紹。
MessageListener是監聽消息最經常使用的傾聽者,它只包含了一個方法無效onMessage(消息消息),這是消息接收最經常使用的一個方法,開發者只須要實現此方法便可對接收到的消息進行處理。
ChannelAwareMessageListener至關因而MessageListener的一個擴展,包含了方法無效onMessage(消息消息,頻道頻道),除了對消息進行處理外,還能夠對接收此消息的通道進行檢測。
5.2 SimpleMessageListenerContainer經常使用方法
SimpleMessageListenerContainer是最經常使用的MessageListener容器,它能夠經過下面的方法設置默認消費者數量與最大的消費者數量。下面例子中嘗試把consurrentConsumers設置爲3,把maxConcurrentConsumers設置爲4,並同時監控直接模式交換器的direct.first,直接。 第二隊列。
經過截圖能夠看的到,系統默認會爲每一個隊列都建立3個消費者,不一樣的隊列中的消費者是共享相同的3個頻道。
當生產者端發送消息時,消費者的實際數量可根據maxConcurrentConsumers的配置限制進行擴展。
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
"queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2"," queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class SimpleMessListener {
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public SimpleMessageListenerContainer messageContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 綁定Queue1/Queue2
container.setQueueNames("direct.first");
container.addQueueNames("direct.second");
//設置默認 consumer 數爲3
container.setConcurrentConsumers(3);
//設置最大 consumer 數爲4
container.setMaxConcurrentConsumers(4);
//標記 consumerTag
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
//綁定MessageListener顯示接收信息
container.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message) {
// TODO 自動生成的方法存根
Thread thread=Thread.currentThread();
MessageProperties messProp=message.getMessageProperties();
try {
System.out.println(" ConsumerTag:"+messProp.getConsumerTag()
+" ThreadId is:"+thread.getId()+" Queue:"+messProp.getConsumerQueue()
+" "+new String(message.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}
});
return container;
}
}複製代碼
5.3 SimpleMessageListenerContainer的運做原理
在SimpleMessageListenerContainer模式中,不管系統監聽多少個隊列隊列,通道都是共享的,相似上面的例子,4個頻道會把接收到不一樣的隊列請求並分發到對應的消費者進行處理。這樣作的好處是系統能夠經過concurrentConsumers, maxConcurrentConsumers靈活設定當前隊列中消費者的數量,系統能夠跟據實際需求靈活處理。但因爲每一個通道都是在固定線程中運行的,一個頻道要遊走於多個消費者當中,這無疑增長了系統在上下文切換中的開銷。下面用系統提供的ChannelAwareMessageListener接口,以更直觀的例子說明一下SimpleMessageListenerContainer當中,隊列,消費者之間的關係。
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class SimpleMessListener {
@Autowired
private RabbitTemplate template;
@Autowired
private ConnectionConfig connectionConfig;
private int index=0;
@Bean
public SimpleMessageListenerContainer messageContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 綁定Queue1/Queue2
container.setQueueNames("direct.first");
container.addQueueNames("direct.second");
//設置默認 consumer 數爲3
container.setConcurrentConsumers(3);
//設置最大 consumer 數爲4
container.setMaxConcurrentConsumers(4);
//標記 consumerTag
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
//綁定ChannelAwareMessageListener顯示接收信息
container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
// TODO 自動生成的方法存根
// TODO 自動生成的方法存根
Thread thread=Thread.currentThread();
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
+" Queue:"+message.getMessageProperties().getConsumerQueue());
}
});
return container;
}
}複製代碼
觀察運行結果能夠看到:每一個頻道都在固定的線程中運行,一個頻道會向不一樣的消費者發送隊列信息了。解頻道,線程,隊列,消費者之間的關係,會對SimpleMessageListenerContainer有更深刻認識。
5.4 DirectMessageListenerContainer
SimpleMessageListenerContainer是經典的容器,使用通道共享,一旦某個頻道關閉或重啓,意味着每一個隊列隊列中使用當通道前的消費者都會受到影響。有見及此,在RabbitMQ 2.0後,系統引入了DirectMessageListenerContainer,它容許每一個消費者都有各自的對應的通道的通道只管理負責管理當前消費者的通道。這樣令消費者運用更靈活,同時線程並無跟頻道綁定,而是由獨立的線程池進行管理,這是更好地解決了SimpleMessageListenerContainer中上下文切換所帶來的資源消耗問題。
下面的例子,咱們嘗試使用把consumersPerQueue設置爲4,並同時監控直接模式交換的direct.first,直接。 第二隊列。
從管理界面能夠看的到,系統會爲每一個消費者都生成一個獨立的通道進行管理。
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 設置每一個隊列的 consumer 數量
container.setConsumersPerQueue(4);
container.addQueueNames("direct.first");
container.addQueueNames("direct.second");
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
// TODO 自動生成的方法存根
// TODO 自動生成的方法存根
Thread thread=Thread.currentThread();
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
+" Queue:"+message.getMessageProperties().getConsumerQueue());
}
});
return container;
}
}複製代碼
經過運行結果進一步能夠證明,消費者信息接收是由獨立的線程池進行管理的,並無與通道綁定,每一個消費者都有本身單獨的通道,即便頻道發生問題時,也不會對其餘的消費者發生影響,這正是DirectMessageListenerContainer的優勝之處。
5.5消費者的信息接收確認方式
在第四節曾經介紹過在生產者端利用ConfirmCallback / ReturnCallback監控信息的發送,在這節將爲你們在消費者端監控信息的接收。
消費者的信息接收確認模式能夠經過AcknowledgeMode設定,一共有三種模式:沒有,手動,自動,默認是汽車模式。其中沒有爲系統確認,手動是手動確認。
而汽車爲自動模式,系統能夠根據執行狀況自動發送ack /納。若是方法未拋出異常,則發送ack。若是拋出異常AmqpRejectAndDontRequeueException顧名思義消息被拒絕且不會從新加入隊列。若是方法拋出非AmqpRejectAndDontRequeueException異常,則系統發送納消息重歸隊列。
AcknowledgeMode配置爲手動後,用戶可經過通道類的空白basicAck(長deliveryTag,布爾多個)方法手動確認消息接收是否成功。
若檢測到有異,常可經過空虛basicReject(長deliveryTag,布爾requeue)或無效basicNack(長deliveryTag,布爾多,布爾requeue)確認是否從新把消息推送。
經過配置prefetchCount可設置消費者每次接收到的信息數量,系統默認值爲250,這表示當消費者隊列接收到250請求其狀態皆爲unacked時,代理服務器將暫停向消費者發送消息,待消息處理後再繼續。
下面例子中咱們嘗試把prefetchCount設置爲10,即每一個消費者單次最多接收到的消息爲10條,並把consumersPerQueue設置爲4,而後把AcknowledgeMode設置爲手動,經過手動確認消息接收,一旦發生錯誤,消息從新加入隊列。
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
運行後可看到代理服務器每條隊列會有100條數據處於待處理狀態
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder綁定Direct與queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder綁定Direct與queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 設置每一個隊列的 consumer 數量
container.setConsumersPerQueue(4);
// 設置每一個 consumer 每次的接收的消息數量爲10個
container.setPrefetchCount(10);
// 使用MANUAL進行手動確認
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.addQueueNames("direct.first");
container.addQueueNames("direct.second");
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
Thread thread=Thread.currentThread();
MessageProperties prop=message.getMessageProperties();
try{
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+prop.getConsumerTag()
+" Queue:"+prop.getConsumerQueue());
//經過Tag單個確認
channel.basicAck(prop.getDeliveryTag(), false);
}catch(Exception ex){
//斷定單個接收失敗,從新加入consumer隊列
channel.basicReject(prop.getDeliveryTag(), true);
}
thread.sleep(1000);
}
});
return container;
}
}複製代碼
觀察信息接收狀況,每一個消費者一次可處理10條信息,對隊列進行分批處理。
6、死信隊列
死信隊列(Dead-Letter-Exchange)可被看做是死信交換器。當消息在一個隊列中變成死信後,它能被從新被髮送到特定的交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之爲死信隊列。消息變成死信通常是因爲如下幾種狀況:
-
消息被拒絕,requeue被設置爲假,可經過上一介紹的空白basicReject (deliveryTag requeue)或無效basicNack (deliveryTag,多個requeue)完成設置;
-
消息過時;
-
隊列超出最大長度。
其實死信隊列DLX也是一個正常的交換器,和通常的交換器沒有什麼區別,咱們能夠用通常創建隊列的方法,創建一個死信隊列。而後創建一個正常的隊列,在正常隊列中加入參數x-dead-letter-exchange, x-dead-letter-routing-key與死信隊列進行綁定,完成綁定後在管理界面特徵選項中direct.queue。 第一次會顯示DLX DLK。這時當被綁定的隊列出現超時,超長,或被拒絕時(注意requeue被設置爲假時,對會激發死信),信息就會流入死信隊列被處理。
@Configuration
public class BindingConfig {
public final static String Queue_First="direct.queue.first";
public final static String Exchange_Name="directExchange";
public final static String Routing_Key_First="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(this.Queue_First);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(this.Exchange_Name);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send() {
for(int n=0;n<10;n++){
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}複製代碼
@Configuration
public class BindingConfig {
//普通隊列參數
public final static String Queue_First="direct.queue.first";
public final static String Exchange_Name="directExchange";
public final static String Routing_Key_First="directKey1";
//死信隊列參數
public final static String Queue_Dead="direct.queue.dead";
public final static String Exchange_Dead="directDead";
public final static String Routing_Key_Dead="directDeadKey";
@Bean
public Queue queueFirst(){
Map<String, Object> args=new HashMap<String,Object>();
//聲明當前死信的 Exchange
args.put("x-dead-letter-exchange", this.Exchange_Dead);
//聲明當前隊列的死信路由key
args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
//把死信隊列的參數綁定到當前隊列中
return QueueBuilder.durable(Queue_First).withArguments(args).build();
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(this.Exchange_Name);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
}
@Bean
public Queue queueDead(){
return new Queue(this.Queue_Dead);
}
@Bean
public DirectExchange directExchangeDead(){
return new DirectExchange(this.Exchange_Dead);
}
@Bean
public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0,normalIndex=0,deadIndex=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 設置每一個隊列的 consumer 數量
container.setConsumersPerQueue(4);
// 設置每一個 consumer 每次的接收的消息數量
container.setPrefetchCount(10);
// 使用MANUAL手動確認
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 監聽隊列
container.addQueueNames(BindingConfig.Queue_First);
container.addQueueNames(BindingConfig.Queue_Dead);
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
MessageProperties prop=message.getMessageProperties();
if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
System.out.println("This is a normal queue! "+(++normalIndex));
//把當前的隊列轉送到死信隊列中
channel.basicReject(prop.getDeliveryTag(), false);
}
if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
System.out.println("This is a dead queue! "+(++deadIndex));
//模擬對死信隊列處理
Thread.currentThread().sleep(5000);
.......
//處理完畢
channel.basicAck(prop.getDeliveryTag(), false);
}
}
});
return container;
}
}複製代碼
經過管理界面能夠看,信息會先發送到direct.queue.first,而後被放進死信隊列做處理。
死信隊列最經常使用的場景能夠在訂單支付,流程審批等環節,例如在京,淘等平臺,當下單成功後,客戶要在必定的時間內完成支付操做,不然訂單被視做無效,這些業務流程就可使用死信隊列來處理。
7、持久化操做
RabbitMq的持久化操做包含有隊列持久化,消息持久化和交換持久化三類。
7.1隊列的持久化
隊列持久化只須要在隊列的構造函數公共隊列(字符串名稱,布爾耐用)把耐用參數置爲正確就可實現。若是隊列不設置持久化((耐用默認爲false),那麼在RabbitMQ服務重啓以後,相關隊列的元數據會丟失,此時數據也會丟失。
7.2消息持久化
設置了隊列持久化之後,當RabbitMQ服務重啓以後,隊列依然存在,但消息已經消失,可見單單設置隊列的持久化而不設置消息持久化顯得毫無心義,因此一般列隊持久化會與消息持久化共同使用。
在RabbitMQ原生態的框架下,須要把信息屬性設置爲MessageProperties。 持續的純文本纔會實現消息的持久化。
而在春天框架下,因爲在使用回調函數時須要把消息從新返回隊列再進行處理,因此消息默認已是持久化的。
7.3交流的持久化
交換器持久化可經過構造函數公共DirectExchange(字符串名稱,布爾耐用,布爾autoDelete)把耐用參數置爲正確就可實現,而autoDelete則是指在所在消費者都解除訂閱的狀況下自動刪除。若是交換器不設置持久化,那麼在RabbitMQ服務重啓以後,相關的交換器元數據會丟失,不過消息不會丟失,只是消息再也不發送到該交易所。對一個長期使用的交換器來講,持久化仍是有其必要性的。
本章總結
RabbitMQ發展至今,被愈來愈多的人承認,這和它在易用性,擴展性、可靠性和高可用性等方面的卓著表現是密不可分的。
相比於傳統的ActiveMQ和分佈式卡夫卡,它具備本身獨有的特色。
但願文章有幫於你們對RabbitMQ消息隊列方面有更深刻的瞭解,在不一樣的開發環境中靈活運用。
因爲時間倉促,文章當中有不明確的地方或有錯漏敬請點明。
對JAVA開發有興趣的朋友歡迎加入QQ羣:
833145934 裏面資深架構師會分享一些整理好的錄製視頻錄像和BATJ面試題:有春天,MyBatis,網狀的源碼分析,高併發,高性能,分佈式、微服務架構的原理,JVM性能優化,分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。
共同探討!