Vert.x系列(三)--ClusteredEventBus源碼分析

 前言:由於ClusteredEventBus涉及集羣,有必產生網絡問題,從而引入了NetServer、ServerID等涉及網絡,端口的類。在以前的EventBusImpl中, 使用的數據結構是以address-List<Handler>做爲k-v的map容器。做爲EventBusImpl的子類,ClusteredEventBus的邏輯結構上同樣的。 不過把address-List<ServerID>做爲k-v。node

原理:在start方法中,利用第三方框架(默認hazelcast)實現的集羣同步map(變量subs) ,獲取已有的節點信息。而後根據參數,對自身服務器的端口實現監聽,把自身服務器信息放入前面的map,讓其餘節點感知。調用consumer方法時,以address-List<ServerID>做爲k-v存在一個map的容器中。調用send方法時,以address爲k從map裏取出ServerID.而後把消息利用TCP協議發送給對應的服務器。服務器

代碼:網絡

public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 這2個字段是爲了從System.getProperty()取值,優先級//1.System.getProperty()2.EventBusOptions

public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port";

private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1});

private static final String SERVER_ID_HA_KEY = "server_id";

private static final String SUBS_MAP_NAME = "__vertx.subs"; //集羣數據存放在集羣同步的map中,須要約定一個固定的key統一存取。

private final ClusterManager clusterManager;

private final HAManager haManager;

private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根據socket長連接

private final Context sendNoContext;

private EventBusOptions options; // 建立時的參數

private AsyncMultiMap<String, ClusterNodeInfo> subs; // 集羣核心數據 k是address,value是HazelcastClusterNodeInfo

private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身訂閱(Subscribe)的addrees

private ServerID serverID; // 自身服務器信息(IP和port)

private ClusterNodeInfo nodeInfo; // 自身集羣信息(NodeID、IP和port)

private NetServer server;//

在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。數據結構

作了不少事件,不少邏輯。框架

1.subs = ar2.result(); 獲取集羣數據。從集羣拉取數據,ar2.succeeded() 爲前置判斷。直接排除網絡、配置等錯誤的可能。socket

2.建立底層的端口監聽。這裏端口有大坑,有2個概念:async

actualPort 和 publicPort

actualPort是值真正監聽的端口,從option傳值過來,沒有則隨機產生。ide

publicPort是放到共享給集羣的端口,爲了通知別的節點讓它們往這裏發數據。官方的解釋是爲了容器狀況考慮。在容器裏運行時,和主機的端口是經過代理訪問的。對於這2個port ,由於這裏有好幾個變量能夠賦值,全部裏面有優先級:代理

actualPort:
1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,查看VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,發現其實就是對EventBusOptions操做。
2.隨機產生。

 

publicPort
1.系統變量CLUSTER_PUBLIC_PORT_PROP_NAME
2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost
3.上面的actualPort

這由於端口直接涉及到通訊,設置不對就沒法使用。若是是集羣內多節點的狀況,須要設置host,不須要設置port. 由於host默認值是 "localhost",port默認值是隨機產生的可用端口(假設爲51854),host和port會產生ServerID。若是不設置host,A節點就會把 "localhost:51854"傳到集羣上。其餘B節想要訪問A時,會根據這個信息去訪問 localhost:51854,結果訪問到自身去了。code

 

下面重點就是consumer 和 send/poblish方法。

調用consumer方法時,會依次調用到addRegistration(),往集羣共享的subs中放入信息,達到傳播的目的。

@Override
protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
    // Propagate the information
    subs.add(address, nodeInfo, completionHandler);
    ownSubs.add(address);
} else {
    completionHandler.handle(Future.succeededFuture());
}
}

調用send/poblish方法時,會依次調用到sendOrPub(),

@Override

protected <T> void sendOrPub(SendContextImpl<T> sendContext) {

String address = sendContext.message.address();
// 這裏只是定義resultHandler,沒有執行,若是要執行,還需
//要resultHandler.handler(AsyncResult)
Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// 重要的 server
ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
if (metrics != null) {
metrics.messageSent(address, !sendContext.message.isSend(), true, false);
}
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};

// 這裏纔是處理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo>
// get(k)就是把List<HazelcastClusterNodeInfo>取出來,交給上面的handler
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

sendToSubs()方法是包含了 send/publish 的判斷,這個邏輯原本是在deliverMessageLocally(MessageImpl msg)完成的。

protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法裏,單機版產生的是 MessageImpl, 集羣版產生ClusteredMessage。 ClusteredMessage此類包含了對Buffer 的操做,幫助socket通訊。 

相關文章
相關標籤/搜索