延時消息是指消息被髮送之後,並不想讓消費者當即拿到消息,而是等待指定時間後,消費者纔拿到這個消息進行消費。java
使用延時消息的典型場景,例如:數據庫
這些場景對應的解決方案,包括:服務器
除此以外,還可使用消息隊列來實現延時消息,例如 RocketMQ。架構
RocketMQ 是一個分佈式消息和流數據平臺,具備低延遲、高性能、高可靠性、萬億級容量和靈活的可擴展性。RocketMQ 是2012年阿里巴巴開源的第三代分佈式消息中間件。分佈式
咱們的系統完成某項操做以後,會推送事件消息到業務方的接口。當咱們調用業務方的通知接口返回值爲成功時,表示本次推送消息成功;當返回值爲失敗時,則會屢次推送消息,直到返回成功爲止(保證至少成功一次)。ide
當咱們推送失敗後,雖然會進行屢次推送消息,但並非當即進行。會有必定的延遲,並按照必定的規則進行推送消息。性能
例如:1小時後嘗試推送、3小時後嘗試推送、1天后嘗試推送、3天后嘗試推送等等。所以,考慮使用延時消息實現該功能。測試
生產者負責產生消息,生產者向消息服務器發送由業務應用程序系統生成的消息。ui
首先,定義一個支持延時發送的 AbstractProducer。this
abstract class AbstractProducer :ProducerBean() {
var producerId: String? = null
var topic: String? = null
var tag: String?=null
var timeoutMillis: Int? = null
var delaySendTimeMills: Long? = null
val log = LogFactory.getLog(this.javaClass)
open fun sendMessage(messageBody: Any, tag: String) {
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
if (delaySendTimeMills != null) {
val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
message.startDeliverTime = startDeliverTime
log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
}
val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"producer: " + producerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n"
}
}
複製代碼
根據業務須要,增長一個支持重試機制的 Producer
@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {
lateinit var delaySecondList:List<Long>
fun sendMessage(messageBody: CleanReportPushEventMessage){
//重試超過次數以後再也不發事件
if (delaySecondList!=null) {
if(messageBody.times>=delaySecondList.size){
return
}
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
val delayTimeMills = delaySecondList[messageBody.times]*1000L
message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
}
}
複製代碼
在 CleanReportPushEventProducer 中,超過了重試的次數就不會再發送消息了。
每一次延時消息的時間也會不一樣,所以須要根據重試的次數來獲取這個delayTimeMills 。
經過 System.currentTimeMillis() + delayTimeMills 能夠設置 message 的 startDeliverTime。而後調用 send(message) 便可發送延時消息。
咱們使用商用版的 RocketMQ,所以支持精度爲秒級別的延遲消息。在開源版本中,RocketMQ 只支持18個特定級別的延遲消息。:(
消費者負責消費消息,消費者從消息服務器拉取信息並將其輸入用戶應用程序。
定義 Push 類型的 AbstractConsumer:
@Data
abstract class AbstractConsumer ():MessageListener{
var consumerId: String? = null
lateinit var subscribeOptions: List<SubscribeOptions>
var threadNums: Int? = null
val log = LogFactory.getLog(this.javaClass)
override fun consume(message: Message, context: ConsumeContext): Action {
val logMessageId = buildLogMessageId(message)
val body = String(message.body)
try {
log.info(logMessageId + " body: " + body)
val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
log.info(logMessageId + " result: " + result.name)
return result
} catch (e: Exception) {
if (message.reconsumeTimes >= 3) {
log.error(logMessageId + " error: " + e.message, e)
}
return Action.ReconsumeLater
}
}
abstract fun getMessageBodyType(tag: String): Type?
abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action
protected fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"consumer: " + consumerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n" +
"MsgId:" + message.msgID + "\n" +
"BornTimestamp" + message.bornTimestamp + "\n" +
"StartDeliverTime:" + message.startDeliverTime + "\n" +
"ReconsumeTimes:" + message.reconsumeTimes + "\n"
}
}
複製代碼
再定義具體的消費者,而且在消費失敗以後可以再發送一次消息。
@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
if(obj is CleanReportPushEventMessage){
//清除事件
logger.info("consumer clean-report event report_id:${obj.id} ")
//消費失敗以後再發送一次消息
if(!cleanReportService.sendCleanReportEvent(obj.id)){
val times = obj.times+1
eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
}
}
return Action.CommitMessage
}
override fun getMessageBodyType(tag: String): Type? {
return CleanReportPushEventMessage::class.java
}
}
複製代碼
其中,cleanReportService 的 sendCleanReportEvent() 會經過 http 的方式調用業務方提供的接口,進行事件消息的推送。若是推送失敗了,則會進行下一次的推送。(這裏使用了 eventProducer 的 sendMessage() 方法再次投遞消息,是由於要根據調用的http接口返回的內容來判斷消息是否發送成功。)
最後,定義 ConsumerFactory
@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
@PostConstruct
fun start() {
CompletableFuture.runAsync{
consumers.stream().forEach {
val properties = buildProperties(it.consumerId!!, it.threadNums)
val consumer = ONSFactory.createConsumer(properties)
if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
for (options in it.subscribeOptions!!) {
consumer.subscribe(options.topic, options.tag, it)
}
consumer.start()
val message = "\n".plus(
it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
.collect(Collectors.toList<Any>()))
logger.info(String.format("consumer: %s\n", message))
}
}
}
}
private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
val properties = Properties()
properties.put(PropertyKeyConst.ConsumerId, consumerId)
properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
} else {
// 測試環境接入RocketMQ
properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
}
properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
return properties
}
}
複製代碼
正如本文開頭曾介紹過,可使用多種方式來實現延時消息。然而,咱們的系統自己就大量使用了 RocketMQ,藉助成熟的 RocketMQ 實現延時消息不失爲一種可靠而又方便的方式。
Java與Android技術棧:每週更新推送原創技術文章,歡迎掃描下方的公衆號二維碼並關注,期待與您的共同成長和進步。