小弟 前段時間使用mq是由於要在Jfianl架構中使用,但Jfinal並不擅長,因此使用的是工具類建立的連接和通道。又寫了消費者和生產者的公共方法。java
如今有一個業務。對接銀行的時候,因異步回調。致使客戶在對一張A表操做 和銀行回調對A表的操做產生併發。導致A表出現一個seq_no重複。餘額也計算錯誤。領導要求集成MQ,小弟終於在3天后集成了一個基礎的demo。如今記錄一下:spring
首先 maven項目確定要引入jar包的apache
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
而後請看spring的配置:json
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 鏈接服務配置</description> <rabbit:connection-factory id="connectionFactory" username="${mq.name}" password="${mq.pwd}" host="${mq.url}" port="${mq.port}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template聲明--> <rabbit:template exchange="${mq.user.bill.exchange.name}" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 業務隊列 --> <rabbit:queue id="user_bill_queue" name="user_bill_queue" durable="true" auto-delete="false" exclusive="false"> <!-- <rabbit:queue-arguments> <!– 設置死信交換機 –> <entry key="x-dead-letter-exchange"> <value type="java.lang.String">dead_letter_userbill_exchange</value> </entry> <!– 設置死信交換機的路由鍵 –> <entry key="x-dead-letter-routing-key"> <value type="java.lang.String">userbill_queue_fail</value> </entry> </rabbit:queue-arguments>--> </rabbit:queue> <!-- 死信隊列 --> <!--<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />--> <!-- 死信交換機配置 --> <!--<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"> <rabbit:bindings> <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/> </rabbit:bindings> </rabbit:direct-exchange>--> <!-- 正常交換機配置 --> <rabbit:direct-exchange name="${mq.user.bill.exchange.name}" durable="true" auto-delete="false" id="${mq.user.bill.exchange.name}"> <rabbit:bindings> <rabbit:binding queue="user_bill_queue" key="${mq.user.bill.routing.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 配置監聽 手動ack prefetch="1" 表示消費一條--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > <rabbit:listener queues="user_bill_queue" ref="queueListenter"/> <!--<rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>--> </rabbit:listener-container> </beans>
一、強調一下命名空間很少。夠用就行。這裏只單單配置了mq 與spring其餘文件集成可以使用import不要重複引用便可緩存
二、這裏粘出來的 有異常msg的處理。也就是死信隊列。後面會提到服務器
以上基本都是固定配置。獲取連接,建立admin(在消息代理中如何利用協議來配置隊列,交換和綁定。實現將自動聲明在一個應用上下文的Queues,Exchanges,Bindings。具體功能我也不清楚。一直沒搞懂) 建立生產者模板,建立隊列。建立指定路由key的交換器 並綁定隊列,消息對象轉json的bean等等。架構
三、若是想要引入消息失效時間,須要在定義隊列的地方添加屬性<rabbit:queue-arguments>,並指定併發
<entry key="x-message-ttl"> <value type="java.lang.Integer">60000</value> </entry>
表示該隊列中的信息失效時間爲1min。dom
要引入隊列的等級 須要的key=x-max-priority。異步
下面來講下 死信隊列。當有消息再消費端處理失敗時。若是要ackNack的話(true),會致使不斷消費這個消息,一直產生錯誤,一個死循環。
這時,使用死信隊列就能夠處理。
一、定義業務隊列的時候綁定一個死信交換機。並綁定一個路由key,注意x-dead-letter-exchange和x-dead-letter-routing-key是固定參數
<rabbit:queue-arguments> <!-- 設置死信交換機 --> <entry key="x-dead-letter-exchange"> <value type="java.lang.String">dead_letter_userbill_exchange</value> </entry> <!-- 設置死信交換機的路由鍵 --> <entry key="x-dead-letter-routing-key"> <value type="java.lang.String">userbill_queue_fail</value> </entry> </rabbit:queue-arguments>
二、設置一個死信隊列,用來接收死信交換機轉發來的異常信息(想要隊列的其餘屬性能夠自定義配置)
<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />
三、定義一個死信交換機,名稱與業務隊列中定義的一致,綁定死信隊列和路由key(與業務隊列中定義的死信交換機的路由key一致)
<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"> <rabbit:bindings> <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/> </rabbit:bindings> </rabbit:direct-exchange>
四、在監聽器中將死信隊列歸入監聽 監聽器中的ref bean 都是經過@Component註解注入的。
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > <rabbit:listener queues="user_bill_queue" ref="queueListenter"/> <rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/> </rabbit:listener-container>
這樣就完成了失敗消息轉發到死信隊列中。在設計另外一個消費者deadUserBillQueueListenter 進行消息處理便可,可設計,在處理一次失敗就將期ackreject
這裏要提醒一下,當設計有自定義交換機時,生產者傳入的就不是隊列名稱 ,而是交換機名稱和路由key,只有在使用默認交換機時才使用隊列名稱
生產者代碼:
package com.qiantu.core.rabbitmq; /** * @Description: 給隊列發送消息接口類 * @Date: create in 2018-07-30 16:36 * @Author:Reynold-白 */ public interface MQProducer { /** * 發送消息到指定隊列 * @param queueKey * @param object */ void sendDataToQueue(String exchangeName, String routingKey, Object object); }
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.support.GenericXmlApplicationContext; import org.springframework.stereotype.Service; /** * @Description: 發送消息實現 * @Date: create in 2018-07-30 16:37 * @Author:Reynold-白 */ @Service("mqProducer") public class MQProducerImpl implements MQProducer{ private final static Logger log = Logger.getLogger(MQProducerImpl.class); @Autowired private AmqpTemplate amqpTemplate; @Override public void sendDataToQueue(String exchangeName, String routingKey, Object object) { try { log.info("========向MQ發送消息【開始】========消息:" + object.toString()); amqpTemplate.convertAndSend(exchangeName, routingKey,object); log.info("========向MQ發送消息【完成】========消息:"); } catch (Exception e) { log.error("=======發送消息失敗======", e); e.printStackTrace(); } } }
消費者代碼:
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.constants.UserBillConstants; import com.qiantu.core.model.RabbitMQConsumerFailData; import com.qiantu.core.service.UserBillSerivce; import com.qiantu.core.utils.IdGenerator; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * @Description: userBill消息監聽消費 * @Date: create in 2018-07-30 17:08 * @Author:Reynold-白 */ @Component public class QueueListenter implements ChannelAwareMessageListener { protected static Logger log = Logger.getLogger(QueueListenter.class); @Autowired private UserBillSerivce userBillSerivce; @Override public void onMessage(Message message, Channel channel) { String msgStr = ""; try{ msgStr = new String(message.getBody(), "UTF-8"); log.info("=====獲取消息" + msgStr); Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {}); boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams); if(result){ //處理成功,響應隊列,刪除該條信息 this.basicACK(message, channel); log.info("=======消息:" + msgStr + ",處理成功!"); }else{ RabbitMQConsumerFailData rmcfd = new RabbitMQConsumerFailData(); rmcfd.setId(IdGenerator.randomUUID()); rmcfd.setData(msgStr); rmcfd.setType("0"); rmcfd.setCreateBy("admin"); rmcfd.setCreateTime(new Date()); userBillSerivce.insertRabbitMQFailData(rmcfd); //處理失敗,拒絕數據 this.basicReject(message, channel); log.info("=======消息:" + msgStr + ",處理失敗。回退!"); } }catch(Exception e){ log.error("=======消息業務處理異常=====", e); this.basicReject(message, channel); e.printStackTrace(); } } //正常消費通知 private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ log.error("通知服務器移除mq時異常,異常信息:"+e); } } //處理異常,消息回到異常處理隊列總再處理 private void basicReject(Message message,Channel channel){ try { /** * 第一個參數:該消息的index * 第二個參數:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。 * 第三個參數:被拒絕的是否從新入隊列 */ // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { try { log.error(new String(message.getBody(), "utf-8") + "從新進入服務器時出現異常,異常信息:", e); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
死信隊列消費者:
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.service.UserBillSerivce; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Map; /** * @Description: 失敗信息再處理 * @Date: create in 2018-08-02 15:00 * @Author:Reynold-白 */ @Component public class DeadUserBillQueueListenter implements ChannelAwareMessageListener { protected static Logger log = Logger.getLogger(QueueListenter.class); @Autowired private UserBillSerivce userBillSerivce; @Override public void onMessage(Message message, Channel channel) throws Exception { String msgStr = ""; try{ msgStr = new String(message.getBody(), "UTF-8"); log.info("=====獲取消息" + msgStr); Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {}); boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams); if(result){ //處理成功,響應隊列,刪除該條信息 this.basicACK(message, channel); log.info("=======deadUserBillQueue消息:" + msgStr + ",處理成功!"); }else{ //處理失敗,拋棄數據 this.basicNack(message, channel); log.info("=======deadUserBillQueue消息:" + msgStr + ",處理失敗。回退!"); } }catch(Exception e){ log.error("=======deadUserBillQueue消息業務處理異常=====", e); this.basicNack(message, channel); e.printStackTrace(); } } //正常消費通知 private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ log.error("deadUserBillQueue通知服務器移除mq時異常,異常信息:"+e); } } //處理異常,刪除信息 private void basicNack(Message message,Channel channel){ try { /** * 第一個參數:該消息的index * 第二個參數:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。 * 第三個參數:被拒絕的是否從新入隊列 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error("deadUserBillQueue通知服務器移除mq時異常,異常信息:"+e); try { log.error(new String(message.getBody(), "utf-8") + "從新進入服務器時出現異常,異常信息:", e); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
親測可實現錯誤消息轉發,至於隊列和消息的優先級能夠根據隊列的數據進行配置。與消息失效方式一致。
但要注意,隊列和消息優先級須要 spring的版本較高至少要4.1以上(低版本主要是命名空間中的屬性標籤不支持),RabbitMQ3.5以上才能支持。
2018-08-09日補充:
以上demo在處理消息時還不夠全面。首先若是消費端業務過於複雜致使消息 消費失敗,這個時候可使用死信隊列保存(我的以爲),或者入庫都可,但卻沒法保證 排除消息重發的這種現象。一旦消息重發,唄消費端消費,有涉及客戶的小金庫,那就玩完。。。通宵補數據都是輕的。
經過查閱資料得知,能夠向異步接口那樣,引用冪等概念進行控制。有兩種方案。
一、經過MQ自身的msg-id來進行控制(這個id一直都沒有找到在哪裏獲取);
二、能夠在上游(生產端)生成一個惟一標識(相似流水號不重複的這種),在消費端進行驗證。入庫也好。緩存驗證也行。目前採用這中方法。
以上 是我的的一點淺談。。繼續找那個msg-id去