延時消息是指消息被髮送之後,並不想讓消費者當即拿到消息,而是等待指定時間後,消費者纔拿到這個消息進行消費。java
使用延時消息的典型場景,例如:數據庫
這些場景對應的解決方案,包括:服務器
除此以外,還可使用消息隊列來實現延時消息,例如 RocketMQ。markdown
RocketMQ 是一個分佈式消息和流數據平臺,具備低延遲、高性能、高可靠性、萬億級容量和靈活的可擴展性。RocketMQ 是2012年阿里巴巴開源的第三代分佈式消息中間件。架構
咱們的系統完成某項操做以後,會推送事件消息到業務方的接口。當咱們調用業務方的通知接口返回值爲成功時,表示本次推送消息成功;當返回值爲失敗時,則會屢次推送消息,直到返回成功爲止(保證至少成功一次)。分佈式
當咱們推送失敗後,雖然會進行屢次推送消息,但並非當即進行。會有必定的延遲,並按照必定的規則進行推送消息。ide
例如:1小時後嘗試推送、3小時後嘗試推送、1天后嘗試推送、3天后嘗試推送等等。所以,考慮使用延時消息實現該功能。性能
生產者負責產生消息,生產者向消息服務器發送由業務應用程序系統生成的消息。測試
首先,定義一個支持延時發送的 AbstractProducer。ui
abstract class AbstractProducer :ProducerBean() { var producerId: String? = null var topic: String? = null var tag: String?=null var timeoutMillis: Int? = null var delaySendTimeMills: Long? = null val log = LogFactory.getLog(this.javaClass) open fun sendMessage(messageBody: Any, tag: String) { val msgBody = JSON.toJSONString(messageBody) val message = Message(topic, tag, msgBody.toByteArray()) if (delaySendTimeMills != null) { val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!! message.startDeliverTime = startDeliverTime log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}") } val logMessageId = buildLogMessageId(message) try { val sendResult = send(message) log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody) } catch (e: Exception) { log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e) } } fun buildLogMessageId(message: Message): String { return "topic: " + message.topic + "\n" + "producer: " + producerId + "\n" + "tag: " + message.tag + "\n" + "key: " + message.key + "\n" } } 複製代碼
根據業務須要,增長一個支持重試機制的 Producer
@Component @ConfigurationProperties("mqs.ons.producers.xxx-producer") @Configuration @Data class CleanReportPushEventProducer :AbstractProducer() { lateinit var delaySecondList:List<Long> fun sendMessage(messageBody: CleanReportPushEventMessage){ //重試超過次數以後再也不發事件 if (delaySecondList!=null) { if(messageBody.times>=delaySecondList.size){ return } val msgBody = JSON.toJSONString(messageBody) val message = Message(topic, tag, msgBody.toByteArray()) val delayTimeMills = delaySecondList[messageBody.times]*1000L message.startDeliverTime = System.currentTimeMillis() + delayTimeMills log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime ) val logMessageId = buildLogMessageId(message) try { val sendResult = send(message) log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody) } catch (e: Exception) { log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e) } } } } 複製代碼
在 CleanReportPushEventProducer 中,超過了重試的次數就不會再發送消息了。
每一次延時消息的時間也會不一樣,所以須要根據重試的次數來獲取這個delayTimeMills 。
經過 System.currentTimeMillis() + delayTimeMills 能夠設置 message 的 startDeliverTime。而後調用 send(message) 便可發送延時消息。
咱們使用商用版的 RocketMQ,所以支持精度爲秒級別的延遲消息。在開源版本中,RocketMQ 只支持18個特定級別的延遲消息。:(
消費者負責消費消息,消費者從消息服務器拉取信息並將其輸入用戶應用程序。
定義 Push 類型的 AbstractConsumer:
@Data abstract class AbstractConsumer ():MessageListener{ var consumerId: String? = null lateinit var subscribeOptions: List<SubscribeOptions> var threadNums: Int? = null val log = LogFactory.getLog(this.javaClass) override fun consume(message: Message, context: ConsumeContext): Action { val logMessageId = buildLogMessageId(message) val body = String(message.body) try { log.info(logMessageId + " body: " + body) val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag))) log.info(logMessageId + " result: " + result.name) return result } catch (e: Exception) { if (message.reconsumeTimes >= 3) { log.error(logMessageId + " error: " + e.message, e) } return Action.ReconsumeLater } } abstract fun getMessageBodyType(tag: String): Type? abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action protected fun buildLogMessageId(message: Message): String { return "topic: " + message.topic + "\n" + "consumer: " + consumerId + "\n" + "tag: " + message.tag + "\n" + "key: " + message.key + "\n" + "MsgId:" + message.msgID + "\n" + "BornTimestamp" + message.bornTimestamp + "\n" + "StartDeliverTime:" + message.startDeliverTime + "\n" + "ReconsumeTimes:" + message.reconsumeTimes + "\n" } } 複製代碼
再定義具體的消費者,而且在消費失敗以後可以再發送一次消息。
@Configuration @ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer") @Data class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() { val logger: Logger = LoggerFactory.getLogger(this.javaClass) override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action { if(obj is CleanReportPushEventMessage){ //清除事件 logger.info("consumer clean-report event report_id:${obj.id} ") //消費失敗以後再發送一次消息 if(!cleanReportService.sendCleanReportEvent(obj.id)){ val times = obj.times+1 eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times)) } } return Action.CommitMessage } override fun getMessageBodyType(tag: String): Type? { return CleanReportPushEventMessage::class.java } } 複製代碼
其中,cleanReportService 的 sendCleanReportEvent() 會經過 http 的方式調用業務方提供的接口,進行事件消息的推送。若是推送失敗了,則會進行下一次的推送。(這裏使用了 eventProducer 的 sendMessage() 方法再次投遞消息,是由於要根據調用的http接口返回的內容來判斷消息是否發送成功。)
最後,定義 ConsumerFactory
@Component class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) { val logger: Logger = LoggerFactory.getLogger(this.javaClass) @PostConstruct fun start() { CompletableFuture.runAsync{ consumers.stream().forEach { val properties = buildProperties(it.consumerId!!, it.threadNums) val consumer = ONSFactory.createConsumer(properties) if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) { for (options in it.subscribeOptions!!) { consumer.subscribe(options.topic, options.tag, it) } consumer.start() val message = "\n".plus( it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)} .collect(Collectors.toList<Any>())) logger.info(String.format("consumer: %s\n", message)) } } } } private fun buildProperties(consumerId: String,threadNums: Int?): Properties { val properties = Properties() properties.put(PropertyKeyConst.ConsumerId, consumerId) properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey) properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey) if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) { properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr) } else { // 測試環境接入RocketMQ properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress) } properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!) return properties } } 複製代碼
正如本文開頭曾介紹過,可使用多種方式來實現延時消息。然而,咱們的系統自己就大量使用了 RocketMQ,藉助成熟的 RocketMQ 實現延時消息不失爲一種可靠而又方便的方式。
Java與Android技術棧:每週更新推送原創技術文章,歡迎掃描下方的公衆號二維碼並關注,期待與您的共同成長和進步。