碼雲:https://gitee.com/iicode/KMQueue github:https://github.com/fnpac/KMQueuejava
該框架是基於redis實現的分佈式隊列,簡單靈活。git
下面簡單介紹下該隊列的一些設計,若是還有其餘不懂得地方能夠參考源碼和註釋,代碼中我加入了詳盡的註釋。github
還有其餘問題能夠提issue。redis
KMQueue隊列分爲兩種模式:spring
default
- 簡單隊列safe
- 安全隊列其中默認爲default
。數據庫
能夠以queueName:queueMode
格式設置隊列的模式。安全
queueName 隊列名稱服務器
default 爲默認隊列,能夠不指定,默認值。 特性:隊列任務可能會丟失,隊列任務沒有超時限制。框架
queueMode 隊列模式,可選值有:default、safe。分佈式
safe 爲安全隊列,任務有重試策略,達到重試次數依舊失敗或者任務存活超時(這裏說的超時是指AliveTimeout)(這二者都稱爲最終失敗),Monitor會發出通知, 這樣能夠根據業務作一些處理,推薦將這些失敗的任務持久化到數據庫做爲日誌記錄。固然或許你還有更好的處理方式。
注意:須要開啓備份隊列監聽程序BackupQueueMonitor,不然安全隊列中最終失敗的任務只會存儲在備份隊列中,而沒有消費者去消費處理,這是很危險的行爲
new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") ...
worker1_queue
爲簡單隊列,worker2_queue
爲安全隊列。
注意:爲了更好的支持業務(將已存在的某個隊列的
DEFAULT
改成SAFE
,並重啓服務的狀況),作以下處理: 當new KMQueueManager.Builder
的隊列名稱參數中,只要有一個隊列指定了SAFE
模式,就會建立備份隊列(用於隊列任務監控,設置任務超時、失敗任務重試等), 而且該備份隊列的名稱基於傳入的全部隊列名稱生成(不管其隊列是不是SAFE
模式)。
上面的例子中,備份隊列的生成策略爲:
base64(md5("worker1_queue" + "worker2_queue"))
構造方法聲明以下:
public Task(String queue, String uid, boolean isUnique, String type, String data, Task.TaskStatus status)
uid:若是業務須要區分隊列任務的惟一性,請自行生成uid參數, 不然隊列默認使用uuid生成策略,這會致使即便data數據徹底相同的任務也會被看成兩個不一樣的任務處理。
是不是惟一任務,即隊列中同一時刻只存在一個該任務。
type:用於業務邏輯的處理,你能夠根據不一樣的type任務類型,調用不一樣的handler去處理,能夠不傳。
有三種方式獲取Redis鏈接,詳情查看KMQueueManager.Builder
構造方法的三種重載形式。 若是你使用spring,建議獲取spring中配置的redis鏈接池對象,並經過以下構造方法建立隊列管理器:
public Builder(Pool<Jedis> pool, String... queues)
aliveTimeout
);由於初始化備份隊列時設置了循環標記; 因此Monitor這裏採用定時Job策略,使用brpoplpush backupQueue backupQueue
循環遍歷備份隊列,遇到循環標記結束循環遍歷。 對執行超時(對應的是大於protectedTimeout
)或者存活時間超時(對應的是大於aliveTimeout
)的任務作處理。
分爲兩種狀況:
Pipeline
,決定這些任務的一些額外處理,好比持久化到數據庫作日誌記錄。 // 任務完全失敗後的處理,須要實現Pipeline接口,自行實現處理邏輯 TaskPipeline taskPipeline = new TaskPipeline(); BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName) ... .setPipeline(taskPipeline).build();
RetryTimes
的任務從新放回任務隊列執行,同時更新任務狀態:
@Test public void pushTaskTest() { KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .build(); // 初始化隊列 kmQueueManager.init(); // 1.獲取隊列 TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue"); // 2.建立任務 JSONObject ob = new JSONObject(); ob.put("data", "mail proxy task"); String data = JSON.toJSONString(ob); // 參數 uid:若是業務須要區分隊列任務的惟一性,請自行生成uid參數, // 不然隊列默認使用uuid生成策略,這會致使即便data數據徹底相同的任務也會被看成兩個不一樣的任務處理。 // 參數 type:用於業務邏輯的處理,你能夠根據不一樣的type任務類型,調用不一樣的handler去處理,能夠不傳。 Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus()); // 3.將任務加入隊列 taskQueue.pushTask(task); }
@Test public void popTaskTest() { KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .build(); // 初始化隊列 kmQueueManager.init(); // 1.獲取隊列 TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue"); // 2.獲取任務 Task task = taskQueue.popTask(); // 業務處理放到TaskConsumersHandler裏 if (task != null) { task.doTask(kmQueueManager, TaskConsumersHandler.class); } }
你能夠自行實現TaskHandler
接口,建立適合你本身業務邏輯的任務處理類,並經過下面代碼執行任務處理。
task.doTask(kmQueueManager, TaskHandler.class)
_若是業務處理拋出異常,隊列也將其看成任務執行完成處理,
並經過taskQueue.finishTask(this)
完成任務。
public void doTask(KMQueueManager kmQueueManager, Class clazz) { // 獲取任務所屬隊列 TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue()); String queueMode = taskQueue.getMode(); if (KMQueueManager.SAFE.equals(queueMode)) {// 安全隊列 try { handleTask(clazz); } catch (Throwable e) { e.printStackTrace(); } // 任務執行完成,刪除備份隊列的相應任務 taskQueue.finishTask(this); } else {// 普通隊列 handleTask(clazz); } }
不會再進行任務重試操做。
這點可能不太容易理解,爲何任務拋出異常失敗了,隊列不會執行重試呢?
由於任務執行拋出異常是業務級的錯誤,隊列不作干預。
隊列的重試只是針對消費任務的線程被kill掉或者服務器宕機等狀況,此時該任務還沒執行完,任務的消費者還沒告訴隊列任務執行完成了。 此時備份隊列監控會執行任務的重試。
若是你想在任務拋出異常失敗時執行任務重試,能夠不使用task.doTask
,當任務拋出異常時,不執行任務的taskQueue.finishTask(this)
操做。 這樣備份隊列監控會在下一個job對該任務進行檢查處理。
taskQueue.finishTask(this)
是一個很是方便的工具方法。
@Test public void monitorTaskTest() { // 任務完全失敗後的處理,須要實現Pipeline接口,自行實現處理邏輯 TaskPipeline taskPipeline = new TaskPipeline(); // 根據任務隊列的名稱構造備份隊列的名稱,注意:這裏的任務隊列參數必定要和KMQueueManager構造時傳入的一一對應。 String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe"); // 構造Monitor監聽器 BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName) .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .setProtectedTimeout(Constant.PROTECTED_TIMEOUT) .setRetryTimes(Constant.RETRY_TIMES) .setPipeline(taskPipeline).build(); // 執行監聽 backupQueueMonitor.monitor(); }
重要的事情說三遍:
若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!
若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!
若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!