JMS的4個特性:html
1.P2P,一個消息只有一我的消費java
2. Pub/Sub ,一個消息能夠有多我的消費apache
3.分組訂閱,一個消息在一個同名組中只有一我的消費網絡
4.延遲隊列,能夠設置消息延遲發送session
1、mq配置支持: schedulerSupport="true"tcp
2、java代碼支持ide
1.主要就是在message上添加屬性,有4種類型的屬性 a.delay,ScheduledMessage.AMQ_SCHEDULED_DELAY延遲投遞的時間,單位毫秒 b.repeat,ScheduledMessage.AMQ_SCHEDULED_REPEAT重複投遞次數 c.peroid,ScheduledMessage.AMQ_SCHEDULED_PERIOD重複投遞的時間間隔 d.corn,ScheduledMessage.AMQ_SCHEDULED_CRON Cron表達式 延遲隊列消息post
2.須要藉助MessagePostProcessor在convertAndSend方法中對message進行延遲屬性設置ui
jmsTemplate.convertAndSend(destArr[i], message, delayMessagePostProcessor);url
public void convertAndSend(
String destinationName, final Object message, final MessagePostProcessor postProcessor)
throws JmsException {
send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message msg = getRequiredMessageConverter().toMessage(message, session);
return postProcessor.postProcessMessage(msg);
}
});
}
3、程序支持
最好有消息發送和消息處理記錄,以便覈對
4、應用場景
1)2個小時後給用戶發送短信。
2)15分鐘後關閉網絡鏈接。
3)2分鐘後再次嘗試回調。
ActiveMQ對消息延時和定時投遞作了很好的支持,其內部啓動Scheduled來對該功能支持,也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage,只須要把幾個描述消息定時調度方式的參數做爲屬性添加到消息,broker端的調度器就會按照咱們想要的行爲去處理消息。
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延遲投遞的時間 |
AMQ_SCHEDULED_PERIOD | long | 重複投遞的時間間隔 |
AMQ_SCHEDULED_REPEAT | int | 重複投遞次數 |
AMQ_SCHEDULED_CRON | String | Cron表達式 |
下面咱們演示一下間隔性重複投遞;
生產者:
package cn.slimsmart.study.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; public class Producer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 經過工廠建立一個鏈接 Connection connection = factory.createConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 事務 自動ack Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Destination destination = session.createQueue(queue_name); // 建立生產者 MessageProducer producer = session.createProducer(destination); // 消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis()); long time = 60 * 1000;// 延時1min long period = 10 * 1000;// 每一個10s int repeat = 6;// 6次 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); // 發送消息 producer.send(message); session.commit(); producer.close(); session.close(); connection.close(); } }
消費者代碼:
package cn.slimsmart.study.activemq; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 經過工廠建立一個鏈接 Connection connection = factory.createConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 事務 自動ack Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); // 建立一個消息隊列 Destination destination = session.createQueue(queue_name); // 建立消費者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("receive message :" + ((TextMessage) message).getText()); message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); new CountDownLatch(1).await(); } }
參考:
2.消息屬性定義