KMQueue 基於Redis的分佈式消息隊列

KMQueue

Build Status

碼雲:https://gitee.com/iicode/KMQueue github:https://github.com/fnpac/KMQueuejava

該框架是基於redis實現的分佈式隊列,簡單靈活。git

下面簡單介紹下該隊列的一些設計,若是還有其餘不懂得地方能夠參考源碼和註釋,代碼中我加入了詳盡的註釋。github

還有其餘問題能夠提issue。redis

設計

序列圖

基於Redis的分佈式消息隊列設計.png

隊列模式

KMQueue隊列分爲兩種模式:spring

  • default - 簡單隊列
  • safe - 安全隊列

其中默認爲default數據庫

能夠以queueName:queueMode格式設置隊列的模式。安全

  • queueName 隊列名稱服務器

    default 爲默認隊列,能夠不指定,默認值。 特性:隊列任務可能會丟失,隊列任務沒有超時限制。框架

  • queueMode 隊列模式,可選值有:default、safe。分佈式

    safe 爲安全隊列,任務有重試策略,達到重試次數依舊失敗或者任務存活超時(這裏說的超時是指AliveTimeout)(這二者都稱爲最終失敗),Monitor會發出通知, 這樣能夠根據業務作一些處理,推薦將這些失敗的任務持久化到數據庫做爲日誌記錄。固然或許你還有更好的處理方式。

    注意:須要開啓備份隊列監聽程序BackupQueueMonitor,不然安全隊列中最終失敗的任務只會存儲在備份隊列中,而沒有消費者去消費處理,這是很危險的行爲

new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
...

worker1_queue爲簡單隊列,worker2_queue爲安全隊列。

注意:爲了更好的支持業務(將已存在的某個隊列的DEFAULT改成SAFE,並重啓服務的狀況),作以下處理: 當new KMQueueManager.Builder隊列名稱參數中,只要有一個隊列指定了SAFE模式,就會建立備份隊列(用於隊列任務監控,設置任務超時、失敗任務重試等), 而且該備份隊列的名稱基於傳入的全部隊列名稱生成(不管其隊列是不是SAFE模式)。

上面的例子中,備份隊列的生成策略爲:

base64(md5("worker1_queue" + "worker2_queue"))

Task(任務)

構造方法聲明以下:

public Task(String queue,
            String uid,
            boolean isUnique,
            String type,
            String data,
            Task.TaskStatus status)
  • uid:若是業務須要區分隊列任務的惟一性,請自行生成uid參數, 不然隊列默認使用uuid生成策略,這會致使即便data數據徹底相同的任務也會被看成兩個不一樣的任務處理。

  • 是不是惟一任務,即隊列中同一時刻只存在一個該任務。

  • type:用於業務邏輯的處理,你能夠根據不一樣的type任務類型,調用不一樣的handler去處理,能夠不傳。

KMQueueManager(隊列管理器)

有三種方式獲取Redis鏈接,詳情查看KMQueueManager.Builder構造方法的三種重載形式。 若是你使用spring,建議獲取spring中配置的redis鏈接池對象,並經過以下構造方法建立隊列管理器:

public Builder(Pool<Jedis> pool, String... queues)

RedisTaskQueue(任務隊列)

  • 1.採用阻塞隊列,以阻塞的方式(brpop)獲取任務隊列中的任務;
  • 2.判斷任務存活時間是否超時(對應的是大於aliveTimeout);
  • 3.更新任務的執行時間戳,放入備份隊列的隊首(lpush);

BackupQueueMonitor(備份隊列監控)

由於初始化備份隊列時設置了循環標記; 因此Monitor這裏採用定時Job策略,使用brpoplpush backupQueue backupQueue循環遍歷備份隊列,遇到循環標記結束循環遍歷。 對執行超時(對應的是大於protectedTimeout)或者存活時間超時(對應的是大於aliveTimeout)的任務作處理。

分爲兩種狀況:

  • 任務存活時間超時 || (任務執行超時&任務重試次數大於RetryTimes):任務再也不重試從備份隊列刪除該任務。 相應的能夠經過實現Pipeline,決定這些任務的一些額外處理,好比持久化到數據庫作日誌記錄。
    // 任務完全失敗後的處理,須要實現Pipeline接口,自行實現處理邏輯
    TaskPipeline taskPipeline = new TaskPipeline();
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
                    ...
                    .setPipeline(taskPipeline).build();
  • 任務執行超時&任務重試次數小於RetryTimes:即超時而且重複執行次數小於RetryTimes的任務從新放回任務隊列執行,同時更新任務狀態:
    • 放入任務隊列,優先處理(<rpush>);
    • 任務state標記爲"retry";
    • 重試次數+1;

使用Demo

生產任務

@Test
public void pushTaskTest() {
    KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .build();
    // 初始化隊列
    kmQueueManager.init();

    // 1.獲取隊列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue");
    // 2.建立任務
    JSONObject ob = new JSONObject();
    ob.put("data", "mail proxy task");
    String data = JSON.toJSONString(ob);
    // 參數 uid:若是業務須要區分隊列任務的惟一性,請自行生成uid參數,
    // 不然隊列默認使用uuid生成策略,這會致使即便data數據徹底相同的任務也會被看成兩個不一樣的任務處理。
    // 參數 type:用於業務邏輯的處理,你能夠根據不一樣的type任務類型,調用不一樣的handler去處理,能夠不傳。
    Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus());
    // 3.將任務加入隊列
    taskQueue.pushTask(task);
}

消費任務

@Test
public void popTaskTest() {
    KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .build();
    // 初始化隊列
    kmQueueManager.init();

    // 1.獲取隊列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue");
    // 2.獲取任務
    Task task = taskQueue.popTask();
    // 業務處理放到TaskConsumersHandler裏
    if (task != null) {
        task.doTask(kmQueueManager, TaskConsumersHandler.class);
    }
}

你能夠自行實現TaskHandler接口,建立適合你本身業務邏輯的任務處理類,並經過下面代碼執行任務處理。

task.doTask(kmQueueManager, TaskHandler.class)

_若是業務處理拋出異常,隊列也將其看成任務執行完成處理,

並經過taskQueue.finishTask(this)完成任務。

public void doTask(KMQueueManager kmQueueManager, Class clazz) {

    // 獲取任務所屬隊列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue());
    String queueMode = taskQueue.getMode();
    if (KMQueueManager.SAFE.equals(queueMode)) {// 安全隊列
        try {
            handleTask(clazz);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        // 任務執行完成,刪除備份隊列的相應任務
        taskQueue.finishTask(this);
    } else {// 普通隊列
        handleTask(clazz);
    }
}

不會再進行任務重試操做。

這點可能不太容易理解,爲何任務拋出異常失敗了,隊列不會執行重試呢?

由於任務執行拋出異常是業務級的錯誤,隊列不作干預。

隊列的重試只是針對消費任務的線程被kill掉或者服務器宕機等狀況,此時該任務還沒執行完,任務的消費者還沒告訴隊列任務執行完成了。 此時備份隊列監控會執行任務的重試。

若是你想在任務拋出異常失敗時執行任務重試,能夠不使用task.doTask,當任務拋出異常時,不執行任務的taskQueue.finishTask(this)操做。 這樣備份隊列監控會在下一個job對該任務進行檢查處理。

taskQueue.finishTask(this)是一個很是方便的工具方法。

備份隊列監控

@Test
public void monitorTaskTest() {

    // 任務完全失敗後的處理,須要實現Pipeline接口,自行實現處理邏輯
    TaskPipeline taskPipeline = new TaskPipeline();
    // 根據任務隊列的名稱構造備份隊列的名稱,注意:這裏的任務隊列參數必定要和KMQueueManager構造時傳入的一一對應。
    String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe");
    // 構造Monitor監聽器
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .setProtectedTimeout(Constant.PROTECTED_TIMEOUT)
            .setRetryTimes(Constant.RETRY_TIMES)
            .setPipeline(taskPipeline).build();
    // 執行監聽
    backupQueueMonitor.monitor();
}

補充

重要的事情說三遍:

若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!

若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!

若是指定了隊列的模式爲安全隊列,必定要開啓備份隊列監控!!!

相關文章
相關標籤/搜索