Vert.x中EventBus中的使用


注意:使用的是vert.x3.0 僅支持到java8當中有一些lambda表達式。如不明確請自補java8新特性。java

The Event Busnode

event bus 是vert.x的神經系統。算法

每一個vert.x的實例都有一個單一的event bus 實例。它是使用vertx.eventBus()方法得到的。編程

event bus 贊成程序中的不一樣語言編寫的模塊進行通訊。不論他們是一樣的vert.x實例。仍是不一樣的vert.x實例。瀏覽器

它甚至可以橋接瀏覽器中執行的Javascript通訊。編程語言

event bus可以在分佈式系統中的多個server節點之間進行點對點通訊和多個瀏覽器。分佈式

event bus支持公佈/訂閱模式。點對點模式,和請求/響應模式。ide

event bus的API是很easy的。它主要包含註冊消息處理事件。取消處理事件。發送和公佈消息。spa

首先理論命令行

尋址

event bus上的消息被髮送到一個地址。

vert.x不包括不論什麼花哨的尋址方案。

在vert.x中,一個地址就是一個簡單的String字符串。不論什麼字符串都是有效的。只是最好的方法是使用某種有計劃或者有規則的方案,比方使用一個私有的空間名稱。

一些有參考價值的樣例:europe.news.feed1, acme.games.pacman, sausages, and X。

事件-消息的處理程序

收到消息的處理程序,你在一個地址上註冊一個處理程序,來消息後將觸發這個處理程序。

同一個消息處理程序可以註冊到不一樣的地址上,相同同一個地址也能註冊多個處理程序。

公佈/訂閱模式

event bus 支持公佈消息

消息被公佈到一個地址。公佈意味着將消息交給所有訂閱並註冊處理程序的地址來處理。

這跟你們熟悉的公佈/訂閱模式沒有什麼不一樣。

點對點和請求/響應模式

event bus 支持點對點消息傳遞。

消息被髮送到一個地址。

vert.x將發送它到一個註冊消息處理程序的地址。

假設有多個處理程序註冊地址,vert.x將選擇一個來處理(採用非嚴格循環算法)。

強烈不推薦。

當接收到消息的程序處理完畢後,可以決定是否回覆。發送程序接到回覆後也可以進行響應回覆,假設他們這樣作應答處理程序將被調用。

當接收方到返回發送方。這樣可以無限反覆,這又是一種常見的消息傳遞模式:請求/響應模式

最優傳輸

vert.x能夠作到最優傳輸。不會有意識的丟失消息。這是很重要的。

然而,event bus的部分或全部失敗仍是有可能形成消息丟失的。

假設你的應用程序很在意消息的完整性和時序性。那麼你的代碼處理應該是冪等的。以便在消息處理程序復甦後又一次發送消息。

消息類型

開箱贊成vert.x使用不論什麼的原始/簡單類型,字符串或者緩衝區發送消息。

然而這裏有一個不成文的規定或者說建議。那就是最好使用JSON格式的子串來進行消息的傳遞。

JSON字串在所有的編程語言中都是很easy建立。讀取和解析的。在vert.x下它已經變成一種通用語言了。

假設你不是必需使用JSON或者說你不想。

event bus 很靈活。

它還支持發送隨意對象。還可以定義您想要發送的對象的編解碼器。

EVENT BUS 的API

讓咱們跳進event bus的API。

得到event bus 的對象

你可以經過例如如下代碼得到event bus的單一對象:

EventBus eb = vertx.eventBus();

註冊處理事件

使用如下這個簡單方法註冊一個消費處理程序:

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {

System.out.println("I have received a message: " + message.body());

});

當一個消息到達你的處理事件是。你的事件將被激活。並處理這個消息。

consumer()方法返回一個MessageConsumer的對象實例。這個對象隨後用於註銷處理程序,或者用處理程序做爲流。

然而您也可以使用consumer()返回MessageConsumer沒有處理程序,而後單獨設置處理程序。好比:

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");consumer.handler(message -> {

System.out.println("I have received a message: " + message.body());

});

當在集羣事件總線上註冊一個處理程序時,它可以花一些時間登記到集羣的所有節點上。

假設你但願在註冊完畢時獲得通知的話,你可以在MessageConsumer上註冊一個註冊完畢的處理程序:

consumer.completionHandler(res -> {

if (res.succeeded()) {

System.out.println("The handler registration has reached all nodes");

} else {

System.out.println("Registration failed!");

}

});

註銷處理事件

去除處理事件。叫作註銷。

假設你是集羣事件總線, 假設你想當這個過程完畢時通知註銷,你可以使用如下的方法:

consumer.unregister(res -> {

if (res.succeeded()) {

System.out.println("The handler un-registration has reached all nodes");

} else {

System.out.println("Un-registration failed!");

}

});

公佈消息

公佈消息很easy。僅僅需要把它公佈到指定地址就能夠:

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

這一消息將被交付所有訂閱news.uk.sport地址處理。

發送消息

發送消息將致使僅僅有一個註冊地址的處理程序接收到消息(多個註冊地址也僅僅有一個能收到)。

這就是點對點模式,選擇處理程序的方法採用非嚴格循環方式。

你可用用send()方法發送一條消息。

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

未解決的指令包含在<stdiin>-include::override/eventbus_headers.adoc[] ==== The Message object

你的消息處理程序收到的是一個Message。

消息的body相應着是應該發送仍是應該公佈。

消息的headers是可用的。

回覆消息

有時你發送消息後但願獲得接收到消息的人的回覆。

這就需要你使用請求-響應模式。

要作到這一點,在消息發送的時候,你可以指定一個回覆事件。

當你接收到消息的時候,你可以經過調用reply()方法來應答。

當這一切發生的時候它會致使一個答覆發送回發送方,發送方收到應答消息再作處理。

接收方:

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");

consumer.handler(message -> {

System.out.println("I have received a message: " + message.body());

message.reply("how interesting!");

});

發送方:

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {

if (ar.succeeded()) {

 System.out.println("Received reply: " + ar.result().body());
 }
 });

相應答也可以作應答。這樣你就可以在兩個不一樣的程序中建立一個包括多個回合的對話。

發送超時

當你發送消息時和指定應答事件時你可以經過DeliveryOptions指定超時時間。

假設應答事件很多於超時時間,這個應答事件將失敗。

默認的超時時間是30S。

發送失敗

消息發送失敗的其它緣由,包含:

沒有可用的事件去發送消息

接收者已經明白使用失敗:失敗的消息

在所有狀況下。應答事件將回復特定的失敗。

未解決的指令包括在<stdin> - include::override/eventbus.adoc[]==== Clustered Event Bus

event bus 不僅存在於一個單一的Vert.x實例中,在一個集羣中不一樣的Vert.x實例也可以造成一個單一的,分佈的事件總線。

集羣編程

假設你建立一個Vert.x實例用於集羣編程。你需要的獲得一個關於集羣事件總線配置

VertxOptions options = new VertxOptions();

Vertx.clusteredVertx(options, res -> {

if (res.succeeded()) {

Vertx vertx = res.result();

EventBus eventBus = vertx.eventBus();

System.out.println("We now have a clustered event bus: " + eventBus);

} else {

System.out.println("Failed: " + res.cause());

}});

你應該確保在你的類路徑中實現了一個ClusterManager,好比默認的:HazelcastClusterManager。

使用命令集羣

你可以使用命令行執行集羣:vertx run my-verticle.js -cluster

Automatic clean-up in verticles

If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregisteredwhen the verticle is undeployed.

相關文章
相關標籤/搜索