ActiveMQ JMS延遲隊列

JMS的4個特性:html

1.P2P,一個消息只有一我的消費java

2. Pub/Sub ,一個消息能夠有多我的消費apache

3.分組訂閱,一個消息在一個同名組中只有一我的消費網絡

4.延遲隊列,能夠設置消息延遲發送session

 

1、mq配置支持: schedulerSupport="true"tcp

 

2、java代碼支持ide

1.主要就是在message上添加屬性,有4種類型的屬性 a.delay,ScheduledMessage.AMQ_SCHEDULED_DELAY延遲投遞的時間,單位毫秒 b.repeat,ScheduledMessage.AMQ_SCHEDULED_REPEAT重複投遞次數 c.peroid,ScheduledMessage.AMQ_SCHEDULED_PERIOD重複投遞的時間間隔 d.corn,ScheduledMessage.AMQ_SCHEDULED_CRON Cron表達式 延遲隊列消息post

2.須要藉助MessagePostProcessor在convertAndSend方法中對message進行延遲屬性設置ui

jmsTemplate.convertAndSend(destArr[i], message, delayMessagePostProcessor);url

    public void convertAndSend(
            String destinationName, final Object message, final MessagePostProcessor postProcessor)
        throws JmsException {

        send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = getRequiredMessageConverter().toMessage(message, session);
                return postProcessor.postProcessMessage(msg);
            }
        });
    }

3、程序支持

最好有消息發送和消息處理記錄,以便覈對

4、應用場景

1)2個小時後給用戶發送短信。
2)15分鐘後關閉網絡鏈接。
3)2分鐘後再次嘗試回調。

 

ActiveMQ對消息延時和定時投遞作了很好的支持,其內部啓動Scheduled來對該功能支持,也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage,只須要把幾個描述消息定時調度方式的參數做爲屬性添加到消息,broker端的調度器就會按照咱們想要的行爲去處理消息。

 

Property name type description
AMQ_SCHEDULED_DELAY long 延遲投遞的時間
AMQ_SCHEDULED_PERIOD long 重複投遞的時間間隔
AMQ_SCHEDULED_REPEAT int 重複投遞次數
AMQ_SCHEDULED_CRON String Cron表達式

下面咱們演示一下間隔性重複投遞;

生產者:

 

package cn.slimsmart.study.activemq;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
 
public class Producer {
 
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 經過工廠建立一個鏈接
        Connection connection = factory.createConnection();
        // 啓動鏈接
        connection.start();
        // 建立一個session會話 事務 自動ack
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 建立一個消息隊列
        Destination destination = session.createQueue(queue_name);
        // 建立生產者
        MessageProducer producer = session.createProducer(destination);
        // 消息持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis());
        long time = 60 * 1000;// 延時1min
        long period = 10 * 1000;// 每一個10s
        int repeat = 6;// 6次
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        // 發送消息
        producer.send(message);
        session.commit();
        producer.close();
        session.close();
        connection.close();
    }
}

消費者代碼:

package cn.slimsmart.study.activemq;
 
import java.util.concurrent.CountDownLatch;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Consumer {
 
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 經過工廠建立一個鏈接
        Connection connection = factory.createConnection();
        // 啓動鏈接
        connection.start();
        // 建立一個session會話 事務 自動ack
        Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        // 建立一個消息隊列
        Destination destination = session.createQueue(queue_name);
        // 建立消費者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("receive message :" + ((TextMessage) message).getText());
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        new CountDownLatch(1).await();
    }
}

參考:

1.延時和定時消息投遞

2.消息屬性定義

相關文章
相關標籤/搜索