rabbitmq實現分佈式定時任務

在java web開發中,你們常常會用到定時任務。比較經常使用的類庫有:Timer、ScheduledExecutor 、Quartz、Spring Scheduler。
Timer和ScheduleExecutor都不支持持久化和分佈式,宕機或重啓則待執行的任務丟失。部署多個實例時會同時觸發任務。
Quartz若支持持久化和分佈式須要較多的配置,貌似是7張表。
Spring Scheduler支持持久化和分佈式也是須要相應的插件。
本文將介紹下另外一種思路,使用rabbitmq實現定時任務。java

死信隊列

DLX, Dead-Letter-Exchange。當消息在一個隊列中變成死信以後,它能被從新publish到另外一個Exchange,這個Exchange就是DLX。消息變成死信一貫有一下幾種狀況:web

  • 消息被拒絕(basic.reject/ basic.nack)而且requeue=false
  • 消息TTL過時
  • 隊列達到最大長度
    變爲死信隊列也很簡單,爲隊列設置一個屬性Dead letter exchange指定死信要從新publish到的Exchange就好了。

思路


rabbitMq配置兩個exchange:exchange-product負責接收生產者的消息、DLX負責接收死信。兩個queue:queue-dead綁定在exchange-product上,queue-consume綁定在DLX上。
消息傳遞流程:spring

  1. 生產者發送消息到exchange-product後,因爲這個exchange Binding了queue-dead,所以消息會傳到queue-dead。
  2. 因爲這個隊列沒有消費者,所以queue-dead的消息會超時。因爲queue-dead設置了Dead Letter exchange屬性,指定了死信要publish到DLX,所以消息會到DLX。
  3. DLX這個exchange Binding了queue-consume。消息會到queue-consume中。
  4. 因爲消費者鏈接了queue-consume,最終消息傳到消費者。

代碼

像這種定時任務通常都是須要常常執行的。所以,隊列,exchange都須要持久化。因此能夠在控制檯去建立隊列和exchange。所以不須要在代碼裏建立隊列以及exchange,只須要發送和接收消息就好了。json

//消費者
@Component
public class MessageConsumer {
    @RabbitListener(queues = "queue-consume")//指定要消費的隊列
    public void receive(byte[] body) {
        long end = System.currentTimeMillis();
        String start = new String(body);//解析消息體,內容傳的消息發送時間
        System.out.println("start: "+ start);
        System.out.println("end: "+ end);
        System.out.println("offset: "+(Long.parseLong(start)-end));//計算實際定時時間
    }
}
//生產者
@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String str){
        MessageProperties messageProperties = new MessageProperties();
        Message message = new Message(str.getBytes(), messageProperties);
        messageProperties.setContentType("json");
        messageProperties.setExpiration("10000");//設置消息超時時間爲10秒
        rabbitTemplate.send("exchange-product","",message);//發送消息到exchange-product
    }
}

演示的例子很簡單。爲了方便,是在spring的環境下寫的。exchange配置的類型爲fanout,所以不須要指定routing key。實際應用時能夠換成別的類型。也沒有指定手動ack。實際應用時仍是建議手動ack。網絡

測試

public class ApplicationTests {
	@Autowired
	MessageProducer messageProducer;
	@Test
	public void push() {
		long start = System.currentTimeMillis();
		messageProducer.send(""+start);
	}
}


能夠看到最終偏差大概在0.1秒。因爲我本機和rabbitmq不在一個局域網,因此網絡開銷比較大。局域網鏈接估計偏差會小一些。分佈式

缺點

因爲rabbitmq的消息,在沒有優先級的狀況下,是會按順序消費,判斷超時也是按順序的。如有兩條消息message1和message2。message1的定時時間(即超時時間)爲10秒。message2的定時時間爲5秒。若message1先發送,則須要先等待message1超時後publish到DLX,message2才能判斷超時publish到DLX。最終message2可能須要等10秒。
那這種定時還有什麼用呢
剛剛舉得例子是由於兩條消息的超時時間不一樣。若兩條消息超時時間相同便沒有問題了。所以這種定時只適用於,同一業務的消息定時時間固定。多個業務建多個隊列和exchange就行了。測試

相關文章
相關標籤/搜索