注意:使用的是vert.x3.0 僅支持到java8當中有一些lambda表達式。如不明確請自補java8新特性。java
The Event Busnode
event bus 是vert.x的神經系統。算法
每一個vert.x的實例都有一個單一的event bus 實例。它是使用vertx.eventBus()方法得到的。編程
event bus 贊成程序中的不一樣語言編寫的模塊進行通訊。不論他們是一樣的vert.x實例。仍是不一樣的vert.x實例。瀏覽器
它甚至可以橋接瀏覽器中執行的Javascript通訊。編程語言
event bus可以在分佈式系統中的多個server節點之間進行點對點通訊和多個瀏覽器。分佈式
event bus支持公佈/訂閱模式。點對點模式,和請求/響應模式。ide
event bus的API是很easy的。它主要包含註冊消息處理事件。取消處理事件。發送和公佈消息。spa
首先理論命令行
尋址
event bus上的消息被髮送到一個地址。
vert.x不包括不論什麼花哨的尋址方案。
在vert.x中,一個地址就是一個簡單的String字符串。不論什麼字符串都是有效的。只是最好的方法是使用某種有計劃或者有規則的方案,比方使用一個私有的空間名稱。
一些有參考價值的樣例:europe.news.feed1, acme.games.pacman, sausages, and X。
事件-消息的處理程序
收到消息的處理程序,你在一個地址上註冊一個處理程序,來消息後將觸發這個處理程序。
同一個消息處理程序可以註冊到不一樣的地址上,相同同一個地址也能註冊多個處理程序。
公佈/訂閱模式
event bus 支持公佈消息
消息被公佈到一個地址。公佈意味着將消息交給所有訂閱並註冊處理程序的地址來處理。
這跟你們熟悉的公佈/訂閱模式沒有什麼不一樣。
點對點和請求/響應模式
event bus 支持點對點消息傳遞。
消息被髮送到一個地址。
vert.x將發送它到一個註冊消息處理程序的地址。
假設有多個處理程序註冊地址,vert.x將選擇一個來處理(採用非嚴格循環算法)。
強烈不推薦。
當接收到消息的程序處理完畢後,可以決定是否回覆。發送程序接到回覆後也可以進行響應回覆,假設他們這樣作應答處理程序將被調用。
當接收方到返回發送方。這樣可以無限反覆,這又是一種常見的消息傳遞模式:請求/響應模式
最優傳輸
vert.x能夠作到最優傳輸。不會有意識的丟失消息。這是很重要的。
然而,event bus的部分或全部失敗仍是有可能形成消息丟失的。
假設你的應用程序很在意消息的完整性和時序性。那麼你的代碼處理應該是冪等的。以便在消息處理程序復甦後又一次發送消息。
消息類型
開箱贊成vert.x使用不論什麼的原始/簡單類型,字符串或者緩衝區發送消息。
然而這裏有一個不成文的規定或者說建議。那就是最好使用JSON格式的子串來進行消息的傳遞。
JSON字串在所有的編程語言中都是很easy建立。讀取和解析的。在vert.x下它已經變成一種通用語言了。
假設你不是必需使用JSON或者說你不想。
event bus 很靈活。
它還支持發送隨意對象。還可以定義您想要發送的對象的編解碼器。
EVENT BUS 的API
讓咱們跳進event bus的API。
得到event bus 的對象
你可以經過例如如下代碼得到event bus的單一對象:
EventBus eb = vertx.eventBus();
註冊處理事件
使用如下這個簡單方法註冊一個消費處理程序:
EventBus eb = vertx.eventBus();
eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});
當一個消息到達你的處理事件是。你的事件將被激活。並處理這個消息。
consumer()方法返回一個MessageConsumer的對象實例。這個對象隨後用於註銷處理程序,或者用處理程序做爲流。
然而您也可以使用consumer()返回MessageConsumer沒有處理程序,而後單獨設置處理程序。好比:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.consumer("news.uk.sport");consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
});
當在集羣事件總線上註冊一個處理程序時,它可以花一些時間登記到集羣的所有節點上。
假設你但願在註冊完畢時獲得通知的話,你可以在MessageConsumer上註冊一個註冊完畢的處理程序:
consumer.completionHandler(res -> {
if (res.succeeded()) {
System.out.println("The handler registration has reached all nodes");
} else {
System.out.println("Registration failed!");
}
});
註銷處理事件
去除處理事件。叫作註銷。
假設你是集羣事件總線, 假設你想當這個過程完畢時通知註銷,你可以使用如下的方法:
consumer.unregister(res -> {
if (res.succeeded()) {
System.out.println("The handler un-registration has reached all nodes");
} else {
System.out.println("Un-registration failed!");
}
});
公佈消息
公佈消息很easy。僅僅需要把它公佈到指定地址就能夠:
eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");
這一消息將被交付所有訂閱news.uk.sport地址處理。
發送消息
發送消息將致使僅僅有一個註冊地址的處理程序接收到消息(多個註冊地址也僅僅有一個能收到)。
這就是點對點模式,選擇處理程序的方法採用非嚴格循環方式。
你可用用send()方法發送一條消息。
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");
未解決的指令包含在<stdiin>-include::override/eventbus_headers.adoc[] ==== The Message object
你的消息處理程序收到的是一個Message。
消息的body相應着是應該發送仍是應該公佈。
消息的headers是可用的。
回覆消息
有時你發送消息後但願獲得接收到消息的人的回覆。
這就需要你使用請求-響應模式。
要作到這一點,在消息發送的時候,你可以指定一個回覆事件。
當你接收到消息的時候,你可以經過調用reply()方法來應答。
當這一切發生的時候它會致使一個答覆發送回發送方,發送方收到應答消息再作處理。
接收方:
MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
message.reply("how interesting!");
});
發送方:
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
if (ar.succeeded()) {
System.out.println("Received reply: " + ar.result().body());
}
});
相應答也可以作應答。這樣你就可以在兩個不一樣的程序中建立一個包括多個回合的對話。
發送超時
當你發送消息時和指定應答事件時你可以經過DeliveryOptions指定超時時間。
假設應答事件很多於超時時間,這個應答事件將失敗。
默認的超時時間是30S。
發送失敗
消息發送失敗的其它緣由,包含:
沒有可用的事件去發送消息
接收者已經明白使用失敗:失敗的消息
在所有狀況下。應答事件將回復特定的失敗。
未解決的指令包括在<stdin> - include::override/eventbus.adoc[]==== Clustered Event Bus
event bus 不僅存在於一個單一的Vert.x實例中,在一個集羣中不一樣的Vert.x實例也可以造成一個單一的,分佈的事件總線。
集羣編程
假設你建立一個Vert.x實例用於集羣編程。你需要的獲得一個關於集羣事件總線配置
VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}});
你應該確保在你的類路徑中實現了一個ClusterManager,好比默認的:HazelcastClusterManager。
使用命令集羣
你可以使用命令行執行集羣:vertx run my-verticle.js -cluster
Automatic clean-up in verticles
If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregisteredwhen the verticle is undeployed.