本文章是 Vert.x 藍圖系列 的第二篇教程。全系列:java
Vert.x Blueprint 系列教程(二) | 開發基於消息的應用 - Vert.x Kue 教程github
本系列已發佈至Vert.x官網:Vert.x Blueprint Tutorialsredis
歡迎回到Vert.x 藍圖系列~在本教程中,咱們將利用Vert.x開發一個基於消息的應用 - Vert.x Kue,它是一個使用Vert.x開發的優先級工做隊列,數據存儲使用的是 Redis。Vert.x Kue是 Automattic/kue 的Vert.x實現版本。咱們可使用Vert.x Kue來處理各類各樣的任務,好比文件轉換、訂單處理等等。算法
經過本教程,你將會學習到如下內容:json
消息、消息系統以及事件驅動的運用segmentfault
Vert.x Event Bus 的幾種事件機制(發佈/訂閱、點對點模式)後端
設計 分佈式 的Vert.x應用ruby
工做隊列的設計
Vert.x Service Proxy(服務代理,即異步RPC)的運用
更深層次的Redis運用
本教程是Vert.x 藍圖系列的第二篇教程,對應的Vert.x版本爲3.3.2。本教程中的完整代碼已託管至GitHub。
既然咱們要用Vert.x開發一個基於消息的應用,那麼咱們先來瞅一瞅Vert.x的消息系統吧~在Vert.x中,咱們能夠經過 Event Bus 來發送和接收各類各樣的消息,這些消息能夠來自不一樣的Vertx
實例。怎麼樣,很酷吧?咱們都將消息發送至Event Bus上的某個地址上,這個地址能夠是任意的字符串。
Event Bus支持三種消息機制:發佈/訂閱(Publish/Subscribe)、點對點(Point to point)以及請求/迴應(Request-Response)模式。下面咱們就來看一看這幾種機制。
在發佈/訂閱模式中,消息被髮布到Event Bus的某一個地址上,全部訂閱此地址的Handler
都會接收到該消息而且調用相應的處理邏輯。咱們來看一看示例代碼:
EventBus eventBus = vertx.eventBus(); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("1: " + r.body()); }); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("2: " + r.body()); }); eventBus.publish("foo.bar.baz", "+1s"); // 向此地址發送消息
咱們能夠經過vertx.eventBus()
方法獲取EventBus
的引用,而後咱們就能夠經過consume
方法訂閱某個地址的消息而且綁定一個Handler
。接着咱們經過publish
向此地址發送消息。若是運行上面的例子,咱們會獲得一下結果:
2: +1s 1: +1s
若是咱們把上面的示例中的publish
方法替代成send
方法,上面的實例就變成點對點模式了。在點對點模式中,消息被髮布到Event Bus的某一個地址上。Vert.x會將此消息傳遞給其中監聽此地址的Handler
之一。若是有多個Handler
綁定到此地址,那麼就使用輪詢算法隨機挑一個Handler
傳遞消息。好比在此示例中,程序只會打印2: +1s
或者1: +1s
之中的一個。
當咱們綁定的Handler
接收到消息的時候,咱們可不能夠給消息的發送者回復呢?固然了!當咱們經過send
方法發送消息的時候,咱們能夠同時指定一個回覆處理函數(reply handler)。而後當某個消息的訂閱者接收到消息的時候,它就能夠給發送者回復消息;若是發送者接收到了回覆,發送者綁定的回覆處理函數就會被調用。這就是請求/迴應模式。
好啦,如今咱們已經粗略瞭解了Vert.x中的消息系統 - Event Bus的基本使用,下面咱們就看看Vert.x Kue的基本設計。有關更多關於Event Bus的信息請參考Vert.x Core Manual - Event Bus。
在咱們的項目中,咱們將Vert.x Kue劃分爲兩個模塊:
kue-core
: 核心組件,提供優先級隊列的功能
kue-http
: Web組件,提供Web UI以及REST API
另外咱們還提供一個示例模塊kue-example
用於演示以及闡述如何使用Vert.x Kue。
既然咱們的項目有兩個模塊,那麼你必定會好奇:兩個模塊之間是如何進行通訊的?而且若是咱們寫本身的Kue應用的話,咱們該怎樣去調用Kue Core中的服務呢?不要着急,謎底將在後邊的章節中揭曉:-)
回顧一下Vert.x Kue的做用 - 優先級工做隊列,因此在Vert.x Kue的核心模塊中咱們設計瞭如下的類:
Job
- 任務(做業)數據實體
JobService
- 異步服務接口,提供操做任務以及獲取數據的相關邏輯
KueWorker
- 用於處理任務的Verticle
Kue
- 工做隊列
前邊咱們提到過,咱們的兩個組件之間須要一種通訊機制能夠互相通訊 - 這裏咱們使用Vert.x的集羣模式,即以clustered的模式來部署Verticle。這樣的環境下的Event Bus一樣也是集羣模式的,所以各個組件能夠經過集羣模式下的Event Bus進行通訊。很不錯吧?在Vert.x的集羣模式下,咱們須要指定一個集羣管理器ClusterManager
。這裏咱們使用默認的HazelcastClusterManager
,使用 Hazelcast 做爲集羣管理。
在Vert.x Kue中,咱們將JobService
服務發佈至分佈式的Event Bus上,這樣其它的組件就能夠經過Event Bus調用該服務了。咱們設計了一個KueVerticle
用於註冊服務。Vert.x提供了Vert.x Service Proxy(服務代理組件),能夠很方便地將服務註冊至Event Bus上,而後在其它地方獲取此服務的代理並調用。咱們將在下面的章節中詳細介紹Vert.x Service Proxy。
在咱們的Vert.x Kue中,大多數的異步方法都是基於Future
的。若是您看過藍圖系列的第一篇文章的話,您必定不會對這種模式很陌生。在Vert.x 3.3中,咱們的Future
支持基本的響應式的操做,好比map
和compose
。它們用起來很是方便,由於咱們能夠將多個Future
以響應式的方式組合起來而不用擔憂陷入回調地獄中。
正如咱們在Vert.x Kue 特性介紹中提到的那樣,Vert.x Kue支持兩種級別的事件:任務事件(job events) 以及 隊列事件(queue events)。在Vert.x Kue中,咱們設計了三種事件地址:
vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}
: 某個特定任務的任務事件地址
vertx.kue.handler.workers.{eventType}
: (全局)隊列事件地址
vertx.kue.handler.workers.{eventType}.{addressId}
: 某個特定任務的內部事件地址
在特性介紹文檔中,咱們提到了如下幾種任務事件:
start
開始處理一個任務 (onStart
)
promotion
一個延期的任務時間已到,提高至工做隊列中 (onPromotion
)
progress
任務的進度變化 (onProgress
)
failed_attempt
任務處理失敗,可是還能夠重試 (onFailureAttempt
)
failed
任務處理失敗而且不能重試 (onFailure
)
complete
任務完成 (onComplete
)
remove
任務從後端存儲中移除 (onRemove
)
隊列事件也類似,只不過須要加前綴job_
。這些事件都會經過send
方法發送至Event Bus上。每個任務都有對應的任務事件地址,所以它們可以正確地接收到對應的事件並進行相應的處理邏輯。
特別地,咱們還有兩個內部事件:done
和done_fail
。done
事件對應一個任務在底層的處理已經完成,而done_fail
事件對應一個任務在底層的處理失敗。這兩個事件使用第三種地址進行傳遞。
在Vert.x Kue中,任務共有五種狀態:
INACTIVE
: 任務還未開始處理,在工做隊列中等待處理
ACTIVE
: 任務正在處理中
COMPLETE
: 任務處理完成
FAILED
: 任務處理失敗
DELAYED
: 任務延時處理,正在等待計時器時間到並提高至工做隊列中
咱們使用狀態圖來描述任務狀態的變化:
以及任務狀態的變化伴隨的事件:
爲了讓你們對Vert.x Kue的架構有大體的瞭解,我用一幅圖來簡略描述整個Vert.x Kue的設計:
如今咱們對Vert.x Kue的設計有了大體的瞭解了,下面咱們就來看一看Vert.x Kue的代碼實現了~
咱們來開始探索Vert.x Kue的旅程吧!首先咱們先從GitHub上clone源代碼:
git clone https://github.com/sczyh30/vertx-blueprint-job-queue.git
而後你能夠把項目做爲Gradle項目導入你的IDE中。(如何導入請參考相關IDE幫助文檔)
正如咱們以前所提到的,咱們的Vert.x Kue中有兩個功能模塊和一個實例模塊,所以咱們須要在Gradle工程文件中定義三個子工程。咱們來看一下本項目中的build.gradle
文件:
configure(allprojects) { project -> ext { vertxVersion = "3.3.2" } apply plugin: 'java' repositories { jcenter() } dependencies { compile("io.vertx:vertx-core:${vertxVersion}") compile("io.vertx:vertx-codegen:${vertxVersion}") compile("io.vertx:vertx-rx-java:${vertxVersion}") compile("io.vertx:vertx-hazelcast:${vertxVersion}") compile("io.vertx:vertx-lang-ruby:${vertxVersion}") testCompile("io.vertx:vertx-unit:${vertxVersion}") testCompile group: 'junit', name: 'junit', version: '4.12' } sourceSets { main { java { srcDirs += 'src/main/generated' } } } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 } } project("kue-core") { dependencies { compile("io.vertx:vertx-redis-client:${vertxVersion}") compile("io.vertx:vertx-service-proxy:${vertxVersion}") } jar { archiveName = 'vertx-blueprint-kue-core.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.queue.KueVerticle' } } task annotationProcessing(type: JavaCompile, group: 'build') { // codegen source = sourceSets.main.java classpath = configurations.compile destinationDir = project.file('src/main/generated') options.compilerArgs = [ "-proc:only", "-processor", "io.vertx.codegen.CodeGenProcessor", "-AoutputDirectory=${project.projectDir}/src/main" ] } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 dependsOn annotationProcessing } } project("kue-http") { dependencies { compile(project(":kue-core")) compile("io.vertx:vertx-web:${vertxVersion}") compile("io.vertx:vertx-web-templ-jade:${vertxVersion}") } jar { archiveName = 'vertx-blueprint-kue-http.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.http.KueHttpVerticle' } } } project("kue-example") { dependencies { compile(project(":kue-core")) } jar { archiveName = 'vertx-blueprint-kue-example.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle' } } } task wrapper(type: Wrapper) { gradleVersion = '2.12' }
(⊙o⊙)…比以前的待辦事項服務項目中的長很多誒。。。咱們來解釋一下:
在configure(allprojects)
做用域中,咱們配置了一些全局信息(對全部子工程都適用)。
咱們定義了三個子工程:kue-core
、kue-http
以及kue-example
。這裏咱們來解釋一下里面用到的依賴。在kue-core
中,vertx-redis-client
用於Redis通訊,vertx-service-proxy
用於Event Bus上的服務代理。在kue-http
中,咱們將kue-core
子工程做爲它的一個依賴。vertx-web
和vertx-web-templ-jade
用於Kue Web端的開發。
任務annotationProcessing
用於註解處理(Vert.x Codegen)。咱們已經在上一篇教程中介紹過了,這裏就不展開講了。
咱們還須要在 settings.gradle
中配置工程:
rootProject.name = 'vertx-blueprint-job-queue' include "kue-core" include "kue-http" include "kue-example"
看完了配置文件之後,咱們再來瀏覽一下咱們的項目目錄結構:
. ├── build.gradle ├── kue-core │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources ├── kue-example │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources ├── kue-http │ └── src │ ├── main │ │ ├── java │ │ └── resources │ └── test │ ├── java │ └── resources └── settings.gradle
在Gradle中,項目的源碼都位於{projectName}/src/main/java
目錄內。這篇教程是圍繞Vert.x Kue Core的,因此咱們的代碼都在kue-core
目錄中。
好啦!如今咱們已經對Vert.x Kue項目的總體結構有了大體的瞭解了,下面咱們開始源碼探索之旅!
Vert.x Kue是用來處理任務的,所以咱們先來看一下表明任務實體的Job
類。Job
類位於io.vertx.blueprint.kue.queue
包下。代碼可能有點長,不要擔憂,咱們把它分紅幾部分,分別來解析。
咱們先來看一下Job
類中的成員屬性:
@DataObject(generateConverter = true) public class Job { // job properties private final String address_id; private long id = -1; private String zid; private String type; private JsonObject data; private Priority priority = Priority.NORMAL; private JobState state = JobState.INACTIVE; private long delay = 0; private int max_attempts = 1; private boolean removeOnComplete = false; private int ttl = 0; private JsonObject backoff; private int attempts = 0; private int progress = 0; private JsonObject result; // job metrics private long created_at; private long promote_at; private long updated_at; private long failed_at; private long started_at; private long duration; // ... }
我去。。。好多屬性!咱們一個一個地解釋:
address_id
: 一個UUID序列,做爲Event Bus的地址
id
: 任務的編號(id)
type
: 任務的類型
data
: 任務攜帶的數據,以 JsonObject
類型表示
priority
: 任務優先級,以 Priority
枚舉類型表示。默認優先級爲正常(NORMAL
)
delay
: 任務的延遲時間,默認是 0
state
: 任務狀態,以 JobState
枚舉類型表示。默認狀態爲等待(INACTIVE
)
attempts
: 任務已經嘗試執行的次數
max_attempts
: 任務嘗試執行次數的最大閾值
removeOnComplete
: 表明任務完成時是否自動從後臺移除
zid
: zset
操做對應的編號(zid),保持先進先出順序
ttl
: TTL(Time to live)
backoff
: 任務重試配置,以 JsonObject
類型表示
progress
: 任務執行的進度
result
: 任務執行的結果,以 JsonObject
類型表示
還有這些統計數據:
created_at
: 表明此任務建立的時間
promote_at
: 表明此任務從延時狀態被提高至等待狀態時的時間
updated_at
: 表明任務更新的時間
failed_at
: 表明任務失敗的時間
started_at
: 表明任務開始的時間
duration
: 表明處理任務花費的時間,單位爲毫秒(ms
)
你可能注意到在 Job
類中還存在着幾個靜態成員變量:
private static Logger logger = LoggerFactory.getLogger(Job.class); private static Vertx vertx; private static RedisClient client; private static EventBus eventBus; public static void setVertx(Vertx v, RedisClient redisClient) { vertx = v; client = redisClient; eventBus = vertx.eventBus(); }
對於 logger
對象,我想你們應該都很熟悉,它表明一個Vert.x Logger實例用於日誌記錄。可是你必定想問爲何 Job
類中存在着一個Vertx
類型的靜態成員。Job
類不該該是一個數據對象嗎?固然咯!Job
類表明一個數據對象,但不只僅是一個數據對象。這裏我模仿了一些Automattic/kue的風格,把一些任務相關邏輯方法放到了Job
類裏,它們大多都是基於Future
的異步方法,所以能夠很方便地去調用以及進行組合變換。好比:
job.save() .compose(Job::updateNow) .compose(j -> j.log("good!"));
因爲咱們不能在Job
類被JVM加載的時候就獲取Vertx
實例,咱們必須手動給Job
類中的靜態Vertx
成員賦值。這裏咱們是在Kue
類中對其進行賦值的。當咱們建立一個工做隊列的時候,Job
類中的靜態成員變量會被初始化。同時爲了保證程序的正確性,咱們須要一個方法來檢測靜態成員變量是否初始化。當咱們在建立一個任務的時候,若是靜態成員此時未被初始化,那麼日誌會給出警告:
private void _checkStatic() { if (vertx == null) { logger.warn("static Vertx instance in Job class is not initialized!"); } }
咱們還注意到 Job
類也是由@DataObject
註解修飾的。Vert.x Codegen能夠處理含有@DataObject
註解的類並生成對應的JSON轉換器,而且Vert.x Service Proxy也須要數據對象。
在Job
類中咱們有四個構造函數。其中address_id
成員必須在一個任務被建立時就被賦值,默認狀況下此地址用一個惟一的UUID字符串表示。每個構造函數中咱們都要調用_checkStatic
函數來檢測靜態成員變量是否被初始化。
正如咱們以前所提到的那樣,咱們經過一個特定的地址vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}
在分佈式的Event Bus上發送和接收任務事件(job events)。因此咱們提供了兩個用於發送和接收事件的輔助函數emit
和on
(相似於Node.js中的EventEmitter
):
@Fluent public <T> Job on(String event, Handler<Message<T>> handler) { logger.debug("[LOG] On: " + Kue.getCertainJobAddress(event, this)); eventBus.consumer(Kue.getCertainJobAddress(event, this), handler); return this; } @Fluent public Job emit(String event, Object msg) { logger.debug("[LOG] Emit: " + Kue.getCertainJobAddress(event, this)); eventBus.send(Kue.getCertainJobAddress(event, this), msg); return this; }
在後面的代碼中,咱們將頻繁使用這兩個輔助函數。
在咱們探索相關的邏輯函數以前,咱們先來描述一下Vert.x Kue的數據在Redis中是以什麼樣的形式存儲的:
全部的key都在vertx_kue
命名空間下(以vertx_kue:
做爲前綴)
vertx:kue:job:{id}
: 存儲任務實體的map
vertx:kue:ids
: 計數器,指示當前最大的任務ID
vertx:kue:job:types
: 存儲全部任務類型的列表
vertx:kue:{type}:jobs
: 指示全部等待狀態下的某種類型任務的列表
vertx_kue:jobs
: 存儲全部任務zid
的有序集合
vertx_kue:job:{state}
: 存儲全部指定狀態的任務zid
的有序集合
vertx_kue:jobs:{type}:{state}
: 存儲全部指定狀態和類型的任務zid
的有序集合
vertx:kue:job:{id}:log
: 存儲指定id
的任務對應日誌的列表
OK,下面咱們就來看看Job
類中重要的邏輯函數。
咱們以前提到過,Vert.x Kue中的任務一共有五種狀態。全部的任務相關的操做都伴隨着任務狀態的變換,所以咱們先來看一下state
方法的實現,它用於改變任務的狀態:
public Future<Job> state(JobState newState) { Future<Job> future = Future.future(); RedisClient client = RedisHelper.client(vertx, new JsonObject()); // use a new client to keep transaction JobState oldState = this.state; client.transaction().multi(r0 -> { // (1) if (r0.succeeded()) { if (oldState != null && !oldState.equals(newState)) { // (2) client.transaction().zrem(RedisHelper.getStateKey(oldState), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + oldState.name()), this.zid, _failure()); } client.transaction().hset(RedisHelper.getKey("job:" + this.id), "state", newState.name(), _failure()) // (3) .zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue(), this.zid, _failure()) .zadd(RedisHelper.getKey("jobs:" + this.type + ":" + newState.name()), this.priority.getValue(), this.zid, _failure()); switch (newState) { // dispatch different state case ACTIVE: // (4) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue() < 0 ? this.priority.getValue() : -this.priority.getValue(), this.zid, _failure()); break; case DELAYED: // (5) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure()); break; case INACTIVE: // (6) client.transaction().lpush(RedisHelper.getKey(this.type + ":jobs"), "1", _failure()); break; default: } this.state = newState; client.transaction().exec(r -> { // (7) if (r.succeeded()) { future.complete(this); } else { future.fail(r.cause()); } }); } else { future.fail(r0.cause()); } }); return future.compose(Job::updateNow); }
首先咱們先建立了一個Future
對象。而後咱們調用了 client.transaction().multi(handler)
函數開始一次Redis事務 (1)。在Vert.x 3.3.2中,全部的Redis事務操做都移至RedisTransaction
類中,因此咱們須要先調用client.transaction()
方法去獲取一個事務實例,而後調用multi
表明事務塊的開始。
在multi
函數傳入的Handler
中,咱們先斷定當前的任務狀態。若是當前任務狀態不爲空而且不等於新的任務狀態,咱們就將Redis中存儲的舊的狀態信息移除 (2)。爲了方便起見,咱們提供了一個RedisHelper
輔助類,裏面提供了一些生成特定地址以及編碼解碼zid
的方法:
package io.vertx.blueprint.kue.util; import io.vertx.blueprint.kue.queue.JobState; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.redis.RedisClient; import io.vertx.redis.RedisOptions; public final class RedisHelper { private static final String VERTX_KUE_REDIS_PREFIX = "vertx_kue"; private RedisHelper() { } public static RedisClient client(Vertx vertx, JsonObject config) { return RedisClient.create(vertx, options(config)); } public static RedisOptions options(JsonObject config) { return new RedisOptions() .setHost(config.getString("redis.host", "127.0.0.1")) .setPort(config.getInteger("redis.port", 6379)); } public static String getKey(String key) { return VERTX_KUE_REDIS_PREFIX + ":" + key; } public static String getStateKey(JobState state) { return VERTX_KUE_REDIS_PREFIX + ":jobs:" + state.name(); } public static String createFIFO(long id) { String idLen = "" + ("" + id).length(); int len = 2 - idLen.length(); while (len-- > 0) idLen = "0" + idLen; return idLen + "|" + id; } public static String stripFIFO(String zid) { return zid.substring(zid.indexOf('|') + 1); } public static long numStripFIFO(String zid) { return Long.parseLong(zid.substring(zid.indexOf('|') + 1)); } }
全部的key都必須在vertx_kue
命名空間下,所以咱們封裝了一個getKey
方法。咱們還實現了createFIFO
和stripFIFO
方法用於生成zid
以及解碼zid
。zid
的格式使用了Automattic/Kue中的格式。
回到state
方法來。咱們使用zrem(String key, String member, Handler<AsyncResult<String>> handler)
方法將特定的數據從有序集合中移除。兩個key分別是vertx_kue:job:{state}
以及 vertx_kue:jobs:{type}:{state}
;member
對應着任務的zid
。
接下來咱們使用hset
方法來變動新的狀態 (3),而後用zadd
方法往vertx_kue:job:{state}
和 vertx_kue:jobs:{type}:{state}
兩個有序集合中添加此任務的zid
,同時傳遞一個權重(score)。這個很是重要,咱們就是經過這個實現優先級隊列的。咱們直接使用priority
對應的值做爲score
。這樣,當咱們須要從Redis中獲取任務的時候,咱們就能夠經過zpop
方法獲取優先級最高的任務。咱們會在後面詳細講述。
不一樣的新狀態須要不一樣的操做。對於ACTIVE
狀態,咱們經過zadd
命令將zid
添加至vertx_kue:jobs:ACTIVE
有序集合中並賦予優先級權值 (4)。對於DELAYED
狀態,咱們經過zadd
命令將zid
添加至vertx_kue:jobs:DELAYED
有序集合中並賦予提高時間(promote_at
)權值 (5)。對於INACTIVE
狀態,咱們向vertx:kue:{type}:jobs
列表中添加一個元素 (6)。這些操做都是在Redis事務塊內完成的。最後咱們經過exec
方法一併執行這些事務操做 (7)。若是執行成功,咱們給future
賦值(當前任務)。最後咱們返回future
而且與updateNow
方法相組合。
updateNow
方法很是簡單,就是把updated_at
的值設爲當前時間,而後存到Redis中:
Future<Job> updateNow() { this.updated_at = System.currentTimeMillis(); return this.set("updated_at", String.valueOf(updated_at)); }
這裏咱們來看一下整個Job
類中最重要的方法之一 - save
方法,它的做用是保存任務至Redis中。
public Future<Job> save() { // check Objects.requireNonNull(this.type, "Job type cannot be null"); // (1) if (this.id > 0) return update(); // (2) Future<Job> future = Future.future(); // 生成id client.incr(RedisHelper.getKey("ids"), res -> { // (3) if (res.succeeded()) { this.id = res.result(); this.zid = RedisHelper.createFIFO(id); // (4) String key = RedisHelper.getKey("job:" + this.id); if (this.delay > 0) { this.state = JobState.DELAYED; } client.sadd(RedisHelper.getKey("job:types"), this.type, _failure()); // (5) this.created_at = System.currentTimeMillis(); this.promote_at = this.created_at + this.delay; // 保存任務 client.hmset(key, this.toJson(), _completer(future, this)); // (6) } else { future.fail(res.cause()); } }); return future.compose(Job::update); // (7) }
首先,任務類型不能爲空因此咱們要檢查type
是否爲空 (1)。接着,若是當前任務的id大於0,則表明此任務已經存儲過(由於id是存儲時分配),此時只需執行更新操做(update
)便可 (2)。而後咱們建立一個Future
對象,而後使用incr
方法從vertx_kue:ids
字段獲取一個新的id
(3)。同時咱們使用RedisHelper.createFIFO(id)
方法來生成新的zid
(4)。接着咱們來判斷任務延時是否大於0,若大於0則將當前任務狀態設置爲DELAYED
。而後咱們經過sadd
方法將當前任務類型添加至vertx:kue:job:types
列表中 (5) 而且保存任務建立時間(created_at
)以及任務提高時間(promote_at
)。通過這一系列的操做後,全部的屬性都已準備好,因此咱們能夠利用hmset
方法將此任務實體存儲至vertx:kue:job:{id}
哈希表中 (6)。若是存儲操做成功,那麼將當前任務實體賦給future
,不然記錄錯誤。最後咱們返回此future
而且將其與update
方法進行組合。
update
方法進行一些更新操做,它的邏輯比較簡單:
Future<Job> update() { Future<Job> future = Future.future(); this.updated_at = System.currentTimeMillis(); client.transaction().multi(_failure()) .hset(RedisHelper.getKey("job:" + this.id), "updated_at", String.valueOf(this.updated_at), _failure()) .zadd(RedisHelper.getKey("jobs"), this.priority.getValue(), this.zid, _failure()) .exec(_completer(future, this)); return future.compose(r -> this.state(this.state)); }
能夠看到update
方法只作了三件微小的工做:存儲任務更新時間、存儲zid
以及更改當前任務狀態(組合state
方法)。
最後總結一下將一個任務存儲到Redis中通過的步驟:save -> update -> state
:-)
移除任務很是簡單,藉助zrem
和del
方法便可。咱們來看一下其實現:
public Future<Void> remove() { Future<Void> future = Future.future(); client.transaction().multi(_failure()) .zrem(RedisHelper.getKey("jobs:" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs"), this.zid, _failure()) .del(RedisHelper.getKey("job:" + this.id + ":log"), _failure()) .del(RedisHelper.getKey("job:" + this.id), _failure()) .exec(r -> { if (r.succeeded()) { this.emit("remove", new JsonObject().put("id", this.id)); future.complete(); } else { future.fail(r.cause()); } }); return future; }
注意到成功移除任務時,咱們會向Event Bus上的特定地址發送remove
任務事件。此事件包含着被移除任務的id
。
咱們能夠經過幾種 onXXX
方法來監放任務事件:
@Fluent public Job onComplete(Handler<Job> completeHandler) { this.on("complete", message -> { completeHandler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onFailure(Handler<JsonObject> failureHandler) { this.on("failed", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onFailureAttempt(Handler<JsonObject> failureHandler) { this.on("failed_attempt", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onPromotion(Handler<Job> handler) { this.on("promotion", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onStart(Handler<Job> handler) { this.on("start", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onRemove(Handler<JsonObject> removeHandler) { this.on("start", message -> { removeHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onProgress(Handler<Integer> progressHandler) { this.on("progress", message -> { progressHandler.handle((Integer) message.body()); }); return this; }
注意到不一樣的事件,對應接收的數據類型也有差別。咱們來講明一下:
onComplete
、onPromotion
以及 onStart
: 發送的數據是對應的Job
對象
onFailure
and onFailureAttempt
: 發送的數據是JsonObject
類型的,其格式相似於:
{ "job": {}, "extra": { "message": "some_error" } }
onProgress
: 發送的數據是當前任務進度
onRemove
: 發送的數據是JsonObject
類型的,其中id
表明被移除任務的編號
咱們能夠經過progress
方法來更新任務進度。看一下其實現:
public Future<Job> progress(int complete, int total) { int n = Math.min(100, complete * 100 / total); // (1) this.emit("progress", n); // (2) return this.setProgress(n) // (3) .set("progress", String.valueOf(n)) .compose(Job::updateNow); }
progress
方法接受兩個參數:第一個是當前完成的進度值,第二個是完成狀態須要的進度值。咱們首先計算出當前的進度 (1),而後向特定地址發送progress
事件 (2)。最後咱們將進度存儲至Redis中並更新時間,返回Future
(3)。
當一個任務處理失敗時,若是它有剩餘的重試次數,Vert.x Kue會自動調用failAttempt
方法進行重試。咱們來看一下failAttempt
方法的實現:
Future<Job> failedAttempt(Throwable err) { return this.error(err) .compose(Job::failed) .compose(Job::attemptInternal); }
(⊙o⊙)很是簡短吧~實際上,failAttempt
方法是三個異步方法的組合:error
、failed
以及attemptInternal
。當一個任務須要進行重試的時候,咱們首先向Event Bus發佈 error
隊列事件而且在Redis中記錄日誌,而後將當前的任務狀態置爲FAILED
,最後從新處理此任務。
咱們先來看一下error
方法:
public Future<Job> error(Throwable ex) { return this.emitError(ex) .set("error", ex.getMessage()) .compose(j -> j.log("error | " + ex.getMessage())); }
它的邏輯很簡單:首先咱們向Event Bus發佈 錯誤 事件,而後記錄錯誤日誌便可。這裏咱們封裝了一個發佈錯誤的函數emitError
:
@Fluent public Job emitError(Throwable ex) { JsonObject errorMessage = new JsonObject().put("id", this.id) .put("message", ex.getMessage()); eventBus.publish(Kue.workerAddress("error"), errorMessage); eventBus.send(Kue.getCertainJobAddress("error", this), errorMessage); return this; }
其中發送的錯誤信息格式相似於下面的樣子:
{ "id": 2052, "message": "some error" }
接下來咱們再來看一下failed
方法的實現:
public Future<Job> failed() { this.failed_at = System.currentTimeMillis(); return this.updateNow() .compose(j -> j.set("failed_at", String.valueOf(j.failed_at))) .compose(j -> j.state(JobState.FAILED)); }
很是簡單,首先咱們更新任務的更新時間和失敗時間,而後經過state
方法將當前任務狀態置爲FAILED
便可。
任務重試的核心邏輯在attemptInternal
方法中:
private Future<Job> attemptInternal() { int remaining = this.max_attempts - this.attempts; // (1) if (remaining > 0) { // 還有重試次數 return this.attemptAdd() // (2) .compose(Job::reattempt) // (3) .setHandler(r -> { if (r.failed()) { this.emitError(r.cause()); // (4) } }); } else if (remaining == 0) { // (5) return Future.failedFuture("No more attempts"); } else { // (6) return Future.failedFuture(new IllegalStateException("Attempts Exceeded")); } }
在咱們的Job
數據對象中,咱們存儲了最大重試次數max_attempts
以及已經重試的次數attempts
,因此咱們首先根據這兩個數據計算剩餘的重試次數remaining
(1)。若是還有剩餘次數的話,咱們就先調用attemptAdd
方法增長一次已重試次數並 (2),而後咱們調用reattempt
方法執行真正的任務重試邏輯 (3)。最後返回這兩個異步方法組合的Future
。若是其中一個過程出現錯誤,咱們就發佈error
事件 (4)。若是沒有剩餘次數了或者超出剩餘次數了,咱們直接返回錯誤。
在咱們解析reattempt
方法以前,咱們先來回顧一下Vert.x Kue中的任務失敗恢復機制。Vert.x Kue支持延時重試機制(retry backoff),而且支持不一樣的策略(如 fixed 以及 exponential)。以前咱們提到Job
類中有一個backoff
成員變量,它用於配置延時重試的策略。它的格式相似於這樣:
{ "type": "fixed", "delay": 5000 }
延時重試機制的實如今getBackoffImpl
方法中,它返回一個Function<Integer, Long>
對象,表明一個接受Integer
類型(即attempts
),返回Long
類型(表明計算出的延時值)的函數:
private Function<Integer, Long> getBackoffImpl() { String type = this.backoff.getString("type", "fixed"); // (1) long _delay = this.backoff.getLong("delay", this.delay); // (2) switch (type) { case "exponential": // (3) return attempts -> Math.round(_delay * 0.5 * (Math.pow(2, attempts) - 1)); case "fixed": default: // (4) return attempts -> _delay; } }
首先咱們從backoff
配置中獲取延遲重試策略。目前Vert.x Kue支持兩種策略:fixed
和 exponential
。前者採用固定延遲時間,然後者採用指數增加型延遲時間。默認狀況下Vert.x Kue會採用fixed
策略 (1)。接下來咱們從backoff
配置中獲取延遲時間,若是配置中沒有指定,那麼就使用任務對象中的延遲時間delay
(2)。接下來就是根據具體的策略進行計算了。對於指數型延遲,咱們計算[delay * 0.5 * 2^attempts]
做爲延遲時間 (3);對於固定型延遲策略,咱們直接使用獲取到的延遲時間 (4)。
好啦,如今回到「真正的重試」方法 —— reattempt
方法來:
private Future<Job> reattempt() { if (this.backoff != null) { long delay = this.getBackoffImpl().apply(attempts); // (1) return this.setDelay(delay) .setPromote_at(System.currentTimeMillis() + delay) .update() // (2) .compose(Job::delayed); // (3) } else { return this.inactive(); // (4) } }
首先咱們先檢查backoff
配置是否存在,若存在則計算出對應的延時時間 (1) 而且設定delay
和promote_at
屬性的值而後保存至Redis中 (2)。接着咱們經過delayed
方法將任務的狀態設爲延時(DELAYED
) (3)。若是延時重試配置不存在,咱們就經過inactive
方法直接將此任務置入工做隊列中 (4)。
這就是整個任務重試功能的實現,也不是很複雜蛤?觀察上面的代碼,咱們能夠發現Future
組合無處不在。這種響應式的組合很是方便。想想若是咱們用回調的異步方式來寫代碼的話,咱們很容易陷入回調地獄中(⊙o⊙)。。。幾個回調嵌套起來總顯得不是那麼優美和簡潔,而用響應式的、可組合的Future
就能夠有效地避免這個問題。
不錯!到如今爲止咱們已經探索完Job
類的源碼了~下面咱們來看一下JobService
類。
在本章節中咱們來探索一下JobService
接口及其實現 —— 它包含着各類普通的操做和統計Job
的邏輯。
咱們的JobService
是一個通用邏輯接口,所以咱們但願應用中的每個組件都能訪問此服務,即進行RPC。在Vert.x中,咱們能夠將服務註冊至Event Bus上,而後其它組件就能夠經過Event Bus來遠程調用註冊的服務了。
傳統的RPC有一個缺點:消費者須要阻塞等待生產者的迴應。你可能想說:這是一種阻塞模型,和Vert.x推崇的異步開發模式不相符。沒錯!並且,傳統的RPC不是真正面向失敗設計的。
還好,Vert.x提供了一種高效的、響應式的RPC —— 異步RPC。咱們不須要等待生產者的迴應,而只須要傳遞一個Handler<AsyncResult<R>>
參數給異步方法。這樣當收到生產者結果時,對應的Handler
就會被調用,很是方便,這與Vert.x的異步開發模式相符。而且,AsyncResult
也是面向失敗設計的。
因此講到這裏,你可能想問:到底怎麼在Event Bus上註冊服務呢?咱們是否是須要寫一大堆的邏輯去包裝和發送信息,而後在另外一端解碼信息並進行調用呢?不,這太麻煩了!有了Vert.x 服務代理,咱們不須要這麼作!Vert.x提供了一個組件 Vert.x Service Proxy 來自動生成服務代理。有了它的幫助,咱們就只須要按照規範設計咱們的異步服務接口,而後用@ProxyGen
註解修飾便可。
@ProxyGen
註解的限制@ProxyGen
註解的使用有諸多限制。好比,全部的異步方法都必須是基於回調的,也就是說每一個方法都要接受一個Handler<AsyncResult<R>>
類型的參數。而且,類型R
也是有限制的 —— 只容許基本類型以及數據對象類型。詳情請參考官方文檔。
咱們來看一下JobService
的源碼:
@ProxyGen @VertxGen public interface JobService { static JobService create(Vertx vertx, JsonObject config) { return new JobServiceImpl(vertx, config); } static JobService createProxy(Vertx vertx, String address) { return ProxyHelper.createProxy(JobService.class, vertx, address); } /** * 獲取任務,按照優先級順序 * * @param id job id * @param handler async result handler */ @Fluent JobService getJob(long id, Handler<AsyncResult<Job>> handler); /** * 刪除任務 * * @param id job id * @param handler async result handler */ @Fluent JobService removeJob(long id, Handler<AsyncResult<Void>> handler); /** * 判斷任務是否存在 * * @param id job id * @param handler async result handler */ @Fluent JobService existsJob(long id, Handler<AsyncResult<Boolean>> handler); /** * 獲取任務日誌 * * @param id job id * @param handler async result handler */ @Fluent JobService getJobLog(long id, Handler<AsyncResult<JsonArray>> handler); /** * 獲取某一範圍內某個指定狀態下的任務列表 * * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByState(String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); /** * 獲取某一範圍內某個指定狀態和類型下的任務列表 * * @param type expected job type * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByType(String type, String state, long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); /** * 獲取某一範圍內的任務列表(按照順序或倒序) * * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRange(long from, long to, String order, Handler<AsyncResult<List<Job>>> handler); // 統計函數 /** * 獲取指定狀態和類型下的任務的數量 * * @param type job type * @param state job state * @param handler async result handler */ @Fluent JobService cardByType(String type, JobState state, Handler<AsyncResult<Long>> handler); /** * 獲取某個狀態下的任務的數量 * * @param state job state * @param handler async result handler */ @Fluent JobService card(JobState state, Handler<AsyncResult<Long>> handler); /** * 獲取COMPLETE狀態任務的數量 * * @param type job type; if null, then return global metrics * @param handler async result handler */ @Fluent JobService completeCount(String type, Handler<AsyncResult<Long>> handler); /** * 獲取FAILED狀態任務的數量 * * @param type job type; if null, then return global metrics */ @Fluent JobService failedCount(String type, Handler<AsyncResult<Long>> handler); /** * 獲取INACTIVE狀態任務的數量 * * @param type job type; if null, then return global metrics */ @Fluent JobService inactiveCount(String type, Handler<AsyncResult<Long>> handler); /** * 獲取ACTIVE狀態任務的數量 * * @param type job type; if null, then return global metrics */ @Fluent JobService activeCount(String type, Handler<AsyncResult<Long>> handler); /** * 獲取DELAYED狀態任務的數量 * * @param type job type; if null, then return global metrics */ @Fluent JobService delayedCount(String type, Handler<AsyncResult<Long>> handler); /** * 獲取當前存在的全部任務類型 * * @param handler async result handler */ @Fluent JobService getAllTypes(Handler<AsyncResult<List<String>>> handler); /** * 獲取指定狀態下的全部任務的ID * * @param state job state * @param handler async result handler */ @Fluent JobService getIdsByState(JobState state, Handler<AsyncResult<List<Long>>> handler); /** * 工做隊列運行時間(ms) * * @param handler async result handler */ @Fluent JobService getWorkTime(Handler<AsyncResult<Long>> handler); }
能夠看到咱們還爲JobService
接口添加了@VertxGen
註解,Vert.x Codegen能夠處理此註解生成多種語言版本的服務。
在JobService
接口中咱們還定義了兩個靜態方法:create
用於建立一個任務服務實例,createProxy
用於建立一個服務代理。
JobService
接口中包含一些任務操做和統計的相關邏輯,每一個方法的功能都已經在註釋中闡述了,所以咱們就直接來看它的實現吧~
JobService
接口的實現位於JobServiceImpl
類中,代碼很是長,所以這裏就不貼代碼了。。。你們能夠對照GitHub中的代碼讀下面的內容。
getJob
: 獲取任務的方法很是簡單。直接利用hgetall
命令從Redis中取出對應的任務便可。
removeJob
: 咱們能夠將此方法看做是getJob
和Job#remove
兩個方法的組合。
existsJob
: 使用exists
命令判斷對應id
的任務是否存在。
getJobLog
: 使用lrange
命令從vertx_kue:job:{id}:log
列表中取出日誌。
rangeGeneral
: 使用zrange
命令獲取必定範圍內的任務,這是一個通用方法。
zrange
操做zrange
返回某一有序集合中某個特定範圍內的元素。詳情請見ZRANGE - Redis。
如下三個方法複用了rangeGeneral
方法:
jobRangeByState
: 指定狀態,對應的key爲vertx_kue:jobs:{state}
。
jobRangeByType
: 指定狀態和類型,對應的key爲vertx_kue:jobs:{type}:{state}
。
jobRange
: 對應的key爲vertx_kue:jobs
。
這兩個通用方法用於任務數量的統計:
cardByType
: 利用zcard
命令獲取某一指定狀態和類型下任務的數量。
card
: 利用zcard
命令獲取某一指定狀態下任務的數量。
下面五個輔助統計方法複用了上面兩個通用方法:
completeCount
failedCount
delayedCount
inactiveCount
activeCount
接着看:
getAllTypes
: 利用smembers
命令獲取vertx_kue:job:types
集合中存儲的全部的任務類型。
getIdsByState
: 使用zrange
獲取某一指定狀態下全部任務的ID。
getWorkTime
: 使用get
命令從vertx_kue:stats:work-time
中獲取Vert.x Kue的工做時間。
既然完成了JobService
的實現,接下來咱們來看一下如何利用Service Proxy將服務註冊至Event Bus上。這裏咱們還須要一個KueVerticle
來建立要註冊的服務實例,而且將其註冊至Event Bus上。
打開io.vertx.blueprint.kue.queue.KueVerticle
類的源碼:
package io.vertx.blueprint.kue.queue; import io.vertx.blueprint.kue.service.JobService; import io.vertx.blueprint.kue.util.RedisHelper; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.redis.RedisClient; import io.vertx.serviceproxy.ProxyHelper; public class KueVerticle extends AbstractVerticle { private static Logger logger = LoggerFactory.getLogger(Job.class); public static final String EB_JOB_SERVICE_ADDRESS = "vertx.kue.service.job.internal"; // (1) private JsonObject config; private JobService jobService; @Override public void start(Future<Void> future) throws Exception { this.config = config(); this.jobService = JobService.create(vertx, config); // (2) // create redis client RedisClient redisClient = RedisHelper.client(vertx, config); redisClient.ping(pr -> { // (3) test connection if (pr.succeeded()) { logger.info("Kue Verticle is running..."); // (4) register job service ProxyHelper.registerService(JobService.class, vertx, jobService, EB_JOB_SERVICE_ADDRESS); future.complete(); } else { logger.error("oops!", pr.cause()); future.fail(pr.cause()); } }); } }
首先咱們須要定義一個地址用於服務註冊 (1)。在start
方法中,咱們建立了一個任務服務實例 (2),而後經過ping
命令測試Redis鏈接 (3)。若是鏈接正常,那麼咱們就能夠經過ProxyHelper
類中的registerService
輔助方法來將服務實例註冊至Event Bus上 (4)。
這樣,一旦咱們在集羣模式下部署KueVerticle
,服務就會被髮布至Event Bus上,而後咱們就能夠在其餘組件中去遠程調用此服務了。很奇妙吧!
Kue
類表明着工做隊列。咱們來看一下Kue
類的實現。首先先看一下其構造函數:
public Kue(Vertx vertx, JsonObject config) { this.vertx = vertx; this.config = config; this.jobService = JobService.createProxy(vertx, EB_JOB_SERVICE_ADDRESS); this.client = RedisHelper.client(vertx, config); Job.setVertx(vertx, RedisHelper.client(vertx, config)); // init static vertx instance inner job }
這裏咱們須要注意兩點:第一點,咱們經過createProxy
方法來建立一個JobService
的服務代理;第二點,以前提到過,咱們須要在這裏初始化Job
類中的靜態成員變量。
咱們的JobService
是基於回調的,這是服務代理組件所要求的。爲了讓Vert.x Kue更加響應式,使用起來更加方便,咱們在Kue
類中以基於Future的異步模式封裝了JobService
中的全部異步方法。這很簡單,好比這個方法:
@Fluent JobService getJob(long id, Handler<AsyncResult<Job>> handler);
能夠這麼封裝:
public Future<Optional<Job>> getJob(long id) { Future<Optional<Job>> future = Future.future(); jobService.getJob(id, r -> { if (r.succeeded()) { future.complete(Optional.ofNullable(r.result())); } else { future.fail(r.cause()); } }); return future; }
其實就是加一層Future
。其它的封裝過程也相似因此咱們就不細說了。
process
和processBlocking
方法用於處理任務:
public Kue process(String type, int n, Handler<Job> handler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, false); }f setupTimers(); return this; } public Kue process(String type, Handler<Job> handler) { processInternal(type, handler, false); setupTimers(); return this; } public Kue processBlocking(String type, int n, Handler<Job> handler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, true); } setupTimers(); return this; }
兩個process
方法都相似 —— 它們都是使用Event Loop線程處理任務的,其中第一個方法還能夠指定同時處理任務數量的閾值。咱們來回顧一下使用Event Loop線程的注意事項 —— 咱們不能阻塞Event Loop線程。所以若是咱們須要在處理任務時作一些耗時的操做,咱們可使用processBlocking
方法。這幾個方法的代碼看起來都差很少,那麼區別在哪呢?以前咱們提到過,咱們設計了一種Verticle - KueWorker
,用於處理任務。所以對於process
方法來講,KueWorker
就是一種普通的Verticle;而對於processBlocking
方法來講,KueWorker
是一種Worker Verticle。這兩種Verticle有什麼不一樣呢?區別在於,Worker Verticle會使用Worker線程,所以即便咱們執行一些耗時的操做,Event Loop線程也不會被阻塞。
建立及部署KueWorker
的邏輯在processInternal
方法中,這三個方法都使用了processInternal
方法:
private void processInternal(String type, Handler<Job> handler, boolean isWorker) { KueWorker worker = new KueWorker(type, handler, this); // (1) vertx.deployVerticle(worker, new DeploymentOptions().setWorker(isWorker), r0 -> { // (2) if (r0.succeeded()) { this.on("job_complete", msg -> { long dur = new Job(((JsonObject) msg.body()).getJsonObject("job")).getDuration(); client.incrby(RedisHelper.getKey("stats:work-time"), dur, r1 -> { // (3) if (r1.failed()) r1.cause().printStackTrace(); }); }); } }); }
首先咱們建立一個KueWorker
實例 (1)。咱們將在稍後詳細介紹KueWorker
的實現。而後咱們根據提供的配置來部署此KueWorker
(2)。processInternal
方法的第三個參數表明此KueWorker
是否爲worker verticle。若是部署成功,咱們就監聽complete
事件。每當接收到complete
事件的時候,咱們獲取收到的信息(處理任務消耗的時間),而後用incrby
增長對應的工做時間 (3)。
再回到前面三個處理方法中。除了部署KueWorker
之外,咱們還調用了setupTimers
方法,用於設定定時器以監測延時任務以及監測活動任務TTL。
Vert.x Kue支持延時任務,所以咱們須要在任務延時時間到達時將任務「提高」至工做隊列中等待處理。這個工做是在checkJobPromotion
方法中實現的:
private void checkJobPromotion() { int timeout = config.getInteger("job.promotion.interval", 1000); // (1) int limit = config.getInteger("job.promotion.limit", 1000); // (2) vertx.setPeriodic(timeout, l -> { // (3) client.zrangebyscore(RedisHelper.getKey("jobs:DELAYED"), String.valueOf(0), String.valueOf(System.currentTimeMillis()), new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)), r -> { // (4) if (r.succeeded()) { r.result().forEach(r1 -> { long id = Long.parseLong(RedisHelper.stripFIFO((String) r1)); this.getJob(id).compose(jr -> jr.get().inactive()) // (5) .setHandler(jr -> { if (jr.succeeded()) { jr.result().emit("promotion", jr.result().getId()); // (6) } else { jr.cause().printStackTrace(); } }); }); } else { r.cause().printStackTrace(); } }); }); }
首先咱們從配置中獲取監測延時任務的間隔(job.promotion.interval
,默認1000ms)以及提高數量閾值(job.promotion.limit
,默認1000)。而後咱們使用vertx.setPeriodic
方法設一個週期性的定時器 (3),每隔一段時間就從Redis中獲取須要被提高的任務 (4)。這裏咱們經過zrangebyscore
獲取每一個須要被提高任務的id
。咱們來看一下zrangebyscore
方法的定義:
RedisClient zrangebyscore(String key, String min, String max, RangeLimitOptions options, Handler<AsyncResult<JsonArray>> handler);
key
: 某個有序集合的key,即vertx_kue:jobs:DELAYED
min
and max
: 最小值以及最大值(按照某種模式)。這裏min
是0,而max
是當前時間戳
咱們來回顧一下Job
類中的state
方法。當咱們要把任務狀態設爲DELAYED
的時候,咱們將score設爲promote_at
時間:
case DELAYED: client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure());
所以咱們將max
設爲當前時間(System.currentTimeMillis()
),只要當前時間超過須要提高的時間,這就說明此任務能夠被提高了。
options
: range和limit配置。這裏咱們須要指定LIMIT
值因此咱們用new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)
建立了一個配置
zrangebyscore
的結果是一個JsonArray
,裏面包含着全部等待提高任務的zid
。得到結果後咱們就將每一個zid
轉換爲id
,而後分別獲取對應的任務實體,最後對每一個任務調用inactive
方法來將任務狀態設爲INACTIVE
(5)。若是任務成功提高至工做隊列,咱們就發送promotion
事件 (6)。
咱們知道,Vert.x支持多種語言(如JS,Ruby),所以若是能讓咱們的Vert.x Kue支持多種語言那固然是極好的!這沒有問題~Vert.x Codegen能夠處理含@VertxGen
註解的異步接口,生成多語言版本。@VertxGen
註解一樣限制異步方法 —— 須要基於回調,所以咱們設計了一個CallbackKue
接口用於提供多語言支持。CallbackKue
的設計很是簡單,其實現複用了Kue
和jobService
的代碼。你們能夠直接看源碼,一目瞭然,這裏就不細說了。
注意要生成多語言版本的代碼,須要添加相應的依賴。好比要生成Ruby版本的代碼就要向build.gradle
中添加compile("io.vertx:vertx-lang-ruby:${vertxVersion}")
。
好啦,咱們已經對Vert.x Kue Core的幾個核心部分有了大體的瞭解了,如今是時候探索一下任務處理的本源 - KueWorker
了~
每個worker都對應一個特定的任務類型,而且綁定着特定的處理函數(Handler
),因此咱們須要在建立的時候指定它們。
在KueWorker
中,咱們使用prepareAndStart
方法來準備要處理的任務而且開始處理任務的過程:
private void prepareAndStart() { this.getJobFromBackend().setHandler(jr -> { // (1) if (jr.succeeded()) { if (jr.result().isPresent()) { this.job = jr.result().get(); // (2) process(); // (3) } else { this.emitJobEvent("error", null, new JsonObject().put("message", "job_not_exist")); throw new IllegalStateException("job not exist"); } } else { this.emitJobEvent("error", null, new JsonObject().put("message", jr.cause().getMessage())); jr.cause().printStackTrace(); } }); }
代碼比較直觀。首先咱們經過getJobFromBackend
方法從Redis中按照優先級順序獲取任務 (1)。若是成功獲取任務,咱們就把獲取到的任務保存起來 (2) 而後經過process
方法處理任務 (3)。若是中間出現錯誤,咱們須要發送error
錯誤事件,其中攜帶錯誤信息。
咱們來看一下咱們是如何從Redis中按照優先級順序獲取任務實體的:
private Future<Optional<Job>> getJobFromBackend() { Future<Optional<Job>> future = Future.future(); client.blpop(RedisHelper.getKey(this.type + ":jobs"), 0, r1 -> { // (1) if (r1.failed()) { client.lpush(RedisHelper.getKey(this.type + ":jobs"), "1", r2 -> { if (r2.failed()) future.fail(r2.cause()); }); } else { this.zpop(RedisHelper.getKey("jobs:" + this.type + ":INACTIVE")) // (2) .compose(kue::getJob) // (3) .setHandler(r -> { if (r.succeeded()) { future.complete(r.result()); } else future.fail(r.cause()); }); } }); return future; }
以前咱們已經瞭解到,每當咱們保存一個任務的時候,咱們都會向vertx_kue:{type}:jobs
列表中插入一個新元素表示新的任務可供處理。所以這裏咱們經過blpop
命令來等待可用的任務 (1)。一旦有任務可供處理,咱們就利用zpop
方法取出高優先級的任務的zid
(2)。zpop
命令是一個原子操做,用於從有序集合中彈出最小score值的元素。注意Redis沒有實現zpop
命令,所以咱們須要本身實現。
Redis官方文檔介紹了一種實現zpop
命令的簡單方法 - 利用 WATCH
。這裏咱們利用另一種思路實現zpop
命令:
private Future<Long> zpop(String key) { Future<Long> future = Future.future(); client.transaction() .multi(_failure()) .zrange(key, 0, 0, _failure()) .zremrangebyrank(key, 0, 0, _failure()) .exec(r -> { if (r.succeeded()) { JsonArray res = r.result(); if (res.getJsonArray(0).size() == 0) // empty set future.fail(new IllegalStateException("Empty zpop set")); else { try { future.complete(Long.parseLong(RedisHelper.stripFIFO( res.getJsonArray(0).getString(0)))); } catch (Exception ex) { future.fail(ex); } } } else { future.fail(r.cause()); } }); return future; }
在咱們的zpop
的實現中,咱們首先開始了一個事務塊,而後依次執行zrange
和zremrangebyrank
命令。有關這些命令的詳情咱們就不細說了,能夠參考Redis官方文檔。而後咱們提交事務,若是提交成功,咱們會得到一個JsonArray
類型的結果。正常狀況下咱們均可以經過res.getJsonArray(0).getString(0)
獲取到對應的zid
值。獲取到zid
值之後咱們就能夠將其轉換爲任務的id
了,最後咱們將id
置於Future
內(由於zpop
也是一個異步方法)。
接着回到getJobFromBackend
方法中。獲取到對應的id
以後,咱們就能夠經過Kue
的getJob
函數獲取任務實體了 (3)。因爲getJobFromBackend
也是一個異步方法,所以咱們一樣將結果置於Future
中。
前邊講了那麼多,都是在爲處理任務作準備。。。不要着急,如今終於到了真正的「處理」邏輯咯!咱們看一下process
方法的實現:
private void process() { long curTime = System.currentTimeMillis(); this.job.setStarted_at(curTime) .set("started_at", String.valueOf(curTime)) // (1) set start time .compose(Job::active) // (2) set the job state to ACTIVE .setHandler(r -> { if (r.succeeded()) { Job j = r.result(); // emit start event this.emitJobEvent("start", j, null); // (3) emit job `start` event // (4) process logic invocation try { jobHandler.handle(j); } catch (Exception ex) { j.done(ex); } // (5) consume the job done event eventBus.consumer(Kue.workerAddress("done", j), msg -> { createDoneCallback(j).handle(Future.succeededFuture( ((JsonObject) msg.body()).getJsonObject("result"))); }); eventBus.consumer(Kue.workerAddress("done_fail", j), msg -> { createDoneCallback(j).handle(Future.failedFuture( (String) msg.body())); }); } else { this.emitJobEvent("error", this.job, new JsonObject().put("message", r.cause().getMessage())); r.cause().printStackTrace(); } }); }
到了最核心的函數了!首先咱們先給開始時間賦值 (1) 而後將任務狀態置爲ACTIVE
(2)。若是這兩個操做成功的話,咱們就向Event Bus發送任務開始(start
)事件 (3)。接下來咱們調用真正的處理邏輯 - 以前綁定的jobHandler
(4)。若是處理過程當中拋出異常的話,Vert.x Kue就會調用job.done(ex)
方法發送done_fail
內部事件來通知worker任務處理失敗。可是彷佛沒有看到在哪裏接收並處理done
和done_fail
事件呢?就在這 (5)!一旦Vert.x Kue接收到這兩個事件,它就會調用對應的handler
去進行任務完成或失敗的相應操做。這裏的handler
是由createDoneCallback
方法生成的:
private Handler<AsyncResult<JsonObject>> createDoneCallback(Job job) { return r0 -> { if (job == null) { return; } if (r0.failed()) { this.fail(r0.cause()); // (1) return; } long dur = System.currentTimeMillis() - job.getStarted_at(); job.setDuration(dur) .set("duration", String.valueOf(dur)); // (2) JsonObject result = r0.result(); if (result != null) { job.setResult(result) .set("result", result.encodePrettily()); // (3) } job.complete().setHandler(r -> { // (4) if (r.succeeded()) { Job j = r.result(); if (j.isRemoveOnComplete()) { // (5) j.remove(); } this.emitJobEvent("complete", j, null); // (6) this.prepareAndStart(); // (7) 準備處理下一個任務 } }); }; }
任務處理有兩種狀況:完成和失敗,所以咱們先來看任務成功處理的狀況。咱們首先給任務的用時(duration
)賦值 (2),而且若是任務產生告終果,也給結果(result
)賦值 (3)。而後咱們調用job.complete
方法將狀態設置爲COMPLETE
(4)。若是成功的話,咱們就檢查removeOnComplete
標誌位 (5) 並決定是否將任務從Redis中移除。而後咱們向Event Bus發送任務完成事件(complete
)以及隊列事件job_complete
(6)。如今這個任務的處理過程已經結束了,worker須要準備處理下一個任務了,所以最後咱們調用prepareAndStart
方法準備處理下一個Job
。
人生不如意事十之八九,任務處理過程當中極可能會碰見各類各樣的問題而失敗。當任務處理失敗時,咱們調用KueWorker
中的fail
方法:
private void fail(Throwable ex) { job.failedAttempt(ex).setHandler(r -> { // (1) if (r.failed()) { this.error(r.cause(), job); // (2) } else { Job res = r.result(); if (res.hasAttempts()) { // (3) this.emitJobEvent("failed_attempt", job, new JsonObject().put("message", ex.getMessage())); } else { this.emitJobEvent("failed", job, new JsonObject().put("message", ex.getMessage())); // (4) } prepareAndStart(); // (5) } }); }
面對失敗時,咱們首先經過failedAttempt
方法嘗試從錯誤中恢復 (1)。若是恢復失敗(好比沒有重試次數了)就向Event Bus發送error
隊列事件 (2)。若是恢復成功,咱們就根據是否還有剩餘重試次數來發送對應的事件(failed
或者failed_attempt
)。搞定錯誤之後,worker一樣須要準備處理下一個任務了,所以最後咱們調用prepareAndStart
方法準備處理下一個Job
(5)。
這就是KueWorker
的所有實現,是否是頗有趣呢?看了這麼久的代碼也有些累了,下面是時候來寫個Kue應用跑一下咯~
在io.vertx.blueprint.kue.example
包下(kue-example
子工程)建立一個LearningVertxVerticle
類,而後編寫以下代碼:
package io.vertx.blueprint.kue.example; import io.vertx.blueprint.kue.Kue; import io.vertx.blueprint.kue.queue.Job; import io.vertx.blueprint.kue.queue.Priority; import io.vertx.core.AbstractVerticle; import io.vertx.core.json.JsonObject; public class LearningVertxVerticle extends AbstractVerticle { @Override public void start() throws Exception { // 建立工做隊列 Kue kue = Kue.createQueue(vertx, config()); // 監聽全局錯誤事件 kue.on("error", message -> System.out.println("[Global Error] " + message.body())); JsonObject data = new JsonObject() .put("title", "Learning Vert.x") .put("content", "core"); // 準備學習Vert.x,爽! Job j = kue.createJob("learn vertx", data) .priority(Priority.HIGH) .onComplete(r -> { // 完成任務事件監聽 System.out.println("Feeling: " + r.getResult().getString("feeling", "none")); }).onFailure(r -> { // 任務失敗事件監聽 System.out.println("eee...so difficult..."); }).onProgress(r -> { // 任務進度變動事件監聽 System.out.println("I love this! My progress => " + r); }); // 保存任務 j.save().setHandler(r0 -> { if (r0.succeeded()) { // 開始學習! kue.processBlocking("learn vertx", 1, job -> { job.progress(10, 100); // 3秒速成 vertx.setTimer(3000, r1 -> { job.setResult(new JsonObject().put("feeling", "amazing and wonderful!")) // 結果 .done(); // 完成啦! }); }); } else { System.err.println("Wow, something happened: " + r0.cause().getMessage()); } }); } }
一般狀況下,一個Vert.x Kue應用能夠分爲幾部分:建立工做隊列、建立任務、保存任務以及處理任務。咱們推薦開發者把應用寫成Verticle
的形式。
在這個例子中,咱們要模擬一個學習Vert.x的任務!首先咱們經過Kue.createQueue
方法建立一個工做隊列而且經過on(error, handler)
方法監聽全局錯誤(error
)事件。接着咱們經過kue.createJob
方法建立學習任務,將優先級設定爲HIGH
,而且監聽complete
、failed
以及progress
事件。而後咱們須要保存任務,保存完畢之後咱們就能夠經過processBlocking
方法來執行耗時任務了。在處理邏輯中,咱們首先經過job.progress
方法將進度設爲10
,而後使用vertx.setTimer
方法設一個3秒的定時器,定時器時間到之後賦予結果並完成任務。
像往常同樣,咱們還須要在build.gradle
中配置一下。咱們須要將kue-example
子工程中的Main-Verticle
屬性設爲剛纔寫的io.vertx.blueprint.kue.example.LearningVertxVerticle
:
project("kue-example") { dependencies { compile(project(":kue-core")) } jar { archiveName = 'vertx-blueprint-kue-example.jar' from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes 'Main-Class': 'io.vertx.core.Launcher' attributes 'Main-Verticle': 'io.vertx.blueprint.kue.example.LearningVertxVerticle' } } }
好了,到了展現時間了!打開終端,構建項目:
gradle build
固然不要忘記運行Redis:
redis-server
而後咱們先運行Vert.x Kue Core部分:
java -jar kue-core/build/libs/vertx-blueprint-kue-core.jar -cluster -ha -conf config/config.json
而後再運行咱們的實例:
java -jar kue-example/build/libs/vertx-blueprint-kue-example.jar -cluster -ha -conf config/config.json
這時終端應該會依次顯示輸出:
INFO: Kue Verticle is running... I love this! My progress => 10 Feeling: amazing and wonderful!
固然你也能夠在Vert.x Kue的Web端查看任務狀況。
棒極了!咱們終於結束了咱們的Vert.x Kue Core探索之旅~~!從這篇超長的教程中,你學到了如何利用Vert.x去開發一個基於消息的應用!太酷了!
若是想了解kue-http
的實現,請移步Vert.x 藍圖 | Vert.x Kue 教程(Web部分)。若是想了解更多的關於Vert.x Kue的特性,請移步Vert.x Kue 特性介紹。
Vert.x能作的不只僅是這些。想要了解更多的關於Vert.x的知識,請參考Vert.x 官方文檔 —— 這永遠是資料最齊全的地方。
My Blog: 「千載絃歌,芳華如夢」 - sczyh30's blog
若是您對Vert.x感興趣,歡迎加入Vert.x中國用戶組QQ羣一塊兒探討。羣號:515203212