前言:由於ClusteredEventBus涉及集羣,有必產生網絡問題,從而引入了NetServer、ServerID等涉及網絡,端口的類。在以前的EventBusImpl中, 使用的數據結構是以address-List<Handler>做爲k-v的map容器。做爲EventBusImpl的子類,ClusteredEventBus的邏輯結構上同樣的。 不過把address-List<ServerID>做爲k-v。node
原理:在start方法中,利用第三方框架(默認hazelcast)實現的集羣同步map(變量subs) ,獲取已有的節點信息。而後根據參數,對自身服務器的端口實現監聽,把自身服務器信息放入前面的map,讓其餘節點感知。調用consumer方法時,以address-List<ServerID>做爲k-v存在一個map的容器中。調用send方法時,以address爲k從map裏取出ServerID.而後把消息利用TCP協議發送給對應的服務器。服務器
代碼:網絡
public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 這2個字段是爲了從System.getProperty()取值,優先級//1.System.getProperty()2.EventBusOptions public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port"; private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1}); private static final String SERVER_ID_HA_KEY = "server_id"; private static final String SUBS_MAP_NAME = "__vertx.subs"; //集羣數據存放在集羣同步的map中,須要約定一個固定的key統一存取。 private final ClusterManager clusterManager; private final HAManager haManager; private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根據socket長連接 private final Context sendNoContext; private EventBusOptions options; // 建立時的參數 private AsyncMultiMap<String, ClusterNodeInfo> subs; // 集羣核心數據 k是address,value是HazelcastClusterNodeInfo private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身訂閱(Subscribe)的addrees private ServerID serverID; // 自身服務器信息(IP和port) private ClusterNodeInfo nodeInfo; // 自身集羣信息(NodeID、IP和port) private NetServer server;//
在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。數據結構
作了不少事件,不少邏輯。框架
1.subs = ar2.result(); 獲取集羣數據。從集羣拉取數據,ar2.succeeded() 爲前置判斷。直接排除網絡、配置等錯誤的可能。socket
2.建立底層的端口監聽。這裏端口有大坑,有2個概念:async
actualPort 和 publicPort
actualPort是值真正監聽的端口,從option傳值過來,沒有則隨機產生。ide
publicPort是放到共享給集羣的端口,爲了通知別的節點讓它們往這裏發數據。官方的解釋是爲了容器狀況考慮。在容器裏運行時,和主機的端口是經過代理訪問的。對於這2個port ,由於這裏有好幾個變量能夠賦值,全部裏面有優先級:代理
actualPort: 1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,查看VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,發現其實就是對EventBusOptions操做。 2.隨機產生。
publicPort 1.系統變量CLUSTER_PUBLIC_PORT_PROP_NAME 2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost 3.上面的actualPort
這由於端口直接涉及到通訊,設置不對就沒法使用。若是是集羣內多節點的狀況,須要設置host,不須要設置port. 由於host默認值是 "localhost",port默認值是隨機產生的可用端口(假設爲51854),host和port會產生ServerID。若是不設置host,A節點就會把 "localhost:51854"傳到集羣上。其餘B節想要訪問A時,會根據這個信息去訪問 localhost:51854,結果訪問到自身去了。code
下面重點就是consumer 和 send/poblish方法。
調用consumer方法時,會依次調用到addRegistration(),往集羣共享的subs中放入信息,達到傳播的目的。
@Override protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) { if (newAddress && subs != null && !replyHandler && !localOnly) { // Propagate the information subs.add(address, nodeInfo, completionHandler); ownSubs.add(address); } else { completionHandler.handle(Future.succeededFuture()); } }
調用send/poblish方法時,會依次調用到sendOrPub(),
@Override protected <T> void sendOrPub(SendContextImpl<T> sendContext) { String address = sendContext.message.address(); // 這裏只是定義resultHandler,沒有執行,若是要執行,還需 //要resultHandler.handler(AsyncResult) Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> { if (asyncResult.succeeded()) { // 重要的 server ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result(); if (serverIDs != null && !serverIDs.isEmpty()) { sendToSubs(serverIDs, sendContext); } else { if (metrics != null) { metrics.messageSent(address, !sendContext.message.isSend(), true, false); } deliverMessageLocally(sendContext); } } else { log.error("Failed to send message", asyncResult.cause()); } }; // 這裏纔是處理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo> // get(k)就是把List<HazelcastClusterNodeInfo>取出來,交給上面的handler if (Vertx.currentContext() == null) { // Guarantees the order when there is no current context sendNoContext.runOnContext(v -> { subs.get(address, resultHandler); }); } else { subs.get(address, resultHandler); } }
sendToSubs()方法是包含了 send/publish 的判斷,這個邏輯原本是在deliverMessageLocally(MessageImpl msg)完成的。
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法裏,單機版產生的是 MessageImpl, 集羣版產生ClusteredMessage。 ClusteredMessage此類包含了對Buffer 的操做,幫助socket通訊。