下載ActiveMq的tar安裝包, 解壓到響應目錄下, 使用bin目錄下的./activemq start啓動, ./activemq stop中止
配置類
@EnableJms @Configuration public class ActiveMQ4Config { @Bean public Queue queue(){ return new ActiveMQQueue("queue1"); } @Bean public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy(); //是否在每次嘗試從新發送失敗後,增加這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); //重發次數,默認爲6次 這裏設置爲10次 redeliveryPolicy.setMaximumRedeliveries(10); //重發時間間隔,默認爲1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失敗後從新發送以前等待500毫秒,第二次失敗再等待500 \* 2毫秒,這裏的2就是 value redeliveryPolicy.setBackOffMultiplier(2); //是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)爲true時生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){ ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory( "admin", "admin", url); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); return activeMQConnectionFactory; } @Bean(name="jmsQueueTemplate") public JmsTemplate jmsQueueTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) { //設置建立鏈接的工廠 //JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory); //優化鏈接工廠,這裏應用緩存池 鏈接工廠就便可 JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); //設置默認消費topic //jmsTemplate.setDefaultDestination(topic()); //設置P2P隊列消息類型 jmsTemplate.setPubSubDomain(isPubSubDomain); DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique(); if (destinationResolver != null) { jmsTemplate.setDestinationResolver(destinationResolver); } MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique(); if (messageConverter != null) { jmsTemplate.setMessageConverter(messageConverter); } //deliveryMode, priority, timeToLive 的開關,要生效,必須配置爲true,默認false jmsTemplate.setExplicitQosEnabled(true); //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 //定義持久化後節點掛掉之後,重啓能夠繼續消費. jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); //默認不開啓事務 System.out.println("默認是否開啓事務:"+jmsTemplate.isSessionTransacted()); //若是不啓用事務,則會致使XA事務失效; //做爲生產者若是須要支持事務,則須要配置SessionTransacted爲true //jmsTemplate.setSessionTransacted(true); //消息的應答方式,須要手動確認,此時SessionTransacted必須被設置爲false,且爲Session.CLIENT_ACKNOWLEDGE模式 //Session.AUTO_ACKNOWLEDGE 消息自動簽收 //Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 //Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsTemplate; } @Bean(name="jmsTopicTemplate") public JmsTemplate jmsTopicTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) { //設置建立鏈接的工廠 //JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //優化鏈接工廠,這裏應用緩存池 鏈接工廠就便可 JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); //設置默認消費topic //jmsTemplate.setDefaultDestination(topic()); //設置發佈訂閱消息類型 jmsTemplate.setPubSubDomain(isPubSubDomain); //deliveryMode, priority, timeToLive 的開關,要生效,必須配置爲true,默認false jmsTemplate.setExplicitQosEnabled(true); //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); //默認不開啓事務 System.out.println("是否開啓事務"+jmsTemplate.isSessionTransacted()); //若是session帶有事務,而且事務成功提交,則消息被自動簽收。若是事務回滾,則消息會被再次傳送。 //jmsTemplate.setSessionTransacted(true); //不帶事務的session的簽收方式,取決於session的配置。 //默認消息確認方式爲1,即AUTO_ACKNOWLEDGE System.out.println("是否消息確認方式"+jmsTemplate.getSessionAcknowledgeMode()); //消息的應答方式,須要手動確認,此時SessionTransacted必須被設置爲false,且爲Session.CLIENT_ACKNOWLEDGE模式 //Session.AUTO_ACKNOWLEDGE 消息自動簽收 //Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 //Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsTemplate; } //定義一個消息監聽器鏈接工廠,這裏定義的是點對點模式的監聽器鏈接工廠 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory); //設置鏈接數 factory.setConcurrency("1-10"); //重連間隔時間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(4); return factory; } }
消費者
@Component public class Consumer { private final static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener") public void receiveQueue(final TextMessage text, Session session)throws JMSException { try { logger.debug("Consumer收到的報文爲:" + text.getText()); text.acknowledge();// 使用手動簽收模式,須要手動的調用,若是不在catch中調用session.recover()消息只會在重啓服務後重發 } catch (Exception e) { session.recover();// 此不可省略 重發信息使用 } } }
生產者(不一樣的設置, 生產者和消費者要進行簽收或者提交操做)
@Component public class Producter { @Autowired("..")//這裏根據消息發佈類型不一樣注入 private JmsTemplate jmsTemplate; @Autowired private Queue queue; @Autowired private Topic topic; //發送queue類型消息 public void sendQueueMsg(String msg){ jmsTemplate.convertAndSend(queue, msg); } //發送topic類型消息 public void sendTopicMsg(String msg){ jmsTemplate.convertAndSend(topic, msg); } }
延時投遞的實現(其他高級特性實現方式相似)
broker配置文件schedulerSupport修改成true <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
@Service public class Producer { public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue"); @Autowired private JmsMessagingTemplate template; /** * 延時發送 * * @param destination 發送的隊列 * @param data 發送的消息 * @param time 延遲時間 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 獲取鏈接工廠 ConnectionFactory connectionFactory = template.getConnectionFactory(); try { // 獲取鏈接 connection = connectionFactory.createConnection(); connection.start(); // 獲取session,true開啓事務,false關閉事務 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //設置延遲時間 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 發送消息 producer.send(message); log.info("發送消息:{}", data); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
默認是異步發送消息, 這種消息效率更高, 可是會出現消息丟失, 可是有如下狀況會發送同步消息 1.指定使用同步發送消息 2.在沒有事務的前提下發送持久化消息
須要接收回調
// 建立一個消息隊列 ActiveMqMessageProducer producer = (ActiveMqMessageProducer)session.createProducer(destination); ObjectMessage message = session.createObjectMessage(data); // 發送消息 producer.send(message, new AsyncCallback() { ... });
1. 什麼狀況下會致使消息的重試 . 客戶端在使用事務的前提下, rollBack()或者沒有commit()消息; . 未使用事務的前提下, 使用ACKNOWLEDGE模式, 進行了session.recover() 2. 重試多少次, 每次間隔 默認是6次, 間隔爲1s 3. 超太重發的次數, 消息會被放入死信隊列中
能夠經過individualDeadLetterStrategy來設置各自的死信隊列, 也能夠設置過時redis
能夠根據messageId來作校驗, 能夠使用redis來作