在網上找了不少文章,一大堆,可直接運行的很少,特別是對於配置同時支持Queue和Topic兩種方式的,也沒有一個能夠直接運行。最後不得以,慢慢琢磨,成功以後,就在這裏分享一下經驗。
在項目實際使用過程當中,持久化隊列用處很少,有持久化需求的,也基本是持久化到數據庫的。最後也分享一個目前項目中關於記帳服務使用activemq來異步記帳的案例。java
//MQ configuration class @Configuration public class MqConfig { @Bean public Queue queue(){ return new ActiveMQQueue("mvp.queue"); } @Bean public Topic topic(){ return new ActiveMQTopic("mvp.topic"); } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616"); } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; } @Bean public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){ return new JmsMessagingTemplate(connectionFactory); } }
@Component @EnableScheduling public class producer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; private static int count= 0; @Scheduled(fixedDelay=3000) public void send(){ this.jmsMessagingTemplate.convertAndSend(this.queue,"hi.activeMQ,index="+count); this.jmsMessagingTemplate.convertAndSend(this.topic,"hi,activeMQ( topic ),index="+count++); } }
@Component public class consumerqueue { @JmsListener(destination = "mvp.topic",containerFactory="jmsListenerContainerTopic") public void receiveTopic(String text){ System.out.println("Topic Consumer1:"+text); } @JmsListener(destination = "mvp.topic",containerFactory="jmsListenerContainerTopic") public void receiveTopic2(String text){ System.out.println("Topic Consumer2:"+text); } @JmsListener(destination = "mvp.queue",containerFactory="jmsListenerContainerQueue") public void reviceQueue(String text){ System.out.println("Queue Consumer:"+text); } }
應用啓動後的輸出
Queue Consumer:hi.activeMQ,index=0
Topic Consumer1:hi,activeMQ( topic ),index=0
Topic Consumer2:hi,activeMQ( topic ),index=0
Queue Consumer:hi.activeMQ,index=1
Topic Consumer1:hi,activeMQ( topic ),index=1
Topic Consumer2:hi,activeMQ( topic ),index=1數據庫
在跨境系統的記帳模塊中,記帳分爲兩部分,這兩部份內容基本一致,分爲同步記帳和異步記帳。在同步記帳完成後,會發送MQ消息去異步記帳,而記帳模塊也會起一個任務去查詢哪些由於記帳服務中斷沒有執行記帳的記錄。
數據表T_BALANCE_DEAL的CHARGE_STATUS有4種狀態:0-未記帳,未發MQ,3-未記帳,已發MQ ,1-記帳成功,2-記帳失敗。定時任務會掃描狀態爲0-未記帳未發MQ 和 三小時前的3-未記帳已發MQ的記錄,掃到後,從新發送MQ消息到隊列。
記帳模塊有一個監聽器來監聽MQ記帳隊列,若是有消息,就執行記帳。這樣設計就比較清晰,記帳入口有2個,一個是其餘服務發送的記帳請求,一個是定時任務發送的記帳請求。
系統比較老舊,仍是老式的xml配置。關鍵配置以下:異步
<!--記帳定時任務掃描--> <bean id="mdbtask-chargeupTaskSendMsg" class="com.ttf.ma.task.impl.ChargeupTaskSendMsgImpl" parent="mdbtask-abstractChargeupTask"> <property name="jmsSender" ref="jmsSender" /> </bean>
//查詢沒有發mq交易信息 List<BalanceDealDto> balanceDealDtos = this.queryBalanceChargeupWithEnum(ChargeUpStatusEnum.CHARGEUP_NO_SEND_MQ); //查詢3個小時沒有記帳交易信息進行記帳 List<BalanceDealDto> lists =this.queryBalanceChargeupWithEnum(ChargeUpStatusEnum.CHARGEUP_SEND_MQ);
<!--MQ監聽--> <bean id="mdbtask-chargeUpMessageListener" class="com.ttf.ma.mdb.impl.CommonChargeupListenerImpl" parent="mdbtask-abstractChargeupListener"> <property name="balanceDealService" ref="mdbtask-balanceDealService" /> <property name="peServiceFacade" ref="mdbtask-peServiceFacade" /> </bean>
跨境系統向外部發送消息提供了三種方式HTTP,SMS,EMAIL.第一種一般用於接口回調,後面兩種用於商戶通知,其中會涉及消息模板的使用,這個能夠另起一篇文章來介紹。tcp