activemq筆記

activemq筆記

activeMq的安裝.啓動和中止
下載ActiveMq的tar安裝包, 解壓到響應目錄下, 使用bin目錄下的./activemq start啓動, ./activemq stop中止
activemq和spring-boot整合
配置類
@EnableJms
@Configuration
public class ActiveMQ4Config {

    @Bean
    public Queue queue(){
        return new ActiveMQQueue("queue1");
    }

    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
        //是否在每次嘗試從新發送失敗後,增加這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重發次數,默認爲6次 這裏設置爲10次
        redeliveryPolicy.setMaximumRedeliveries(10);
        //重發時間間隔,默認爲1秒
        redeliveryPolicy.setInitialRedeliveryDelay(1);
        //第一次失敗後從新發送以前等待500毫秒,第二次失敗再等待500 \* 2毫秒,這裏的2就是 value
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)爲true時生效
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(
            "admin",
            "admin",
            url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean(name="jmsQueueTemplate")
    public JmsTemplate jmsQueueTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
        //設置建立鏈接的工廠
        //JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory);
        //優化鏈接工廠,這裏應用緩存池 鏈接工廠就便可
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        //設置默認消費topic
       //jmsTemplate.setDefaultDestination(topic());
        //設置P2P隊列消息類型
        jmsTemplate.setPubSubDomain(isPubSubDomain);
        DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
        if (destinationResolver != null) {
            jmsTemplate.setDestinationResolver(destinationResolver);
        }
        MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
        if (messageConverter != null) {
            jmsTemplate.setMessageConverter(messageConverter);
        }
        //deliveryMode, priority, timeToLive 的開關,要生效,必須配置爲true,默認false
        jmsTemplate.setExplicitQosEnabled(true);
        //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
        //定義持久化後節點掛掉之後,重啓能夠繼續消費.
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        //默認不開啓事務
        System.out.println("默認是否開啓事務:"+jmsTemplate.isSessionTransacted());
        //若是不啓用事務,則會致使XA事務失效;
        //做爲生產者若是須要支持事務,則須要配置SessionTransacted爲true
        //jmsTemplate.setSessionTransacted(true);
        //消息的應答方式,須要手動確認,此時SessionTransacted必須被設置爲false,且爲Session.CLIENT_ACKNOWLEDGE模式
        //Session.AUTO_ACKNOWLEDGE  消息自動簽收
        //Session.CLIENT_ACKNOWLEDGE  客戶端調用acknowledge方法手動簽收
        //Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    @Bean(name="jmsTopicTemplate")
    public JmsTemplate jmsTopicTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
        //設置建立鏈接的工廠
        //JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        //優化鏈接工廠,這裏應用緩存池 鏈接工廠就便可
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        //設置默認消費topic
        //jmsTemplate.setDefaultDestination(topic());
        //設置發佈訂閱消息類型
        jmsTemplate.setPubSubDomain(isPubSubDomain);
        //deliveryMode, priority, timeToLive 的開關,要生效,必須配置爲true,默認false
        jmsTemplate.setExplicitQosEnabled(true);
        //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        //默認不開啓事務
        System.out.println("是否開啓事務"+jmsTemplate.isSessionTransacted());
        //若是session帶有事務,而且事務成功提交,則消息被自動簽收。若是事務回滾,則消息會被再次傳送。
        //jmsTemplate.setSessionTransacted(true);
        //不帶事務的session的簽收方式,取決於session的配置。
        //默認消息確認方式爲1,即AUTO_ACKNOWLEDGE
        System.out.println("是否消息確認方式"+jmsTemplate.getSessionAcknowledgeMode());
        //消息的應答方式,須要手動確認,此時SessionTransacted必須被設置爲false,且爲Session.CLIENT_ACKNOWLEDGE模式
        //Session.AUTO_ACKNOWLEDGE  消息自動簽收
        //Session.CLIENT_ACKNOWLEDGE  客戶端調用acknowledge方法手動簽收
        //Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    //定義一個消息監聽器鏈接工廠,這裏定義的是點對點模式的監聽器鏈接工廠
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //設置鏈接數
        factory.setConcurrency("1-10");
        //重連間隔時間
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
}
消費者
@Component
public class Consumer {
 
    private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
    
    @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
    public void receiveQueue(final TextMessage text, Session session)throws JMSException {
        try {
            logger.debug("Consumer收到的報文爲:" + text.getText());
            text.acknowledge();// 使用手動簽收模式,須要手動的調用,若是不在catch中調用session.recover()消息只會在重啓服務後重發
        } catch (Exception e) {    
            session.recover();// 此不可省略 重發信息使用
        }
    }
}
生產者(不一樣的設置, 生產者和消費者要進行簽收或者提交操做)
@Component
public class Producter {

    @Autowired("..")//這裏根據消息發佈類型不一樣注入
    private JmsTemplate jmsTemplate;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;

    //發送queue類型消息
    public void sendQueueMsg(String msg){
        jmsTemplate.convertAndSend(queue, msg);
    }

    //發送topic類型消息
    public void sendTopicMsg(String msg){
        jmsTemplate.convertAndSend(topic, msg);
    }
}
延時投遞的實現(其他高級特性實現方式相似)
broker配置文件schedulerSupport修改成true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
@Service
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * 延時發送
     *
     * @param destination 發送的隊列
     * @param data        發送的消息
     * @param time        延遲時間
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 獲取鏈接工廠
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // 獲取鏈接
            connection = connectionFactory.createConnection();
            connection.start();
            // 獲取session,true開啓事務,false關閉事務
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一個消息隊列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //設置延遲時間
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 發送消息
            producer.send(message);
            log.info("發送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
ActiveMQ消息異步投遞
默認是異步發送消息, 這種消息效率更高, 可是會出現消息丟失, 可是有如下狀況會發送同步消息
    1.指定使用同步發送消息
    2.在沒有事務的前提下發送持久化消息
如何確保消息發送成功
須要接收回調
// 建立一個消息隊列
ActiveMqMessageProducer producer = (ActiveMqMessageProducer)session.createProducer(destination);
ObjectMessage message = session.createObjectMessage(data);
// 發送消息
producer.send(message, new AsyncCallback() {
    ...
});
消息的重試機制
1. 什麼狀況下會致使消息的重試
    . 客戶端在使用事務的前提下, rollBack()或者沒有commit()消息;
    . 未使用事務的前提下, 使用ACKNOWLEDGE模式, 進行了session.recover()
2. 重試多少次, 每次間隔
    默認是6次, 間隔爲1s
3. 超太重發的次數, 消息會被放入死信隊列中
死信隊列的處理

能夠經過individualDeadLetterStrategy來設置各自的死信隊列, 也能夠設置過時redis

保證消息的冪等性
能夠根據messageId來作校驗, 能夠使用redis來作
相關文章
相關標籤/搜索