最近須要作一個延時處理的功能,主要是從kafka中消費消息後根據消息中的某個延時字段來進行延時處理,在實際的實現過程當中有一些須要注意的地方,記錄以下。java
說到java中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,可是相比之下Timer能夠排除,主要緣由有如下幾點:redis
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new
ThreadPoolExecutor.AbortPolicy());
//從消息中取出延遲時間及相關信息的代碼略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//具體操做邏輯
}},0,delayTime, TimeUnit.SECONDS);
複製代碼
其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關日誌打印便於後續的問題分析,這裏就很少作介紹了。拒絕策略也是採用默認的拒絕策略。 而後測試了一下,知足目標需求的功能,能夠作到延遲指定時間後執行,至此彷佛功能就被完成了。 你們可能疑問,這也太簡單了有什麼好說的,可是這種方式實現簡單是簡單可是存在一個潛在的問題,問題在哪呢,讓咱們看一下ScheduledThreadPoolExecutor的源碼:api
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0,
TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
複製代碼
ScheduledThreadPoolExecutor 因爲它自身的延時和週期的特性,默認使用了DelayWorkQueue,而並不像咱們平時使用的SingleThreadExecutor等構造是可使用本身定義的LinkedBlockingQueue而且設置隊列大小,問題就出在這裏。DelayWrokQueue是一個無界隊列,而咱們的目標數據源是kafka,也就是一個高併發高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而致使OOM,在使用多線程時咱們是確定要考慮到OOM的可能性的,由於OOM帶來的後果每每比較嚴重,系統OOM臨時的解決辦法通常只能是重啓,可能會致使用戶數據丟失等不可能挽回的問題,因此從編碼設計階段要採用儘量穩妥的手段來避免這些問題。安全
//添加元素
ZADD key score member [[score member] [score member] …]
//根據分值及限制數量查詢
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//從zset中刪除指定成員
ZREM key member [member …]
複製代碼
咱們採用redis基礎數據結構的zset結構,採用score來存儲咱們目標發送時間的數值,總體處理流程以下:bash
public void onMessage(String topic, String message) {
String orderId;
int delayTime = 0;
try {
Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
}.getType());
if (msgMap.isEmpty()) {
return;
}
LOGGER.info("onMessage kafka content:{}", msgMap.toString());
orderId = msgMap.get("orderId");
if(StringUtils.isNotEmpty(orderId)){
delayTime = Integer.parseInt(msgMap.get("delayTime"));
Calendar calendar = Calendar.getInstance();
//計算出預計發送時間
calendar.add(Calendar.MINUTE, delayTime);
long sendTime = calendar.getTimeInMillis();
RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
LOGGER.info("orderId:{}---放入redis中等待發送---sendTime:{}", ---orderId:{}, sendTime);
}
} catch (Exception e) {
LOGGER.info("onMessage 延時發送異常:{}", e);
}
}
複製代碼
public void run(){
//獲取批量大小
int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
try {
//批量獲取離發送時間最近的orderNum條數據
Calendar calendar = Calendar.getInstance();
long now = calendar.getTimeInMillis();
//獲取無限早到如今的事件key(防止上次批量數量小於放入數量,存在歷史數據未消費狀況)
Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
if (CollectionUtils.isNotEmpty(orders)){
//刪除key 防止重複發送
for (String orderId : orderIds) {
RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
}
//接下來執行發送等業務邏輯
}
} catch (Exception e) {
LOGGER.warn("task.run exception:{}", e);
}
}
複製代碼
至此完成了依賴redis和線程完成了延時發送的功能。數據結構
那麼對上面兩種不一樣的實現方式進行一下優缺點比較:多線程
第一種方式實現簡單,不依賴外部組件,可以快速的實現目標功能,但缺點也很明顯,須要在特定的場景下使用,若是是我這種消息量大的狀況下使用極可能是有問題,固然在數據源消息很少的狀況下不失爲好的選擇。併發
第二種方式實現稍微複雜一點,可是可以適應消息量大的場景,採用redis的zset做爲了「中間件」的效果,而且幫助咱們進行延時的功能實現可以較好的適應高併發場景,缺點在於在編寫的過程當中須要考慮實際的因素較多,例如線程的執行週期時間,發送可能會有必定時間的延遲,批量數據大小的設置等等。ide
綜上是本人此次延時功能的實現過程的兩種實現方式的總結,具體採用哪一種方式還需你們根據實際狀況選擇,但願能給你們帶來幫助。ps:因爲本人的技術能力有限,文章中可能出現技術描述不許確或者錯誤的狀況懇請各位大佬指出,我立馬進行改正,避免誤導你們,謝謝!高併發