不少時候咱們會有延時處理一個任務的需求,好比說:html
下面咱們來分別探討一下幾種實現方案:java
Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先級隊列。redis
放入隊列的元素須要實現Delayed接口:數據庫
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
經過實現這個接口,來完成對隊列中元素,按照時間延遲前後排序的目的。apache
從隊列中取元素:網絡
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
能夠看到,在這段代碼裏,在第一個元素的延遲時間還沒到的狀況下:session
向隊列中放入元素:數據結構
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
在放入元素的時候,會喚醒等待中的讀線程。分佈式
若是咱們不考慮分佈式運行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。性能
可是若是咱們須要分佈式運行和任務持久化,就須要引入一些外部組件。
前文咱們看到,能夠經過優先級隊列來實現延遲隊列的功能。
Redis提供了不少數據結構,其中的zset是一種有序的數據結構;咱們能夠經過Redis中的zset來實現一個延遲隊列。
基本的方法就是使用時間戳做爲元素的score存入zset。
redis> ZADD delayqueue <future_timestamp> "messsage"
獲取全部已經「就緒」的message,而且刪除message。
redis> MULTI redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp> redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp> redis> EXEC
可是這個方案也有一些問題:
Redis事務雖然保證了一致性和隔離性,可是並無提供回滾功能。消息處理失敗是不能被恢復的,若是處理某條消息的線程崩潰或機器宕機,這條未被處理不能被自動的再次處理。
也有考慮過將分爲TODO和Doing兩條隊列:
先從TODO隊列中取出任務,放入Doing中,再開始處理;若是停留在Doing隊列總太久,則從新放入TODO隊列。
可是因爲Redis的事務特性,並不能作到徹底可靠;而且檢查Doing超時的邏輯也略複雜。
那麼有沒有一個成熟的消息隊列能夠支持延遲投遞消息的功能呢?
答案固然是有的,本文的標題就是使用RabbitMQ實現DelayQueue。
這是RabbitMQ衆多隱藏的強大特性中的一個,能夠輕鬆的下降代碼的複雜度,實現DelayQueue的功能。
咱們須要兩個隊列,一個用來作主隊列,真正的投遞消息;另外一個用來延遲處理消息。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare("MAIN_QUEUE", true, false, false, null); channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE"); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE"); channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
放入延遲消息:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build(); channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));
而關鍵點,就在於 x-dead-letter-exchange 和 x-dead-letter-routing-key 兩個參數上。這兩個參數說明了:消息過時後的處理方式 --> 投遞到咱們指定的MAIN_QUEUE;而後咱們只須要在MAIN_QUEUE中等待消息投遞便可。
RabbitMQ自己提供了消息持久化和沒有收到ACK的重投遞功能,這樣咱們就能夠實現一個高可靠的分佈式延遲消息隊列了。
上面講述的RabbitMQ定時任務方案有問題,RabbitMQ TTL文檔 中寫道:
Caveats
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).
per-queue TTL不會有問題,由於快要過時的消息老是在隊列的前邊;可是若是使用per-message TTL的話,過時的消息有可能會在未過時的消息後邊,直到前邊的消息過時或者被消費。由於RabbitMQ保證過時的消息必定不會被消費者消費,可是不能保證消息過時就會從隊列中移除。
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.
能夠支持定時、延遲投遞、重複投遞和Cron調度。
在配置文件中,啓用<broker ... schedulerSupport="true">
選項後便可使用。
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
因爲ActiveMQ採用的是相似於Java中DelayQueue的方式,經過先將消息排序再定時觸發的方式來實現延遲消息。在往隊列中投遞大量(10w+)定時消息以後,ActiveMQ的性能將會變得接近不可用,大量的消息擠壓得不到投遞。
RocketMQ 支持定時消息,可是不支持任意時間精度,支持特定的level,例如定時5s,10s,1m等。
經過MySQL等數據庫記錄消息應該被投遞的時間,而後循環進行查找,並把當前時間應該投遞的消息放入普通的消息隊列。