Vert.x Blueprint 系列教程(二) | 開發基於消息的應用 - Vert.x Kue

Vert.x 藍圖項目已經發布至Vert.x官方網站:Vert.x Blueprint Tutorialsjava


本文章是 Vert.x 藍圖系列 的第二篇教程。全系列:git

前言

歡迎回到Vert.x 藍圖系列~在本教程中,咱們將利用Vert.x開發一個基於消息的應用 - Vert.x Kue,它是一個使用Vert.x開發的優先級工做隊列,數據存儲使用的是 Redis。Vert.x Kue是 Automattic/kue 的Vert.x實現版本。咱們可使用Vert.x Kue來處理各類各樣的任務,好比文件轉換、訂單處理等等。github

經過本教程,你將會學習到如下內容:web

  • 消息、消息系統以及事件驅動的運用
  • Vert.x Event Bus 的幾種事件機制(發佈/訂閱、點對點模式)
  • 設計 分佈式 的Vert.x應用
  • 工做隊列的設計
  • Vert.x Service Proxy(服務代理,即異步RPC)的運用
  • 更深層次的Redis運用

本教程是Vert.x 藍圖系列的第二篇教程,對應的Vert.x版本爲3.3.2。本教程中的完整代碼已託管至GitHubredis

Vert.x的消息系統

既然咱們要用Vert.x開發一個基於消息的應用,那麼咱們先來瞅一瞅Vert.x的消息系統吧~在Vert.x中,咱們能夠經過 Event Bus 來發送和接收各類各樣的消息,這些消息能夠來自不一樣的Vertx實例。怎麼樣,很酷吧?咱們都將消息發送至Event Bus上的某個地址上,這個地址能夠是任意的字符串。算法

Event Bus支持三種消息機制:發佈/訂閱(Publish/Subscribe)、點對點(Point to point)以及請求/迴應(Request-Response)模式。下面咱們就來看一看這幾種機制。json

發佈/訂閱模式

發佈/訂閱模式中,消息被髮布到Event Bus的某一個地址上,全部訂閱此地址的Handler都會接收到該消息而且調用相應的處理邏輯。咱們來看一看示例代碼:後端

EventBus eventBus = vertx.eventBus();

eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address
  System.out.println("1: " + r.body());
});
eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address
  System.out.println("2: " + r.body());
});

eventBus.publish("foo.bar.baz", "+1s"); // 向此地址發送消息

咱們能夠經過vertx.eventBus()方法獲取EventBus的引用,而後咱們就能夠經過consume方法訂閱某個地址的消息而且綁定一個Handler。接着咱們經過publish向此地址發送消息。若是運行上面的例子,咱們會獲得一下結果:ruby

2: +1s
1: +1s

點對點模式

若是咱們把上面的示例中的publish方法替代成send方法,上面的實例就變成點對點模式了。在點對點模式中,消息被髮布到Event Bus的某一個地址上。Vert.x會將此消息傳遞給其中監聽此地址的Handler之一。若是有多個Handler綁定到此地址,那麼就使用輪詢算法隨機挑一個Handler傳遞消息。好比在此示例中,程序只會打印2: +1s或者1: +1s之中的一個。架構

請求/迴應模式

當咱們綁定的Handler接收到消息的時候,咱們可不能夠給消息的發送者回復呢?固然了!當咱們經過send方法發送消息的時候,咱們能夠同時指定一個回覆處理函數(reply handler)。而後當某個消息的訂閱者接收到消息的時候,它就能夠給發送者回復消息;若是發送者接收到了回覆,發送者綁定的回覆處理函數就會被調用。這就是請求/迴應模式

好啦,如今咱們已經粗略瞭解了Vert.x中的消息系統 - Event Bus的基本使用,下面咱們就看看Vert.x Kue的基本設計。有關更多關於Event Bus的信息請參考Vert.x Core Manual - Event Bus

Vert.x Kue 架構設計

Vert.x Kue 組件劃分

在咱們的項目中,咱們將Vert.x Kue劃分爲兩個模塊:

  • kue-core: 核心組件,提供優先級隊列的功能
  • kue-http: Web組件,提供Web UI以及REST API

另外咱們還提供一個示例模塊kue-example用於演示以及闡述如何使用Vert.x Kue。

既然咱們的項目有兩個模塊,那麼你必定會好奇:兩個模塊之間是如何進行通訊的?而且若是咱們寫本身的Kue應用的話,咱們該怎樣去調用Kue Core中的服務呢?不要着急,謎底將在後邊的章節中揭曉:-)

Vert.x Kue 核心模塊

回顧一下Vert.x Kue的做用 - 優先級工做隊列,因此在Vert.x Kue的核心模塊中咱們設計瞭如下的類:

  • Job - 任務(做業)數據實體
  • JobService - 異步服務接口,提供操做任務以及獲取數據的相關邏輯
  • KueWorker - 用於處理任務的Verticle
  • Kue - 工做隊列

前邊咱們提到過,咱們的兩個組件之間須要一種通訊機制能夠互相通訊 - 這裏咱們使用Vert.x的集羣模式,即以clustered的模式來部署Verticle。這樣的環境下的Event Bus一樣也是集羣模式的,所以各個組件能夠經過集羣模式下的Event Bus進行通訊。很不錯吧?在Vert.x的集羣模式下,咱們須要指定一個集羣管理器ClusterManager。這裏咱們使用默認的HazelcastClusterManager,使用 Hazelcast 做爲集羣管理。

在Vert.x Kue中,咱們將JobService服務發佈至分佈式的Event Bus上,這樣其它的組件就能夠經過Event Bus調用該服務了。咱們設計了一個KueVerticle用於註冊服務。Vert.x提供了Vert.x Service Proxy(服務代理組件),能夠很方便地將服務註冊至Event Bus上,而後在其它地方獲取此服務的代理並調用。咱們將在下面的章節中詳細介紹Vert.x Service Proxy

基於Future的異步模式

在咱們的Vert.x Kue中,大多數的異步方法都是基於Future的。若是您看過藍圖系列的第一篇文章的話,您必定不會對這種模式很陌生。在Vert.x 3.3中,咱們的Future支持基本的響應式的操做,好比mapcompose。它們用起來很是方便,由於咱們能夠將多個Future以響應式的方式組合起來而不用擔憂陷入回調地獄中。

Vert.x Kue中的事件

正如咱們在Vert.x Kue 特性介紹中提到的那樣,Vert.x Kue支持兩種級別的事件:任務事件(job events) 以及 隊列事件(queue events)。在Vert.x Kue中,咱們設計了三種事件地址:

  • vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}: 某個特定任務的任務事件地址
  • vertx.kue.handler.workers.{eventType}: (全局)隊列事件地址
  • vertx.kue.handler.workers.{eventType}.{addressId}: 某個特定任務的內部事件地址

特性介紹文檔中,咱們提到了如下幾種任務事件:

  • start 開始處理一個任務 (onStart)
  • promotion 一個延期的任務時間已到,提高至工做隊列中 (onPromotion)
  • progress 任務的進度變化 (onProgress)
  • failed_attempt 任務處理失敗,可是還能夠重試 (onFailureAttempt)
  • failed 任務處理失敗而且不能重試 (onFailure)
  • complete 任務完成 (onComplete)
  • remove 任務從後端存儲中移除 (onRemove)

隊列事件也類似,只不過須要加前綴job_。這些事件都會經過send方法發送至Event Bus上。每個任務都有對應的任務事件地址,所以它們可以正確地接收到對應的事件並進行相應的處理邏輯。

特別地,咱們還有兩個內部事件:donedone_faildone事件對應一個任務在底層的處理已經完成,而done_fail事件對應一個任務在底層的處理失敗。這兩個事件使用第三種地址進行傳遞。

任務狀態

在Vert.x Kue中,任務共有五種狀態:

  • INACTIVE: 任務還未開始處理,在工做隊列中等待處理
  • ACTIVE: 任務正在處理中
  • COMPLETE: 任務處理完成
  • FAILED: 任務處理失敗
  • DELAYED: 任務延時處理,正在等待計時器時間到並提高至工做隊列中

咱們使用狀態圖來描述任務狀態的變化:

Job State Machine

以及任務狀態的變化伴隨的事件:

Events with state change

總體設計

爲了讓你們對Vert.x Kue的架構有大體的瞭解,我用一幅圖來簡略描述整個Vert.x Kue的設計:

Diagram - How Vert.x Kue works

如今咱們對Vert.x Kue的設計有了大體的瞭解了,下面咱們就來看一看Vert.x Kue的代碼實現了~

項目結構

咱們來開始探索Vert.x Kue的旅程吧!首先咱們先從GitHub上clone源代碼:

git clone https://github.com/sczyh30/vertx-blueprint-job-queue.git

而後你能夠把項目做爲Gradle項目導入你的IDE中。(如何導入請參考相關IDE幫助文檔)

正如咱們以前所提到的,咱們的Vert.x Kue中有兩個功能模塊和一個實例模塊,所以咱們須要在Gradle工程文件中定義三個子工程。咱們來看一下本項目中的build.gradle文件:

configure(allprojects) { project ->

  ext {
    vertxVersion = "3.3.2"
  }

  apply plugin: 'java'

  repositories {
    jcenter()
  }

  dependencies {
    compile("io.vertx:vertx-core:${vertxVersion}")
    compile("io.vertx:vertx-codegen:${vertxVersion}")
    compile("io.vertx:vertx-rx-java:${vertxVersion}")
    compile("io.vertx:vertx-hazelcast:${vertxVersion}")
    compile("io.vertx:vertx-lang-ruby:${vertxVersion}")

    testCompile("io.vertx:vertx-unit:${vertxVersion}")
    testCompile group: 'junit', name: 'junit', version: '4.12'
  }

  sourceSets {
    main {
      java {
        srcDirs += 'src/main/generated'
      }
    }
  }

  compileJava {
    targetCompatibility = 1.8
    sourceCompatibility = 1.8
  }
}

project("kue-core") {

  dependencies {
    compile("io.vertx:vertx-redis-client:${vertxVersion}")
    compile("io.vertx:vertx-service-proxy:${vertxVersion}")
  }

  jar {
    archiveName = 'vertx-blueprint-kue-core.jar'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes 'Main-Class': 'io.vertx.core.Launcher'
      attributes 'Main-Verticle': 'io.vertx.blueprint.kue.queue.KueVerticle'
    }
  }

  task annotationProcessing(type: JavaCompile, group: 'build') { // codegen
    source = sourceSets.main.java
    classpath = configurations.compile
    destinationDir = project.file('src/main/generated')
    options.compilerArgs = [
      "-proc:only",
      "-processor", "io.vertx.codegen.CodeGenProcessor",
      "-AoutputDirectory=${project.projectDir}/src/main"
    ]
  }

  compileJava {
    targetCompatibility = 1.8
    sourceCompatibility = 1.8

    dependsOn annotationProcessing
  }
}

project("kue-http") {

  dependencies {
    compile(project(":kue-core"))
    compile("io.vertx:vertx-web:${vertxVersion}")
    compile("io.vertx:vertx-web-templ-jade:${vertxVersion}")
  }

  jar {
    archiveName = 'vertx-blueprint-kue-http.jar'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes 'Main-Class': 'io.vertx.core.Launcher'
      attributes 'Main-Verticle': 'io.vertx.blueprint.kue.http.KueHttpVerticle'
    }
  }
}

project("kue-example") {

  dependencies {
    compile(project(":kue-core"))
  }

  jar {
    archiveName = 'vertx-blueprint-kue-example.jar'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes 'Main-Class': 'io.vertx.core.Launcher'
      attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle'
    }
  }
}

task wrapper(type: Wrapper) {
  gradleVersion = '2.12'
}

(⊙o⊙)…比以前的待辦事項服務項目中的長很多誒。。。咱們來解釋一下:

  • configure(allprojects)做用域中,咱們配置了一些全局信息(對全部子工程都適用)。
  • 咱們定義了三個子工程:kue-corekue-http以及kue-example。這裏咱們來解釋一下里面用到的依賴。在kue-core中,vertx-redis-client用於Redis通訊,vertx-service-proxy用於Event Bus上的服務代理。在kue-http中,咱們將kue-core子工程做爲它的一個依賴。vertx-webvertx-web-templ-jade用於Kue Web端的開發。
  • 任務annotationProcessing用於註解處理(Vert.x Codegen)。咱們已經在上一篇教程中介紹過了,這裏就不展開講了。

咱們還須要在 settings.gradle 中配置工程:

rootProject.name = 'vertx-blueprint-job-queue'

include "kue-core"
include "kue-http"
include "kue-example"

看完了配置文件之後,咱們再來瀏覽一下咱們的項目目錄結構:

.
├── build.gradle
├── kue-core
│   └── src
│       ├── main
│       │   ├── java
│       │   └── resources
│       └── test
│           ├── java
│           └── resources
├── kue-example
│   └── src
│       ├── main
│       │   ├── java
│       │   └── resources
│       └── test
│           ├── java
│           └── resources
├── kue-http
│   └── src
│       ├── main
│       │   ├── java
│       │   └── resources
│       └── test
│           ├── java
│           └── resources
└── settings.gradle

在Gradle中,項目的源碼都位於{projectName}/src/main/java目錄內。這篇教程是圍繞Vert.x Kue Core的,因此咱們的代碼都在kue-core目錄中。

好啦!如今咱們已經對Vert.x Kue項目的總體結構有了大體的瞭解了,下面咱們開始源碼探索之旅!

任務實體 - 不只僅是一個數據對象

Vert.x Kue是用來處理任務的,所以咱們先來看一下表明任務實體的Job類。Job類位於io.vertx.blueprint.kue.queue包下。代碼可能有點長,不要擔憂,咱們把它分紅幾部分,分別來解析。

任務成員屬性

咱們先來看一下Job類中的成員屬性:

@DataObject(generateConverter = true)
public class Job {
    // job properties

    private final String address_id;
    private long id = -1;
    private String zid;
    private String type;
    private JsonObject data;
    private Priority priority = Priority.NORMAL;
    private JobState state = JobState.INACTIVE;
    private long delay = 0;
    private int max_attempts = 1;
    private boolean removeOnComplete = false;
    private int ttl = 0;
    private JsonObject backoff;

    private int attempts = 0;
    private int progress = 0;
    private JsonObject result;

    // job metrics
    private long created_at;
    private long promote_at;
    private long updated_at;
    private long failed_at;
    private long started_at;
    private long duration;


    // ...
}

我去。。。好多屬性!咱們一個一個地解釋:

  • address_id: 一個UUID序列,做爲Event Bus的地址
  • id: 任務的編號(id)
  • type: 任務的類型
  • data: 任務攜帶的數據,以 JsonObject 類型表示
  • priority: 任務優先級,以 Priority 枚舉類型表示。默認優先級爲正常(NORMAL)
  • delay: 任務的延遲時間,默認是 0
  • state: 任務狀態,以 JobState 枚舉類型表示。默認狀態爲等待(INACTIVE)
  • attempts: 任務已經嘗試執行的次數
  • max_attempts: 任務嘗試執行次數的最大閾值
  • removeOnComplete: 表明任務完成時是否自動從後臺移除
  • zid: zset操做對應的編號(zid),保持先進先出順序
  • ttl: TTL(Time to live)
  • backoff: 任務重試配置,以 JsonObject 類型表示
  • progress: 任務執行的進度
  • result: 任務執行的結果,以 JsonObject 類型表示

還有這些統計數據:

  • created_at: 表明此任務建立的時間
  • promote_at: 表明此任務從延時狀態被提高至等待狀態時的時間
  • updated_at: 表明任務更新的時間
  • failed_at: 表明任務失敗的時間
  • started_at: 表明任務開始的時間
  • duration: 表明處理任務花費的時間,單位爲毫秒(ms)

你可能注意到在 Job 類中還存在着幾個靜態成員變量:

private static Logger logger = LoggerFactory.getLogger(Job.class);

private static Vertx vertx;
private static RedisClient client;
private static EventBus eventBus;

public static void setVertx(Vertx v, RedisClient redisClient) {
  vertx = v;
  client = redisClient;
  eventBus = vertx.eventBus();
}

對於 logger 對象,我想你們應該都很熟悉,它表明一個Vert.x Logger實例用於日誌記錄。可是你必定想問爲何 Job 類中存在着一個Vertx類型的靜態成員。Job類不該該是一個數據對象嗎?固然咯!Job類表明一個數據對象,但不只僅是一個數據對象。這裏我模仿了一些Automattic/kue的風格,把一些任務相關邏輯方法放到了Job類裏,它們大多都是基於Future的異步方法,所以能夠很方便地去調用以及進行組合變換。好比:

job.save()
    .compose(Job::updateNow)
    .compose(j -> j.log("good!"));

因爲咱們不能在Job類被JVM加載的時候就獲取Vertx實例,咱們必須手動給Job類中的靜態Vertx成員賦值。這裏咱們是在Kue類中對其進行賦值的。當咱們建立一個工做隊列的時候,Job類中的靜態成員變量會被初始化。同時爲了保證程序的正確性,咱們須要一個方法來檢測靜態成員變量是否初始化。當咱們在建立一個任務的時候,若是靜態成員此時未被初始化,那麼日誌會給出警告:

private void _checkStatic() {
  if (vertx == null) {
    logger.warn("static Vertx instance in Job class is not initialized!");
  }
}

咱們還注意到 Job 類也是由@DataObject註解修飾的。Vert.x Codegen能夠處理含有@DataObject註解的類並生成對應的JSON轉換器,而且Vert.x Service Proxy也須要數據對象。

Job類中咱們有四個構造函數。其中address_id成員必須在一個任務被建立時就被賦值,默認狀況下此地址用一個惟一的UUID字符串表示。每個構造函數中咱們都要調用_checkStatic函數來檢測靜態成員變量是否被初始化。

任務事件輔助函數

正如咱們以前所提到的那樣,咱們經過一個特定的地址vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}在分佈式的Event Bus上發送和接收任務事件(job events)。因此咱們提供了兩個用於發送和接收事件的輔助函數emiton(相似於Node.js中的EventEmitter):

@Fluent
public <T> Job on(String event, Handler<Message<T>> handler) {
  logger.debug("[LOG] On: " + Kue.getCertainJobAddress(event, this));
  eventBus.consumer(Kue.getCertainJobAddress(event, this), handler);
  return this;
}

@Fluent
public Job emit(String event, Object msg) {
  logger.debug("[LOG] Emit: " + Kue.getCertainJobAddress(event, this));
  eventBus.send(Kue.getCertainJobAddress(event, this), msg);
  return this;
}

在後面的代碼中,咱們將頻繁使用這兩個輔助函數。

Redis中的存儲形式

在咱們探索相關的邏輯函數以前,咱們先來描述一下Vert.x Kue的數據在Redis中是以什麼樣的形式存儲的:

  • 全部的key都在vertx_kue命名空間下(以vertx_kue:做爲前綴)
  • vertx:kue:job:{id}: 存儲任務實體的map
  • vertx:kue:ids: 計數器,指示當前最大的任務ID
  • vertx:kue:job:types: 存儲全部任務類型的列表
  • vertx:kue:{type}:jobs: 指示全部等待狀態下的某種類型任務的列表
  • vertx_kue:jobs: 存儲全部任務zid的有序集合
  • vertx_kue:job:{state}: 存儲全部指定狀態的任務zid的有序集合
  • vertx_kue:jobs:{type}:{state}: 存儲全部指定狀態和類型的任務zid的有序集合
  • vertx:kue:job:{id}:log: 存儲指定id的任務對應日誌的列表

OK,下面咱們就來看看Job類中重要的邏輯函數。

改變任務狀態

咱們以前提到過,Vert.x Kue中的任務一共有五種狀態。全部的任務相關的操做都伴隨着任務狀態的變換,所以咱們先來看一下state方法的實現,它用於改變任務的狀態:

public Future<Job> state(JobState newState) {
  Future<Job> future = Future.future();
  RedisClient client = RedisHelper.client(vertx, new JsonObject()); // use a new client to keep transaction
  JobState oldState = this.state;
  client.transaction().multi(r0 -> { // (1)
    if (r0.succeeded()) {
      if (oldState != null && !oldState.equals(newState)) { // (2)
        client.transaction().zrem(RedisHelper.getStateKey(oldState), this.zid, _failure())
          .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + oldState.name()), this.zid, _failure());
      }
      client.transaction().hset(RedisHelper.getKey("job:" + this.id), "state", newState.name(), _failure()) // (3)
        .zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue(), this.zid, _failure())
        .zadd(RedisHelper.getKey("jobs:" + this.type + ":" + newState.name()), this.priority.getValue(), this.zid, _failure());

      switch (newState) { // dispatch different state
        case ACTIVE: // (4)
          client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
            this.priority.getValue() < 0 ? this.priority.getValue() : -this.priority.getValue(),
            this.zid, _failure());
          break;
        case DELAYED: // (5)
          client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
            this.promote_at, this.zid, _failure());
          break;
        case INACTIVE: // (6)
          client.transaction().lpush(RedisHelper.getKey(this.type + ":jobs"), "1", _failure());
          break;
        default:
      }

      this.state = newState;

      client.transaction().exec(r -> { // (7)
        if (r.succeeded()) {
          future.complete(this);
        } else {
          future.fail(r.cause());
        }
      });
    } else {
      future.fail(r0.cause());
    }
  });

  return future.compose(Job::updateNow);
}

首先咱們先建立了一個Future對象。而後咱們調用了 client.transaction().multi(handler) 函數開始一次Redis事務 (1)。在Vert.x 3.3.2中,全部的Redis事務操做都移至RedisTransaction類中,因此咱們須要先調用client.transaction()方法去獲取一個事務實例,而後調用multi表明事務塊的開始。

multi函數傳入的Handler中,咱們先斷定當前的任務狀態。若是當前任務狀態不爲空而且不等於新的任務狀態,咱們就將Redis中存儲的舊的狀態信息移除 (2)。爲了方便起見,咱們提供了一個RedisHelper輔助類,裏面提供了一些生成特定地址以及編碼解碼zid的方法:

package io.vertx.blueprint.kue.util;

import io.vertx.blueprint.kue.queue.JobState;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;


public final class RedisHelper {

  private static final String VERTX_KUE_REDIS_PREFIX = "vertx_kue";

  private RedisHelper() {
  }

  public static RedisClient client(Vertx vertx, JsonObject config) {
    return RedisClient.create(vertx, options(config));
  }

  public static RedisOptions options(JsonObject config) {
    return new RedisOptions()
      .setHost(config.getString("redis.host", "127.0.0.1"))
      .setPort(config.getInteger("redis.port", 6379));
  }

  public static String getKey(String key) {
    return VERTX_KUE_REDIS_PREFIX + ":" + key;
  }

  public static String getStateKey(JobState state) {
    return VERTX_KUE_REDIS_PREFIX + ":jobs:" + state.name();
  }

  public static String createFIFO(long id) {
    String idLen = "" + ("" + id).length();
    int len = 2 - idLen.length();
    while (len-- > 0)
      idLen = "0" + idLen;
    return idLen + "|" + id;
  }

  public static String stripFIFO(String zid) {
    return zid.substring(zid.indexOf('|') + 1);
  }

  public static long numStripFIFO(String zid) {
    return Long.parseLong(zid.substring(zid.indexOf('|') + 1));
  }
}

全部的key都必須在vertx_kue命名空間下,所以咱們封裝了一個getKey方法。咱們還實現了createFIFOstripFIFO方法用於生成zid以及解碼zidzid的格式使用了Automattic/Kue中的格式。

回到state方法來。咱們使用zrem(String key, String member, Handler<AsyncResult<String>> handler)方法將特定的數據從有序集合中移除。兩個key分別是vertx_kue:job:{state} 以及 vertx_kue:jobs:{type}:{state}member對應着任務的zid

接下來咱們使用hset方法來變動新的狀態 (3),而後用zadd方法往vertx_kue:job:{state}vertx_kue:jobs:{type}:{state}兩個有序集合中添加此任務的zid,同時傳遞一個權重(score)。這個很是重要,咱們就是經過這個實現優先級隊列的。咱們直接使用priority對應的值做爲score。這樣,當咱們須要從Redis中獲取任務的時候,咱們就能夠經過zpop方法獲取優先級最高的任務。咱們會在後面詳細講述。

不一樣的新狀態須要不一樣的操做。對於ACTIVE狀態,咱們經過zadd命令將zid添加至vertx_kue:jobs:ACTIVE有序集合中並賦予優先級權值 (4)。對於DELAYED狀態,咱們經過zadd命令將zid添加至vertx_kue:jobs:DELAYED有序集合中並賦予提高時間(promote_at)權值 (5)。對於INACTIVE狀態,咱們向vertx:kue:{type}:jobs列表中添加一個元素 (6)。這些操做都是在Redis事務塊內完成的。最後咱們經過exec方法一併執行這些事務操做 (7)。若是執行成功,咱們給future賦值(當前任務)。最後咱們返回future而且與updateNow方法相組合。

updateNow方法很是簡單,就是把updated_at的值設爲當前時間,而後存到Redis中:

Future<Job> updateNow() {
  this.updated_at = System.currentTimeMillis();
  return this.set("updated_at", String.valueOf(updated_at));
}

保存任務

這裏咱們來看一下整個Job類中最重要的方法之一 - save方法,它的做用是保存任務至Redis中。

public Future<Job> save() {
  // check
  Objects.requireNonNull(this.type, "Job type cannot be null"); // (1)

  if (this.id > 0)
    return update(); // (2)

  Future<Job> future = Future.future();

  // 生成id
  client.incr(RedisHelper.getKey("ids"), res -> { // (3)
    if (res.succeeded()) {
      this.id = res.result();
      this.zid = RedisHelper.createFIFO(id); // (4)
      String key = RedisHelper.getKey("job:" + this.id);

      if (this.delay > 0) {
        this.state = JobState.DELAYED;
      }

      client.sadd(RedisHelper.getKey("job:types"), this.type, _failure()); // (5)
       this.created_at = System.currentTimeMillis();
       this.promote_at = this.created_at + this.delay;
       // 保存任務
       client.hmset(key, this.toJson(), _completer(future, this)); // (6)
    } else {
      future.fail(res.cause());
    }
  });

  return future.compose(Job::update); // (7)
}

首先,任務類型不能爲空因此咱們要檢查type是否爲空 (1)。接着,若是當前任務的id大於0,則表明此任務已經存儲過(由於id是存儲時分配),此時只需執行更新操做(update)便可 (2)。而後咱們建立一個Future對象,而後使用incr方法從vertx_kue:ids字段獲取一個新的id (3)。同時咱們使用RedisHelper.createFIFO(id)方法來生成新的zid (4)。接着咱們來判斷任務延時是否大於0,若大於0則將當前任務狀態設置爲DELAYED。而後咱們經過sadd方法將當前任務類型添加至vertx:kue:job:types列表中 (5) 而且保存任務建立時間(created_at)以及任務提高時間(promote_at)。通過這一系列的操做後,全部的屬性都已準備好,因此咱們能夠利用hmset方法將此任務實體存儲至vertx:kue:job:{id}哈希表中 (6)。若是存儲操做成功,那麼將當前任務實體賦給future,不然記錄錯誤。最後咱們返回此future而且將其與update方法進行組合。

update方法進行一些更新操做,它的邏輯比較簡單:

Future<Job> update() {
  Future<Job> future = Future.future();
  this.updated_at = System.currentTimeMillis();

  client.transaction().multi(_failure())
    .hset(RedisHelper.getKey("job:" + this.id), "updated_at", String.valueOf(this.updated_at), _failure())
    .zadd(RedisHelper.getKey("jobs"), this.priority.getValue(), this.zid, _failure())
    .exec(_completer(future, this));

  return future.compose(r ->
    this.state(this.state));
}

能夠看到update方法只作了三件微小的工做:存儲任務更新時間、存儲zid以及更改當前任務狀態(組合state方法)。

最後總結一下將一個任務存儲到Redis中通過的步驟:save -> update -> state :-)

移除任務

移除任務很是簡單,藉助zremdel方法便可。咱們來看一下其實現:

public Future<Void> remove() {
  Future<Void> future = Future.future();
  client.transaction().multi(_failure())
    .zrem(RedisHelper.getKey("jobs:" + this.stateName()), this.zid, _failure())
    .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + this.stateName()), this.zid, _failure())
    .zrem(RedisHelper.getKey("jobs"), this.zid, _failure())
    .del(RedisHelper.getKey("job:" + this.id + ":log"), _failure())
    .del(RedisHelper.getKey("job:" + this.id), _failure())
    .exec(r -> {
      if (r.succeeded()) {
        this.emit("remove", new JsonObject().put("id", this.id));
        future.complete();
      } else {
        future.fail(r.cause());
      }
    });
  return future;
}

注意到成功移除任務時,咱們會向Event Bus上的特定地址發送remove任務事件。此事件包含着被移除任務的id

監放任務事件

咱們能夠經過幾種 onXXX 方法來監放任務事件:

@Fluent
public Job onComplete(Handler<Job> completeHandler) {
  this.on("complete", message -> {
    completeHandler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onFailure(Handler<JsonObject> failureHandler) {
  this.on("failed", message -> {
    failureHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onFailureAttempt(Handler<JsonObject> failureHandler) {
  this.on("failed_attempt", message -> {
    failureHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onPromotion(Handler<Job> handler) {
  this.on("promotion", message -> {
    handler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onStart(Handler<Job> handler) {
  this.on("start", message -> {
    handler.handle(new Job((JsonObject) message.body()));
  });
  return this;
}

@Fluent
public Job onRemove(Handler<JsonObject> removeHandler) {
  this.on("start", message -> {
    removeHandler.handle((JsonObject) message.body());
  });
  return this;
}

@Fluent
public Job onProgress(Handler<Integer> progressHandler) {
  this.on("progress", message -> {
    progressHandler.handle((Integer) message.body());
  });
  return this;
}

注意到不一樣的事件,對應接收的數據類型也有差別。咱們來講明一下:

  • onCompleteonPromotion 以及 onStart: 發送的數據是對應的Job對象
  • onFailure and onFailureAttempt: 發送的數據是JsonObject類型的,其格式相似於:
{
    "job": {},
    "extra": {
        "message": "some_error"
    }
}
  • onProgress: 發送的數據是當前任務進度
  • onRemove: 發送的數據是JsonObject類型的,其中id表明被移除任務的編號

更新任務進度

咱們能夠經過progress方法來更新任務進度。看一下其實現:

public Future<Job> progress(int complete, int total) {
  int n = Math.min(100, complete * 100 / total); // (1)
  this.emit("progress", n); // (2)
  return this.setProgress(n) // (3)
    .set("progress", String.valueOf(n))
    .compose(Job::updateNow);
}

progress方法接受兩個參數:第一個是當前完成的進度值,第二個是完成狀態須要的進度值。咱們首先計算出當前的進度 (1),而後向特定地址發送progress事件 (2)。最後咱們將進度存儲至Redis中並更新時間,返回Future (3)。

任務失敗以及重試機制

當一個任務處理失敗時,若是它有剩餘的重試次數,Vert.x Kue會自動調用failAttempt方法進行重試。咱們來看一下failAttempt方法的實現:

Future<Job> failedAttempt(Throwable err) {
  return this.error(err)
    .compose(Job::failed)
    .compose(Job::attemptInternal);
}

(⊙o⊙)很是簡短吧~實際上,failAttempt方法是三個異步方法的組合:errorfailed以及attemptInternal。當一個任務須要進行重試的時候,咱們首先向Event Bus發佈 error 隊列事件而且在Redis中記錄日誌,而後將當前的任務狀態置爲FAILED,最後從新處理此任務。

咱們先來看一下error方法:

public Future<Job> error(Throwable ex) {
  return this.emitError(ex)
    .set("error", ex.getMessage())
    .compose(j -> j.log("error | " + ex.getMessage()));
}

它的邏輯很簡單:首先咱們向Event Bus發佈 錯誤 事件,而後記錄錯誤日誌便可。這裏咱們封裝了一個發佈錯誤的函數emitError

@Fluent
public Job emitError(Throwable ex) {
  JsonObject errorMessage = new JsonObject().put("id", this.id)
    .put("message", ex.getMessage());
  eventBus.publish(Kue.workerAddress("error"), errorMessage);
  eventBus.send(Kue.getCertainJobAddress("error", this), errorMessage);
  return this;
}

其中發送的錯誤信息格式相似於下面的樣子:

{
    "id": 2052,
    "message": "some error"
}

接下來咱們再來看一下failed方法的實現:

public Future<Job> failed() {
  this.failed_at = System.currentTimeMillis();
  return this.updateNow()
    .compose(j -> j.set("failed_at", String.valueOf(j.failed_at)))
    .compose(j -> j.state(JobState.FAILED));
}

很是簡單,首先咱們更新任務的更新時間和失敗時間,而後經過state方法將當前任務狀態置爲FAILED便可。

任務重試的核心邏輯在attemptInternal方法中:

private Future<Job> attemptInternal() {
  int remaining = this.max_attempts - this.attempts; // (1)
  if (remaining > 0) { // 還有重試次數
    return this.attemptAdd() // (2)
      .compose(Job::reattempt) // (3)
      .setHandler(r -> {
        if (r.failed()) {
          this.emitError(r.cause()); // (4)
        }
      });
  } else if (remaining == 0) { // (5)
    return Future.failedFuture("No more attempts");
  } else { // (6)
    return Future.failedFuture(new IllegalStateException("Attempts Exceeded"));
  }
}

在咱們的Job數據對象中,咱們存儲了最大重試次數max_attempts以及已經重試的次數attempts,因此咱們首先根據這兩個數據計算剩餘的重試次數remaining (1)。若是還有剩餘次數的話,咱們就先調用attemptAdd方法增長一次已重試次數並 (2),而後咱們調用reattempt方法執行真正的任務重試邏輯 (3)。最後返回這兩個異步方法組合的Future。若是其中一個過程出現錯誤,咱們就發佈error事件 (4)。若是沒有剩餘次數了或者超出剩餘次數了,咱們直接返回錯誤。

在咱們解析reattempt方法以前,咱們先來回顧一下Vert.x Kue中的任務失敗恢復機制。Vert.x Kue支持延時重試機制(retry backoff),而且支持不一樣的策略(如 fixed 以及 exponential)。以前咱們提到Job類中有一個backoff成員變量,它用於配置延時重試的策略。它的格式相似於這樣:

{
    "type": "fixed",
    "delay": 5000
}

延時重試機制的實如今getBackoffImpl方法中,它返回一個Function<Integer, Long>對象,表明一個接受Integer類型(即attempts),返回Long類型(表明計算出的延時值)的函數:

private Function<Integer, Long> getBackoffImpl() {
  String type = this.backoff.getString("type", "fixed"); // (1)
  long _delay = this.backoff.getLong("delay", this.delay); // (2)
  switch (type) {
    case "exponential": // (3)
      return attempts -> Math.round(_delay * 0.5 * (Math.pow(2, attempts) - 1));
    case "fixed":
    default: // (4)
      return attempts -> _delay;
  }
}

首先咱們從backoff配置中獲取延遲重試策略。目前Vert.x Kue支持兩種策略:fixedexponential。前者採用固定延遲時間,然後者採用指數增加型延遲時間。默認狀況下Vert.x Kue會採用fixed策略 (1)。接下來咱們從backoff配置中獲取延遲時間,若是配置中沒有指定,那麼就使用任務對象中的延遲時間delay (2)。接下來就是根據具體的策略進行計算了。對於指數型延遲,咱們計算[delay * 0.5 * 2^attempts]做爲延遲時間 (3);對於固定型延遲策略,咱們直接使用獲取到的延遲時間 (4)。

好啦,如今回到「真正的重試」方法 —— reattempt方法來:

private Future<Job> reattempt() {
  if (this.backoff != null) {
    long delay = this.getBackoffImpl().apply(attempts); // (1)
    return this.setDelay(delay)
      .setPromote_at(System.currentTimeMillis() + delay)
      .update() // (2)
      .compose(Job::delayed); // (3)
  } else {
    return this.inactive(); // (4)
  }
}

首先咱們先檢查backoff配置是否存在,若存在則計算出對應的延時時間 (1) 而且設定delaypromote_at屬性的值而後保存至Redis中 (2)。接着咱們經過delayed方法將任務的狀態設爲延時(DELAYED) (3)。若是延時重試配置不存在,咱們就經過inactive方法直接將此任務置入工做隊列中 (4)。

這就是整個任務重試功能的實現,也不是很複雜蛤?觀察上面的代碼,咱們能夠發現Future組合無處不在。這種響應式的組合很是方便。想想若是咱們用回調的異步方式來寫代碼的話,咱們很容易陷入回調地獄中(⊙o⊙)。。。幾個回調嵌套起來總顯得不是那麼優美和簡潔,而用響應式的、可組合的Future就能夠有效地避免這個問題。

不錯!到如今爲止咱們已經探索完Job類的源碼了~下面咱們來看一下JobService類。

Event Bus 服務 - JobService

在本章節中咱們來探索一下JobService接口及其實現 —— 它包含着各類普通的操做和統計Job的邏輯。

異步RPC

咱們的JobService是一個通用邏輯接口,所以咱們但願應用中的每個組件都能訪問此服務,即進行RPC。在Vert.x中,咱們能夠將服務註冊至Event Bus上,而後其它組件就能夠經過Event Bus來遠程調用註冊的服務了。

傳統的RPC有一個缺點:消費者須要阻塞等待生產者的迴應。你可能想說:這是一種阻塞模型,和Vert.x推崇的異步開發模式不相符。沒錯!並且,傳統的RPC不是真正面向失敗設計的。

還好,Vert.x提供了一種高效的、響應式的RPC —— 異步RPC。咱們不須要等待生產者的迴應,而只須要傳遞一個Handler<AsyncResult<R>>參數給異步方法。這樣當收到生產者結果時,對應的Handler就會被調用,很是方便,這與Vert.x的異步開發模式相符。而且,AsyncResult也是面向失敗設計的。

因此講到這裏,你可能想問:到底怎麼在Event Bus上註冊服務呢?咱們是否是須要寫一大堆的邏輯去包裝和發送信息,而後在另外一端解碼信息並進行調用呢?不,這太麻煩了!有了Vert.x 服務代理,咱們不須要這麼作!Vert.x提供了一個組件 Vert.x Service Proxy 來自動生成服務代理。有了它的幫助,咱們就只須要按照規範設計咱們的異步服務接口,而後用@ProxyGen註解修飾便可。

@ProxyGen註解的限制 @ProxyGen註解的使用有諸多限制。好比,全部的異步方法都必須是基於回調的,也就是說每一個方法都要接受一個Handler<AsyncResult<R>>類型的參數。而且,類型R也是有限制的 —— 只容許基本類型以及數據對象類型。詳情請參考官方文檔

異步服務接口

咱們來看一下JobService的源碼:

@ProxyGen
@VertxGen
public interface JobService {

  static JobService create(Vertx vertx, JsonObject config) {
    return new JobServiceImpl(vertx, config);
  }

  static JobService createProxy(Vertx vertx, String address) {
    return ProxyHelper.createProxy(JobService.class, vertx, address);
  }

  /**
   * 獲取任務,按照優先級順序
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService getJob(long id, Handler<AsyncResult<Job>> handler);

  /**
   * 刪除任務
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService removeJob(long id, Handler<AsyncResult<Void>> handler);

  /**
   * 判斷任務是否存在
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService existsJob(long id, Handler<AsyncResult<Boolean>> handler);

  /**
   * 獲取任務日誌
   *
   * @param id      job id
   * @param handler async result handler
   */
  @Fluent
  JobService getJobLog(long id, Handler<AsyncResult<JsonArray>> handler);

  /**
   * 獲取某一範圍內某個指定狀態下的任務列表
   *
   * @param state   expected job state
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRangeByState(String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler);

  /**
   * 獲取某一範圍內某個指定狀態和類型下的任務列表
   *
   * @param type    expected job type
   * @param state   expected job state
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRangeByType(String type, String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler);

  /**
   * 獲取某一範圍內的任務列表(按照順序或倒序)
   *
   * @param from    from
   * @param to      to
   * @param order   range order
   * @param handler async result handler
   */
  @Fluent
  JobService jobRange(long from, long to, String order, Handler<AsyncResult<List<Job>>> handler);

  // 統計函數

  /**
   * 獲取指定狀態和類型下的任務的數量
   *
   * @param type    job type
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService cardByType(String type, JobState state, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取某個狀態下的任務的數量
   *
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService card(JobState state, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取COMPLETE狀態任務的數量
   *
   * @param type    job type; if null, then return global metrics
   * @param handler async result handler
   */
  @Fluent
  JobService completeCount(String type, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取FAILED狀態任務的數量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService failedCount(String type, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取INACTIVE狀態任務的數量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService inactiveCount(String type, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取ACTIVE狀態任務的數量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService activeCount(String type, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取DELAYED狀態任務的數量
   *
   * @param type job type; if null, then return global metrics
   */
  @Fluent
  JobService delayedCount(String type, Handler<AsyncResult<Long>> handler);

  /**
   * 獲取當前存在的全部任務類型
   *
   * @param handler async result handler
   */
  @Fluent
  JobService getAllTypes(Handler<AsyncResult<List<String>>> handler);

  /**
   * 獲取指定狀態下的全部任務的ID
   *
   * @param state   job state
   * @param handler async result handler
   */
  @Fluent
  JobService getIdsByState(JobState state, Handler<AsyncResult<List<Long>>> handler);

  /**
   * 工做隊列運行時間(ms)
   *
   * @param handler async result handler
   */
  @Fluent
  JobService getWorkTime(Handler<AsyncResult<Long>> handler);
}

能夠看到咱們還爲JobService接口添加了@VertxGen註解,Vert.x Codegen能夠處理此註解生成多種語言版本的服務。

JobService接口中咱們還定義了兩個靜態方法:create用於建立一個任務服務實例,createProxy用於建立一個服務代理。

JobService接口中包含一些任務操做和統計的相關邏輯,每一個方法的功能都已經在註釋中闡述了,所以咱們就直接來看它的實現吧~

任務服務的實現

JobService接口的實現位於JobServiceImpl類中,代碼很是長,所以這裏就不貼代碼了。。。你們能夠對照GitHub中的代碼讀下面的內容。

  • getJob: 獲取任務的方法很是簡單。直接利用hgetall命令從Redis中取出對應的任務便可。
  • removeJob: 咱們能夠將此方法看做是getJobJob#remove兩個方法的組合。
  • existsJob: 使用exists命令判斷對應id的任務是否存在。
  • getJobLog: 使用lrange命令從vertx_kue:job:{id}:log列表中取出日誌。
  • rangeGeneral: 使用zrange命令獲取必定範圍內的任務,這是一個通用方法。

zrange 操做 zrange 返回某一有序集合中某個特定範圍內的元素。詳情請見ZRANGE - Redis

如下三個方法複用了rangeGeneral方法:

  • jobRangeByState: 指定狀態,對應的key爲vertx_kue:jobs:{state}
  • jobRangeByType: 指定狀態和類型,對應的key爲vertx_kue:jobs:{type}:{state}
  • jobRange: 對應的key爲vertx_kue:jobs

這兩個通用方法用於任務數量的統計:

  • cardByType: 利用zcard命令獲取某一指定狀態和類型下任務的數量。
  • card: 利用zcard命令獲取某一指定狀態下任務的數量。

下面五個輔助統計方法複用了上面兩個通用方法:

  • completeCount
  • failedCount
  • delayedCount
  • inactiveCount
  • activeCount

接着看:

  • getAllTypes: 利用smembers命令獲取vertx_kue:job:types集合中存儲的全部的任務類型。
  • getIdsByState: 使用zrange獲取某一指定狀態下全部任務的ID。
  • getWorkTime: 使用get命令從vertx_kue:stats:work-time中獲取Vert.x Kue的工做時間。

註冊任務服務

既然完成了JobService的實現,接下來咱們來看一下如何利用Service Proxy將服務註冊至Event Bus上。這裏咱們還須要一個KueVerticle來建立要註冊的服務實例,而且將其註冊至Event Bus上。

打開io.vertx.blueprint.kue.queue.KueVerticle類的源碼:

package io.vertx.blueprint.kue.queue;

import io.vertx.blueprint.kue.service.JobService;
import io.vertx.blueprint.kue.util.RedisHelper;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.redis.RedisClient;
import io.vertx.serviceproxy.ProxyHelper;


public class KueVerticle extends AbstractVerticle {

  private static Logger logger = LoggerFactory.getLogger(Job.class);

  public static final String EB_JOB_SERVICE_ADDRESS = "vertx.kue.service.job.internal"; // (1)

  private JsonObject config;
  private JobService jobService;

  @Override
  public void start(Future<Void> future) throws Exception {
    this.config = config();
    this.jobService = JobService.create(vertx, config); // (2)
    // create redis client
    RedisClient redisClient = RedisHelper.client(vertx, config);
    redisClient.ping(pr -> { // (3) test connection
      if (pr.succeeded()) {
        logger.info("Kue Verticle is running...");

        // (4) register job service
        ProxyHelper.registerService(JobService.class, vertx, jobService, EB_JOB_SERVICE_ADDRESS);

        future.complete();
      } else {
        logger.error("oops!", pr.cause());
        future.fail(pr.cause());
      }
    });
  }

}

首先咱們須要定義一個地址用於服務註冊 (1)。在start方法中,咱們建立了一個任務服務實例 (2),而後經過ping命令測試Redis鏈接 (3)。若是鏈接正常,那麼咱們就能夠經過ProxyHelper類中的registerService輔助方法來將服務實例註冊至Event Bus上 (4)。

這樣,一旦咱們在集羣模式下部署KueVerticle,服務就會被髮布至Event Bus上,而後咱們就能夠在其餘組件中去遠程調用此服務了。很奇妙吧!

Kue - 工做隊列

Kue類表明着工做隊列。咱們來看一下Kue類的實現。首先先看一下其構造函數:

public Kue(Vertx vertx, JsonObject config) {
  this.vertx = vertx;
  this.config = config;
  this.jobService = JobService.createProxy(vertx, EB_JOB_SERVICE_ADDRESS);
  this.client = RedisHelper.client(vertx, config);
  Job.setVertx(vertx, RedisHelper.client(vertx, config)); // init static vertx instance inner job
}

這裏咱們須要注意兩點:第一點,咱們經過createProxy方法來建立一個JobService的服務代理;第二點,以前提到過,咱們須要在這裏初始化Job類中的靜態成員變量。

基於Future的封裝

咱們的JobService是基於回調的,這是服務代理組件所要求的。爲了讓Vert.x Kue更加響應式,使用起來更加方便,咱們在Kue類中以基於Future的異步模式封裝了JobService中的全部異步方法。這很簡單,好比這個方法:

@Fluent
JobService getJob(long id, Handler<AsyncResult<Job>> handler);

能夠這麼封裝:

public Future<Optional<Job>> getJob(long id) {
  Future<Optional<Job>> future = Future.future();
  jobService.getJob(id, r -> {
    if (r.succeeded()) {
      future.complete(Optional.ofNullable(r.result()));
    } else {
      future.fail(r.cause());
    }
  });
  return future;
}

其實就是加一層Future。其它的封裝過程也相似因此咱們就不細說了。

process和processBlocking方法

processprocessBlocking方法用於處理任務:

public Kue process(String type, int n, Handler<Job> handler) {
  if (n <= 0) {
    throw new IllegalStateException("The process times must be positive");
  }
  while (n-- > 0) {
    processInternal(type, handler, false);
  }f
  setupTimers();
  return this;
}

public Kue process(String type, Handler<Job> handler) {
  processInternal(type, handler, false);
  setupTimers();
  return this;
}

public Kue processBlocking(String type, int n, Handler<Job> handler) {
  if (n <= 0) {
    throw new IllegalStateException("The process times must be positive");
  }
  while (n-- > 0) {
    processInternal(type, handler, true);
  }
  setupTimers();
  return this;
}

兩個process方法都相似 —— 它們都是使用Event Loop線程處理任務的,其中第一個方法還能夠指定同時處理任務數量的閾值。咱們來回顧一下使用Event Loop線程的注意事項 —— 咱們不能阻塞Event Loop線程。所以若是咱們須要在處理任務時作一些耗時的操做,咱們可使用processBlocking方法。這幾個方法的代碼看起來都差很少,那麼區別在哪呢?以前咱們提到過,咱們設計了一種Verticle - KueWorker,用於處理任務。所以對於process方法來講,KueWorker就是一種普通的Verticle;而對於processBlocking方法來講,KueWorker是一種Worker Verticle。這兩種Verticle有什麼不一樣呢?區別在於,Worker Verticle會使用Worker線程,所以即便咱們執行一些耗時的操做,Event Loop線程也不會被阻塞。

建立及部署KueWorker的邏輯在processInternal方法中,這三個方法都使用了processInternal方法:

private void processInternal(String type, Handler<Job> handler, boolean isWorker) {
  KueWorker worker = new KueWorker(type, handler, this); // (1)
  vertx.deployVerticle(worker, new DeploymentOptions().setWorker(isWorker), r0 -> { // (2)
    if (r0.succeeded()) {
      this.on("job_complete", msg -> {
        long dur = new Job(((JsonObject) msg.body()).getJsonObject("job")).getDuration();
        client.incrby(RedisHelper.getKey("stats:work-time"), dur, r1 -> { // (3)
          if (r1.failed())
            r1.cause().printStackTrace();
        });
      });
    }
  });
}

首先咱們建立一個KueWorker實例 (1)。咱們將在稍後詳細介紹KueWorker的實現。而後咱們根據提供的配置來部署此KueWorker (2)。processInternal方法的第三個參數表明此KueWorker是否爲worker verticle。若是部署成功,咱們就監聽complete事件。每當接收到complete事件的時候,咱們獲取收到的信息(處理任務消耗的時間),而後用incrby增長對應的工做時間 (3)。

再回到前面三個處理方法中。除了部署KueWorker之外,咱們還調用了setupTimers方法,用於設定定時器以監測延時任務以及監測活動任務TTL。

監測延時任務

Vert.x Kue支持延時任務,所以咱們須要在任務延時時間到達時將任務「提高」至工做隊列中等待處理。這個工做是在checkJobPromotion方法中實現的:

private void checkJobPromotion() {
  int timeout = config.getInteger("job.promotion.interval", 1000); // (1)
  int limit = config.getInteger("job.promotion.limit", 1000); // (2)
  vertx.setPeriodic(timeout, l -> { // (3)
    client.zrangebyscore(RedisHelper.getKey("jobs:DELAYED"), String.valueOf(0), String.valueOf(System.currentTimeMillis()),
      new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)), r -> {  // (4)
        if (r.succeeded()) {
          r.result().forEach(r1 -> {
            long id = Long.parseLong(RedisHelper.stripFIFO((String) r1));
            this.getJob(id).compose(jr -> jr.get().inactive())  // (5)
              .setHandler(jr -> {
                if (jr.succeeded()) {
                  jr.result().emit("promotion", jr.result().getId()); // (6)
                } else {
                  jr.cause().printStackTrace();
                }
              });
          });
        } else {
          r.cause().printStackTrace();
        }
      });
  });
}

首先咱們從配置中獲取監測延時任務的間隔(job.promotion.interval,默認1000ms)以及提高數量閾值(job.promotion.limit,默認1000)。而後咱們使用vertx.setPeriodic方法設一個週期性的定時器 (3),每隔一段時間就從Redis中獲取須要被提高的任務 (4)。這裏咱們經過zrangebyscore獲取每一個須要被提高任務的id。咱們來看一下zrangebyscore方法的定義:

RedisClient zrangebyscore(String key, String min, String max, RangeLimitOptions options, Handler<AsyncResult<JsonArray>> handler);
  • key: 某個有序集合的key,即vertx_kue:jobs:DELAYED
  • min and max: 最小值以及最大值(按照某種模式)。這裏min0,而max是當前時間戳

咱們來回顧一下Job類中的state方法。當咱們要把任務狀態設爲DELAYED的時候,咱們將score設爲promote_at時間:

case DELAYED:
  client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()),
    this.promote_at, this.zid, _failure());

所以咱們將max設爲當前時間(System.currentTimeMillis()),只要當前時間超過須要提高的時間,這就說明此任務能夠被提高了。

  • options: range和limit配置。這裏咱們須要指定LIMIT值因此咱們用new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)建立了一個配置

zrangebyscore的結果是一個JsonArray,裏面包含着全部等待提高任務的zid。得到結果後咱們就將每一個zid轉換爲id,而後分別獲取對應的任務實體,最後對每一個任務調用inactive方法來將任務狀態設爲INACTIVE (5)。若是任務成功提高至工做隊列,咱們就發送promotion事件 (6)。

CallbackKue - 提供多語言支持

咱們知道,Vert.x支持多種語言(如JS,Ruby),所以若是能讓咱們的Vert.x Kue支持多種語言那固然是極好的!這沒有問題~Vert.x Codegen能夠處理含@VertxGen註解的異步接口,生成多語言版本。@VertxGen註解一樣限制異步方法 —— 須要基於回調,所以咱們設計了一個CallbackKue接口用於提供多語言支持。CallbackKue的設計很是簡單,其實現複用了KuejobService的代碼。你們能夠直接看源碼,一目瞭然,這裏就不細說了。

注意要生成多語言版本的代碼,須要添加相應的依賴。好比要生成Ruby版本的代碼就要向build.gradle中添加compile("io.vertx:vertx-lang-ruby:${vertxVersion}")

KueWorker - 任務在此處理

好啦,咱們已經對Vert.x Kue Core的幾個核心部分有了大體的瞭解了,如今是時候探索一下任務處理的本源 - KueWorker了~

每個worker都對應一個特定的任務類型,而且綁定着特定的處理函數(Handler),因此咱們須要在建立的時候指定它們。

prepareAndStart方法

KueWorker中,咱們使用prepareAndStart方法來準備要處理的任務而且開始處理任務的過程:

private void prepareAndStart() {
  this.getJobFromBackend().setHandler(jr -> { // (1)
    if (jr.succeeded()) {
      if (jr.result().isPresent()) {
        this.job = jr.result().get(); // (2)
        process(); // (3)
      } else {
        this.emitJobEvent("error", null, new JsonObject().put("message", "job_not_exist"));
        throw new IllegalStateException("job not exist");
      }
    } else {
        this.emitJobEvent("error", null, new JsonObject().put("message", jr.cause().getMessage()));
        jr.cause().printStackTrace();
    }
  });
}

代碼比較直觀。首先咱們經過getJobFromBackend方法從Redis中按照優先級順序獲取任務 (1)。若是成功獲取任務,咱們就把獲取到的任務保存起來 (2) 而後經過process方法處理任務 (3)。若是中間出現錯誤,咱們須要發送error錯誤事件,其中攜帶錯誤信息。

使用zpop按照優先級順序獲取任務

咱們來看一下咱們是如何從Redis中按照優先級順序獲取任務實體的:

private Future<Optional<Job>> getJobFromBackend() {
  Future<Optional<Job>> future = Future.future();
  client.blpop(RedisHelper.getKey(this.type + ":jobs"), 0, r1 -> { // (1)
    if (r1.failed()) {
      client.lpush(RedisHelper.getKey(this.type + ":jobs"), "1", r2 -> {
        if (r2.failed())
          future.fail(r2.cause());
      });
    } else {
      this.zpop(RedisHelper.getKey("jobs:" + this.type + ":INACTIVE")) // (2)
        .compose(kue::getJob) // (3)
        .setHandler(r -> {
          if (r.succeeded()) {
            future.complete(r.result());
          } else
            future.fail(r.cause());
        });
    }
  });
  return future;
}

以前咱們已經瞭解到,每當咱們保存一個任務的時候,咱們都會向vertx_kue:{type}:jobs列表中插入一個新元素表示新的任務可供處理。所以這裏咱們經過blpop命令來等待可用的任務 (1)。一旦有任務可供處理,咱們就利用zpop方法取出高優先級的任務的zid (2)。zpop命令是一個原子操做,用於從有序集合中彈出最小score值的元素。注意Redis沒有實現zpop命令,所以咱們須要本身實現。

Redis官方文檔介紹了一種實現zpop命令的簡單方法 - 利用 WATCH。這裏咱們利用另一種思路實現zpop命令:

private Future<Long> zpop(String key) {
  Future<Long> future = Future.future();
  client.transaction()
    .multi(_failure())
    .zrange(key, 0, 0, _failure())
    .zremrangebyrank(key, 0, 0, _failure())
    .exec(r -> {
      if (r.succeeded()) {
        JsonArray res = r.result();
        if (res.getJsonArray(0).size() == 0) // empty set
          future.fail(new IllegalStateException("Empty zpop set"));
        else {
          try {
            future.complete(Long.parseLong(RedisHelper.stripFIFO(
              res.getJsonArray(0).getString(0))));
          } catch (Exception ex) {
            future.fail(ex);
          }
        }
      } else {
        future.fail(r.cause());
      }
    });
  return future;
}

在咱們的zpop的實現中,咱們首先開始了一個事務塊,而後依次執行zrangezremrangebyrank命令。有關這些命令的詳情咱們就不細說了,能夠參考Redis官方文檔。而後咱們提交事務,若是提交成功,咱們會得到一個JsonArray類型的結果。正常狀況下咱們均可以經過res.getJsonArray(0).getString(0)獲取到對應的zid值。獲取到zid值之後咱們就能夠將其轉換爲任務的id了,最後咱們將id置於Future內(由於zpop也是一個異步方法)。

接着回到getJobFromBackend方法中。獲取到對應的id以後,咱們就能夠經過KuegetJob函數獲取任務實體了 (3)。因爲getJobFromBackend也是一個異步方法,所以咱們一樣將結果置於Future中。

真正的「處理」邏輯

前邊講了那麼多,都是在爲處理任務作準備。。。不要着急,如今終於到了真正的「處理」邏輯咯!咱們看一下process方法的實現:

private void process() {
  long curTime = System.currentTimeMillis();
  this.job.setStarted_at(curTime)
    .set("started_at", String.valueOf(curTime)) // (1) set start time
    .compose(Job::active) // (2) set the job state to ACTIVE
    .setHandler(r -> {
      if (r.succeeded()) {
        Job j = r.result();
        // emit start event
        this.emitJobEvent("start", j, null);  // (3) emit job `start` event
        // (4) process logic invocation
        try {
          jobHandler.handle(j);
        } catch (Exception ex) {
          j.done(ex);
        }
        // (5) consume the job done event

        eventBus.consumer(Kue.workerAddress("done", j), msg -> {
          createDoneCallback(j).handle(Future.succeededFuture(
            ((JsonObject) msg.body()).getJsonObject("result")));
        });
        eventBus.consumer(Kue.workerAddress("done_fail", j), msg -> {
          createDoneCallback(j).handle(Future.failedFuture(
            (String) msg.body()));
        });
      } else {
          this.emitJobEvent("error", this.job, new JsonObject().put("message", r.cause().getMessage()));
          r.cause().printStackTrace();
      }
    });
}

到了最核心的函數了!首先咱們先給開始時間賦值 (1) 而後將任務狀態置爲ACTIVE (2)。若是這兩個操做成功的話,咱們就向Event Bus發送任務開始(start)事件 (3)。接下來咱們調用真正的處理邏輯 - 以前綁定的jobHandler (4)。若是處理過程當中拋出異常的話,Vert.x Kue就會調用job.done(ex)方法發送done_fail內部事件來通知worker任務處理失敗。可是彷佛沒有看到在哪裏接收並處理donedone_fail事件呢?就在這 (5)!一旦Vert.x Kue接收到這兩個事件,它就會調用對應的handler去進行任務完成或失敗的相應操做。這裏的handler是由createDoneCallback方法生成的:

private Handler<AsyncResult<JsonObject>> createDoneCallback(Job job) {
  return r0 -> {
    if (job == null) {
      return;
    }
    if (r0.failed()) {
      this.fail(r0.cause()); // (1)
      return;
    }
    long dur = System.currentTimeMillis() - job.getStarted_at();
    job.setDuration(dur)
      .set("duration", String.valueOf(dur)); // (2)
    JsonObject result = r0.result();
    if (result != null) {
      job.setResult(result)
        .set("result", result.encodePrettily()); // (3)
    }

    job.complete().setHandler(r -> { // (4)
      if (r.succeeded()) {
        Job j = r.result();
        if (j.isRemoveOnComplete()) { // (5)
          j.remove();
        }
        this.emitJobEvent("complete", j, null); // (6)

        this.prepareAndStart(); // (7) 準備處理下一個任務
      }
    });
  };
}

任務處理有兩種狀況:完成和失敗,所以咱們先來看任務成功處理的狀況。咱們首先給任務的用時(duration)賦值 (2),而且若是任務產生告終果,也給結果(result)賦值 (3)。而後咱們調用job.complete方法將狀態設置爲COMPLETE (4)。若是成功的話,咱們就檢查removeOnComplete標誌位 (5) 並決定是否將任務從Redis中移除。而後咱們向Event Bus發送任務完成事件(complete)以及隊列事件job_complete (6)。如今這個任務的處理過程已經結束了,worker須要準備處理下一個任務了,所以最後咱們調用prepareAndStart方法準備處理下一個Job

處理失敗了怎麼辦?

人生不如意事十之八九,任務處理過程當中極可能會碰見各類各樣的問題而失敗。當任務處理失敗時,咱們調用KueWorker中的fail方法:

private void fail(Throwable ex) {
  job.failedAttempt(ex).setHandler(r -> { // (1)
    if (r.failed()) {
      this.error(r.cause(), job); // (2)
    } else {
      Job res = r.result();
      if (res.hasAttempts()) { // (3)
        this.emitJobEvent("failed_attempt", job, new JsonObject().put("message", ex.getMessage()));
      } else {
        this.emitJobEvent("failed", job, new JsonObject().put("message", ex.getMessage())); // (4)
      }
      prepareAndStart(); // (5)
    }
  });
}

面對失敗時,咱們首先經過failedAttempt方法嘗試從錯誤中恢復 (1)。若是恢復失敗(好比沒有重試次數了)就向Event Bus發送error隊列事件 (2)。若是恢復成功,咱們就根據是否還有剩餘重試次數來發送對應的事件(failed或者failed_attempt)。搞定錯誤之後,worker一樣須要準備處理下一個任務了,所以最後咱們調用prepareAndStart方法準備處理下一個Job (5)。

這就是KueWorker的所有實現,是否是頗有趣呢?看了這麼久的代碼也有些累了,下面是時候來寫個Kue應用跑一下咯~

展現時間!

io.vertx.blueprint.kue.example包下(kue-example子工程)建立一個LearningVertxVerticle類,而後編寫以下代碼:

package io.vertx.blueprint.kue.example;

import io.vertx.blueprint.kue.Kue;
import io.vertx.blueprint.kue.queue.Job;
import io.vertx.blueprint.kue.queue.Priority;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;


public class LearningVertxVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    // 建立工做隊列
    Kue kue = Kue.createQueue(vertx, config());

    // 監聽全局錯誤事件
    kue.on("error", message ->
      System.out.println("[Global Error] " + message.body()));

    JsonObject data = new JsonObject()
      .put("title", "Learning Vert.x")
      .put("content", "core");

    // 準備學習Vert.x,爽!
    Job j = kue.createJob("learn vertx", data)
      .priority(Priority.HIGH)
      .onComplete(r -> { // 完成任務事件監聽
        System.out.println("Feeling: " + r.getResult().getString("feeling", "none"));
    }).onFailure(r -> { // 任務失敗事件監聽
        System.out.println("eee...so difficult...");
    }).onProgress(r -> { // 任務進度變動事件監聽
        System.out.println("I love this! My progress => " + r);
      });

    // 保存任務
    j.save().setHandler(r0 -> {
      if (r0.succeeded()) {
        // 開始學習!
        kue.processBlocking("learn vertx", 1, job -> {
          job.progress(10, 100);
          // 3秒速成
          vertx.setTimer(3000, r1 -> {
            job.setResult(new JsonObject().put("feeling", "amazing and wonderful!")) // 結果
              .done(); // 完成啦!
          });
        });
      } else {
        System.err.println("Wow, something happened: " + r0.cause().getMessage());
      }
    });
  }

}

一般狀況下,一個Vert.x Kue應用能夠分爲幾部分:建立工做隊列、建立任務、保存任務以及處理任務。咱們推薦開發者把應用寫成Verticle的形式。

在這個例子中,咱們要模擬一個學習Vert.x的任務!首先咱們經過Kue.createQueue方法建立一個工做隊列而且經過on(error, handler)方法監聽全局錯誤(error)事件。接着咱們經過kue.createJob方法建立學習任務,將優先級設定爲HIGH,而且監聽completefailed以及progress事件。而後咱們須要保存任務,保存完畢之後咱們就能夠經過processBlocking方法來執行耗時任務了。在處理邏輯中,咱們首先經過job.progress方法將進度設爲10,而後使用vertx.setTimer方法設一個3秒的定時器,定時器時間到之後賦予結果並完成任務。

像往常同樣,咱們還須要在build.gradle中配置一下。咱們須要將kue-example子工程中的Main-Verticle屬性設爲剛纔寫的io.vertx.blueprint.kue.example.LearningVertxVerticle

project("kue-example") {

  dependencies {
    compile(project(":kue-core"))
  }

  jar {
    archiveName = 'vertx-blueprint-kue-example.jar'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest {
      attributes 'Main-Class': 'io.vertx.core.Launcher'
      attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle'
    }
  }
}

好了,到了展現時間了!打開終端,構建項目:

gradle build

固然不要忘記運行Redis:

redis-server

而後咱們先運行Vert.x Kue Core部分:

java -jar kue-core/build/libs/vertx-blueprint-kue-core.jar -cluster -ha -conf config/config.json

而後再運行咱們的實例:

java -jar kue-example/build/libs/vertx-blueprint-kue-example.jar -cluster -ha -conf config/config.json

這時終端應該會依次顯示輸出:

INFO: Kue Verticle is running...
I love this! My progress => 10
Feeling: amazing and wonderful!

固然你也能夠在Vert.x Kue的Web端查看任務狀況。

完成咱們的探索之旅!

棒極了!咱們終於結束了咱們的Vert.x Kue Core探索之旅~~!從這篇超長的教程中,你學到了如何利用Vert.x去開發一個基於消息的應用!太酷了!

若是想了解kue-http的實現,請移步Vert.x 藍圖 | Vert.x Kue 教程(Web部分)。若是想了解更多的關於Vert.x Kue的特性,請移步Vert.x Kue 特性介紹

Vert.x能作的不只僅是這些。想要了解更多的關於Vert.x的知識,請參考Vert.x 官方文檔 —— 這永遠是資料最齊全的地方。


My Blog: 「千載絃歌,芳華如夢」 - sczyh30's blog

相關文章
相關標籤/搜索