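SOFARegistry is a production-grade, highly timely, and highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture. Across several articles and from several angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.
This is the tenth article. It takes a business-level view and looks at how DataServer interacts with MetaServer.
First, let us review why MetaServer matters.
MetaServer is the metadata server cluster. It is responsible for the server information of the Session server cluster and the Data server cluster. Its role is that of a service registry inside the SOFARegistry architecture: SOFARegistry as a whole serves the application layer, while the Meta cluster serves SOFARegistry's own Session and Data clusters. The Meta layer detects changes in Session and Data nodes and notifies the other nodes in the cluster.
So, to learn about node changes, DataServer has to talk to MetaServer, and that interaction is what we study here.
Built on the Bolt protocol, DataServer uses a push-pull model when interacting with Meta Server.
We focus on the following design strategies: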
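The directory structure of this module is shown below. From it we can roughly infer that:
DefaultMetaServiceImpl is the main body of the Meta Server related module;
MetaServerConnectionFactory manages connections;
ConnectionRefreshMetaTask is a periodic task;
the handler directory holds three response handlers;
the provideData directory holds configuration-related functionality.
The directory structure is: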
│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task
│   │       └── ConnectionRefreshMetaTask.java
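The MetaServer-related components are as follows:
One question arises here: why does DataServerBootstrap also have startRaftClient? In principle DataServer should only need HTTP and Bolt.
It turns out that Raft is used to obtain information such as the leader address of the MetaServer cluster: raftClient.getLeader();
This is used, for example, by renewNodeTask.
Raft-related startup happens in startRaftClient. This function starts the Raft client and then posts a MetaServerChangeEvent, built from metaServerService.getMetaServerMap(), to the EventCenter.
The code is: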
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

As mentioned above, once the system has started it proactively sends a MetaServerChangeEvent, so let us look at what the event contains.

public class MetaServerChangeEvent implements Event {

    private Map<String, Set<String>> ipMap;

    /**
     * constructor
     * @param ipMap
     */
    public MetaServerChangeEvent(Map<String, Set<String>> ipMap) {
        this.ipMap = ipMap;
    }

    public Map<String, Set<String>> getIpMap() {
        return ipMap;
    }
}

Its runtime state looks like this:

event = {MetaServerChangeEvent@5991}
 ipMap = {HashMap@5678}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007}  size = 1
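MetaServerChangeEvent has three sources: a proactive fetch at startup, a periodic refresh, and a push from Meta Server. We describe each in turn.
The first is the one mentioned above: at startup, the meta server configuration is read through metaServerService.getMetaServerMap(), a MetaServerChangeEvent is built from it, and the event is posted to the EventCenter.
Once a DataServer node has initialized successfully, it starts a task that connects to MetaServer automatically. That is, the task registers a DataServerChangeEvent with the EventCenter; once registered, the event is triggered, after which a hash value is computed for the newly added node and the shards it manages are assigned.
The startup code is:

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

The call stack is as follows: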
register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver)
registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler)
post:56, EventCenter (com.alipay.sofa.registry.server.data.event)
startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap)
doStart:173, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:50, DefaultLifecycleProcessor (org.springframework.context.support)
start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:880, AbstractApplicationContext (org.springframework.context.support)
refresh:546, AbstractApplicationContext (org.springframework.context.support)
refresh:693, SpringApplication (org.springframework.boot)
refreshContext:360, SpringApplication (org.springframework.boot)
run:303, SpringApplication (org.springframework.boot)
run:1118, SpringApplication (org.springframework.boot)
run:1107, SpringApplication (org.springframework.boot)
main:41, DataApplication (com.alipay.sofa.registry.server.data)
The periodic source is handled by ConnectionRefreshMetaTask, a periodic task configured in the tasks bean.
StartTaskEventHandler is what invokes tasks: when it receives a StartTaskEvent, it schedules the matching AbstractTask instances from tasks.
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {

    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor = null;

    @Override
    public List<Class<? extends StartTaskEvent>> interest() {
        return Lists.newArrayList(StartTaskEvent.class);
    }

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}
The tasks bean is defined as follows:

@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}
ConnectionRefreshMetaTask is a periodic task that regularly posts a MetaServerChangeEvent to the EventCenter.
On each run it calls metaServerService.getMetaServerMap(), wraps the result in a MetaServerChangeEvent, and posts it to the EventCenter.

public class ConnectionRefreshMetaTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }
}
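The scheduling pattern used for these periodic tasks can be tried in isolation. The following is a minimal sketch using only the JDK, not SOFARegistry code: the class PeriodicRefreshSketch and the method runFor are hypothetical names, and an AtomicInteger counter stands in for posting a MetaServerChangeEvent to the EventCenter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of SOFARegistry.
final class PeriodicRefreshSketch {

    /**
     * Schedules a refresh task with a fixed delay, waits until it has fired
     * at least `times` times, then stops the scheduler.
     * Returns how many "events" were posted.
     */
    public static int runFor(int times, long delayMillis) {
        AtomicInteger posted = new AtomicInteger();   // stands in for eventCenter.post(...)
        CountDownLatch fired = new CountDownLatch(times);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            // Same JDK call that StartTaskEventHandler uses for each task.
            executor.scheduleWithFixedDelay(() -> {
                posted.incrementAndGet();
                fired.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            if (!fired.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not fire in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return posted.get();
    }

    public static void main(String[] args) {
        System.out.println("events posted: " + runFor(3, 10));
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow refresh never overlaps with the following one.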
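ServerChangeHandler is one of the metaClientHandlers and acts as a response handler for MetaNodeExchanger.
ServerChangeHandler extends AbstractClientHandler, and its interest method declares that it responds to NodeChangeResult.
When Meta pushes, ServerChangeHandler responds; this is the case where Meta Server notifies DataServer on its own initiative.
After receiving a NodeChangeResult, ServerChangeHandler checks the type of the changed node and, depending on the Node type, produces either a DataServerChangeEvent or a MetaServerChangeEvent. For NodeType.META it posts to the eventCenter with eventCenter.post(new MetaServerChangeEvent(map)); this is one of the sources of MetaServerChangeEvent.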
public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter      eventCenter;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
            if (request.getNodeType() == NodeType.DATA) {
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                        request.getDataCenterListVersions(), FromType.META_NOTIFY));
            } else if (request.getNodeType() == NodeType.META) {
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
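The branching in doHandle can be reduced to a small pure function, which makes the rule easier to see. This is only a sketch under assumptions: NodeChangeRoutingSketch, its NodeType enum and the route method are hypothetical, and the pushed nodes are simplified to a map from data center to IP set.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class; not part of SOFARegistry.
final class NodeChangeRoutingSketch {

    // Simplified stand-in for the node types a push can carry.
    public enum NodeType { DATA, META, SESSION }

    /**
     * Returns the name of the event a pushed change would be turned into,
     * or an empty string when nothing would be posted.
     */
    public static String route(NodeType type, Map<String, Set<String>> nodesByDataCenter,
                               String localDataCenter) {
        if (type == NodeType.DATA) {
            return "DataServerChangeEvent";
        }
        if (type == NodeType.META && nodesByDataCenter != null) {
            // Only meta nodes of the local data center are of interest.
            Set<String> local = nodesByDataCenter.get(localDataCenter);
            if (local != null && !local.isEmpty()) {
                return "MetaServerChangeEvent";
            }
        }
        return "";
    }

    public static void main(String[] args) {
        Map<String, Set<String>> nodes =
            Collections.singletonMap("DefaultDataCenter", Collections.singleton("10.0.0.1"));
        System.out.println(route(NodeType.META, nodes, "DefaultDataCenter"));
    }
}
```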
The logic at this point is shown below. It shows the three sources of MetaServerChangeEvent; ServerChangeHandler also produces DataServerChangeEvent:
+-------------------------------+
| [DataServerBootstrap]         |  MetaServerChangeEvent
|                               +--------------------------+
|   startRaftClient             |                          |
+-------------------------------+                          |
+-------------------------------+                          |
| [Timer]                       |                          v
|                               |  MetaServerChangeEvent  +-------------+
|   ConnectionRefreshMetaTask   +------------------------>| EventCenter |
|                               |                         +---+-----+---+
+-------------------------------+                             ^     ^
+-------------------------------+                             |     |
| [Push<NodeChangeResult>]      |  MetaServerChangeEvent      |     |
|                               +-----------------------------+     |
|   ServerChangeHandler         |                                   |
|                               +-----------------------------------+
+-------------------------------+  DataServerChangeEvent
MetaServerChangeEventHandler responds to MetaServerChangeEvent. Because it extends AbstractEventHandler, it is already registered with the EventCenter.
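The register-then-dispatch relationship between handlers and the EventCenter can be sketched in a few lines. This is a simplified model, not the real EventCenter: EventCenterSketch, MetaChange and the register and post signatures are hypothetical. It dispatches synchronously on the posting thread, which matches the call stack shown earlier (post, then handle, then doHandle).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical illustration class; not the real EventCenter.
final class EventCenterSketch {

    /** Hypothetical event carrying just a data center name. */
    public static final class MetaChange {
        public final String dataCenter;

        public MetaChange(String dataCenter) {
            this.dataCenter = dataCenter;
        }
    }

    // event class -> handlers that declared interest in it
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    /** Registers a handler for one event type. */
    public <E> void register(Class<E> type, Consumer<? super E> handler) {
        handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    /** Dispatches on the calling thread; returns how many handlers ran. */
    public int post(Object event) {
        List<Consumer<Object>> interested = handlers.get(event.getClass());
        if (interested == null) {
            return 0;
        }
        for (Consumer<Object> handler : interested) {
            handler.accept(event);
        }
        return interested.size();
    }

    public static void main(String[] args) {
        EventCenterSketch center = new EventCenterSketch();
        center.register(MetaChange.class, e -> System.out.println("changed: " + e.dataCenter));
        center.post(new MetaChange("DefaultDataCenter"));
    }
}
```

Keying handlers by event class is what lets each source simply post an event without knowing who consumes it.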
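Note that a further conversion into DataServerChangeEvent happens here: the handler actively talks to MetaServer again, and if the reply is a NodeChangeResult, it is converted into a DataServerChangeEvent.
This is because the push from Meta Server may really be telling the data server: "hi, the data servers have changed too, come and pull again".
While handling the event, MetaServerChangeEventHandler contacts MetaServer to check that it is valid, and registers with it if so.
The logic is as follows. For each data center in the event and each IP under it, the handler looks up the connection in metaServerConnectionFactory and calls registerMetaServer if the connection is missing or unhealthy. registerMetaServer connects to that IP through metaNodeExchanger and records the connection in metaServerConnectionFactory. If the IP is the Raft leader, it also sends a request whose body is DataNode(new URL(DataServerConfig.IP), dataServerConfig.getLocalDataCenter()); if the reply is a NodeChangeResult, it posts a StartTaskEvent for RENEW and a DataServerChangeEvent. Connections whose IP or data center no longer appears in the event are removed.
The code is: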
public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private IMetaServerService          metaServerService;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private EventCenter                 eventCenter;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Override
    public List<Class<? extends MetaServerChangeEvent>> interest() {
        return Lists.newArrayList(MetaServerChangeEvent.class);
    }

    @Override
    public void doHandle(MetaServerChangeEvent event) {
        Map<String, Set<String>> ipMap = event.getIpMap();
        for (Entry<String, Set<String>> ipEntry : ipMap.entrySet()) {
            String dataCenter = ipEntry.getKey();
            Set<String> ips = ipEntry.getValue();
            if (!CollectionUtils.isEmpty(ips)) {
                // (re)register every meta server whose connection is missing or unhealthy
                for (String ip : ips) {
                    Connection connection = metaServerConnectionFactory.getConnection(dataCenter, ip);
                    if (connection == null || !connection.isFine()) {
                        registerMetaServer(dataCenter, ip);
                    }
                }
                // drop connections to meta servers that are no longer listed
                Set<String> ipSet = metaServerConnectionFactory.getIps(dataCenter);
                for (String ip : ipSet) {
                    if (!ips.contains(ip)) {
                        metaServerConnectionFactory.remove(dataCenter, ip);
                    }
                }
            } else {
                //remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty
                removeDataCenter(dataCenter);
            }
        }
        //remove connections of dataCenter if the dataCenter not exist in ipMap
        Set<String> dataCenters = metaServerConnectionFactory.getAllDataCenters();
        for (String dataCenter : dataCenters) {
            if (!ipMap.containsKey(dataCenter)) {
                removeDataCenter(dataCenter);
            }
        }
    }

    private void registerMetaServer(String dataCenter, String ip) {
        PeerId leader = metaServerService.getLeader();

        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getMetaServerPort()));
                //connect all meta server
                if (channel != null && channel.isConnected()) {
                    metaServerConnectionFactory.register(dataCenter, ip,
                        ((BoltChannel) channel).getConnection());
                }
                //register leader meta node
                if (ip.equals(leader.getIp())) {
                    Object obj = null;
                    try {
                        obj = metaNodeExchanger.request(new Request() {
                            @Override
                            public Object getRequestBody() {
                                return new DataNode(new URL(DataServerConfig.IP), dataServerConfig
                                    .getLocalDataCenter());
                            }

                            @Override
                            public URL getRequestUrl() {
                                return new URL(ip, dataServerConfig.getMetaServerPort());
                            }
                        }).getResult();
                    } catch (Exception e) {
                        // handling is not shown in this excerpt; obj stays null
                    }
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();

                        //send renew after first register dataNode
                        Set<StartTaskTypeEnum> set = new HashSet<>();
                        set.add(StartTaskTypeEnum.RENEW);
                        eventCenter.post(new StartTaskEvent(set));

                        eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
                            DataServerChangeEvent.FromType.REGISTER_META));
                        break;
                    }
                }
            } catch (Exception e) {
                // handling is not shown in this excerpt; the loop tries again
            }
        }
    }
}
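The core of doHandle is a reconciliation between the IPs listed in the event and the connections currently held. The sketch below isolates that step as a pure function. It is a simplification under stated assumptions: ReconcileSketch, Plan and plan are hypothetical names, and "currently connected" stands in for the handler's check that a connection exists and isFine().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration class; not part of SOFARegistry.
final class ReconcileSketch {

    /** What has to be done, as sorted "dataCenter/ip" entries. */
    public static final class Plan {
        public final Set<String> toRegister = new TreeSet<>();
        public final Set<String> toRemove = new TreeSet<>();
    }

    /**
     * desired:   dataCenter -> meta server IPs listed in the event
     * connected: dataCenter -> IPs that currently have a healthy connection
     */
    public static Plan plan(Map<String, Set<String>> desired, Map<String, Set<String>> connected) {
        Plan plan = new Plan();
        for (Map.Entry<String, Set<String>> entry : desired.entrySet()) {
            String dataCenter = entry.getKey();
            Set<String> current = connected.getOrDefault(dataCenter, Collections.emptySet());
            for (String ip : entry.getValue()) {
                if (!current.contains(ip)) {
                    plan.toRegister.add(dataCenter + "/" + ip);   // missing: register it
                }
            }
            for (String ip : current) {
                if (!entry.getValue().contains(ip)) {
                    plan.toRemove.add(dataCenter + "/" + ip);     // no longer listed: remove it
                }
            }
        }
        // data centers that vanished from the event lose all their connections
        for (Map.Entry<String, Set<String>> entry : connected.entrySet()) {
            if (!desired.containsKey(entry.getKey())) {
                for (String ip : entry.getValue()) {
                    plan.toRemove.add(entry.getKey() + "/" + ip);
                }
            }
        }
        return plan;
    }

    /** Small fixed scenario used for demonstration. */
    public static String demo() {
        Map<String, Set<String>> desired = new HashMap<>();
        desired.put("dc1", new HashSet<>(Arrays.asList("a", "b")));
        Map<String, Set<String>> connected = new HashMap<>();
        connected.put("dc1", new HashSet<>(Arrays.asList("b", "c")));
        connected.put("dc2", new HashSet<>(Arrays.asList("d")));
        Plan plan = plan(desired, connected);
        return plan.toRegister + " " + plan.toRemove;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[dc1/a] [dc1/c, dc2/d]"
    }
}
```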
The logic at this point is shown below:
+-------------------------------+
| [DataServerBootstrap]         |  MetaServerChangeEvent
|                               +--------------------------+
|   startRaftClient             |                          |
+-------------------------------+                          |
+-------------------------------+                          |
| [Timer]                       |                          v
|                               | 1 MetaServerChangeEvent +-------------+
|   ConnectionRefreshMetaTask   +------------------------>| EventCenter +-----+
|                               |                         +-+----+----+-+     |
+-------------------------------+                           ^    ^    ^       |
+-------------------------------+                           |    |    |       |
| [Push<NodeChangeResult>]      |  MetaServerChangeEvent    |    |    |       |
|                               +---------------------------+    |    |       |
|   ServerChangeHandler         |                                |    |       |
|                               +--------------------------------+    |       |
+-------------------------------+ 2 DataServerChangeEvent             |       |
+-------------------------------+                                     |       |
|                               | 4 DataServerChangeEvent             |       |
|  MetaServerChangeEventHandler +-------------------------------------+       |
|                               | 3 MetaServerChangeEvent                     |
|                               |<--------------------------------------------+
+-------------------------------+
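Next, let us look at how dataServer manages its connections to metaServer.
As we know, a TCP request has roughly three steps: establish the connection, communicate, close the connection. Every new connection goes through a three-way handshake, which involves three network transmissions; for a highly concurrent system this is no small burden, and closing a connection is similarly costly. Managing and reusing connections reduces the overhead of each network call and can greatly improve system performance.
So, to improve communication efficiency, connections should be reused and the number of TCP handshakes reduced, which calls for a connection management mechanism.
SOFARegistry manages connections at two levels: Connection and Node.
A connection can be identified by the socket tuple (localIp, localPort, remoteIp, remotePort). Netty represents it with a Channel, and sofa-bolt abstracts it as a Connection object; each connection is represented by one Connection object on the client side and one on the server side.
With the Connection abstraction in place, an interface for managing Connections follows naturally; that interface is ConnectionFactory.
Whether on the server or the client, the essence is the same: create a ConnectionEventHandler instance and add it to Netty's pipeline.
Afterwards, whenever a ConnectionEvent fires (whether a Netty-defined event or a SOFABolt-defined one), ConnectionEventHandler notifies ConnectionEventListener through an asynchronous executor, and ConnectionEventListener dispatches the message to the concrete ConnectionEventProcessor implementation.
metaServerConnectionFactory stores all meta server Connections. This is an application of Bolt's mechanism, which requires keeping long-lived connections.
A MetaServerChangeEvent contains a dataCenter and the list of Meta Server IPs under it. The corresponding MAP in MetaServerConnectionFactory is:
Map< dataCenter : Map<ip, Connection> >
The definition is: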
public class MetaServerConnectionFactory {

    private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();

    /**
     *
     * @param dataCenter
     * @param ip
     * @param connection
     */
    public void register(String dataCenter, String ip, Connection connection) {

        Map<String, Connection> connectionMap = MAP.get(dataCenter);
        if (connectionMap == null) {
            Map<String, Connection> newConnectionMap = new ConcurrentHashMap<>();
            connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap);
            if (connectionMap == null) {
                connectionMap = newConnectionMap;
            }
        }

        connectionMap.put(ip, connection);
    }
}
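The get / putIfAbsent sequence in register is the classic way to create the inner map safely under concurrency. On Java 8 and later the same effect can be had with computeIfAbsent. The sketch below shows that variant; ConnectionRegistrySketch is a hypothetical class, generic in the connection type so that it runs without Bolt, and it is not how the source itself is written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class; C stands in for Bolt's Connection type.
final class ConnectionRegistrySketch<C> {

    // dataCenter -> (ip -> connection)
    private final Map<String, Map<String, C>> byDataCenter = new ConcurrentHashMap<>();

    /** Adds or replaces the connection for one meta server IP. */
    public void register(String dataCenter, String ip, C connection) {
        // computeIfAbsent creates the inner map atomically, at most once per data center
        byDataCenter.computeIfAbsent(dataCenter, k -> new ConcurrentHashMap<>())
                    .put(ip, connection);
    }

    /** Returns the connection, or null if none is registered. */
    public C get(String dataCenter, String ip) {
        Map<String, C> connections = byDataCenter.get(dataCenter);
        return connections == null ? null : connections.get(ip);
    }

    /** Removes one connection; does nothing if it is not registered. */
    public void remove(String dataCenter, String ip) {
        Map<String, C> connections = byDataCenter.get(dataCenter);
        if (connections != null) {
            connections.remove(ip);
        }
    }

    public static void main(String[] args) {
        ConnectionRegistrySketch<String> registry = new ConnectionRegistrySketch<>();
        registry.register("DefaultDataCenter", "10.0.0.1", "conn-1");
        System.out.println(registry.get("DefaultDataCenter", "10.0.0.1"));
    }
}
```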
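Connections are added in only one place, MetaServerChangeEventHandler.doHandle, which ends up calling metaServerConnectionFactory.register.
So doHandle iterates over all meta server IPs in the event, each of which belongs to a data center. For each IP, if there is no healthy connection, it calls registerMetaServer; connections to IPs that are no longer listed are removed; and when a data center has no IPs left or is absent from the event, its connections are dropped with removeDataCenter(dataCenter);
registerMetaServer uses metaNodeExchanger to connect to metaServer. The code is: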
private void registerMetaServer(String dataCenter, String ip) {

    PeerId leader = metaServerService.getLeader();

    for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
        try {
            Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                .getMetaServerPort()));
            //connect all meta server
            if (channel != null && channel.isConnected()) {
                metaServerConnectionFactory.register(dataCenter, ip,
                    ((BoltChannel) channel).getConnection());
            }
            // other operations
        } catch (Exception e) {
            // handling is not shown in this excerpt; the loop tries again
        }
    }
}
At runtime, MetaServerConnectionFactory looks like this:

metaServerConnectionFactory = {MetaServerConnectionFactory@5387}
 MAP = {ConcurrentHashMap@6154}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap@6167}  size = 1
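dataServer and metaServer interact through a push-pull model.
MetaNodeExchanger is built on the Bolt Exchange and gathers the metaServer-related network operations in one place. Both MetaServerChangeEventHandler and DefaultMetaServiceImpl talk to Meta Server through it. In it:
connect installs the response handlers, metaClientHandlers;
request, if it fails, refreshes the address with metaServerService.refreshLeader().getIp() and sends the request again.
This also tests whether the MetaServer is valid.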
public class MetaNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Autowired
    private DataServerConfig                  dataServerConfig;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        try {
            final Object result = client.sendSync(request.getRequestUrl(),
                request.getRequestBody(), dataServerConfig.getRpcTimeout());
            return () -> result;
        } catch (Exception e) {
            //retry
            URL url = new URL(metaServerService.refreshLeader().getIp(),
                dataServerConfig.getMetaServerPort());
            final Object result = client.sendSync(url, request.getRequestBody(),
                request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }

        return channel;
    }
}
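The retry in request follows a simple pattern: try the given target once, and on failure ask for the refreshed leader and try exactly once more. The following sketch isolates that pattern with plain functional interfaces. RetryOnLeaderChangeSketch and requestWithRetry are hypothetical names; the real method catches Exception and goes through the Bolt client, while this sketch catches RuntimeException because java.util.function.Function cannot throw checked exceptions.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of SOFARegistry.
final class RetryOnLeaderChangeSketch {

    /**
     * Sends to firstTarget; if that throws, asks refreshLeader for the current
     * leader address and sends once more. A second failure propagates to the caller.
     */
    public static <R> R requestWithRetry(String firstTarget,
                                         Supplier<String> refreshLeader,
                                         Function<String, R> send) {
        try {
            return send.apply(firstTarget);
        } catch (RuntimeException e) {
            // the old target may no longer be the leader: refresh and retry once
            String leader = refreshLeader.get();
            return send.apply(leader);
        }
    }

    public static void main(String[] args) {
        String reply = requestWithRetry("10.0.0.1", () -> "10.0.0.2", target -> {
            if (target.equals("10.0.0.1")) {
                throw new IllegalStateException("old leader is down");
            }
            return "ok@" + target;
        });
        System.out.println(reply); // prints "ok@10.0.0.2"
    }
}
```

A single retry keeps the caller's latency bounded; anything more persistent is left to the periodic refresh task.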
The response handlers of MetaNodeExchanger are listed below. This part is the push model; as mentioned earlier, serverChangeHandler responds to pushes.

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}
DefaultMetaServiceImpl is the core implementation of the Meta Server related services.
In it, raftClient is the entry point for Raft and metaNodeExchanger is the entry point for Bolt. metaServerConnectionFactory holds all current meta server Bolt connections.

public class DefaultMetaServiceImpl implements IMetaServerService {

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Autowired
    private DataServerCache             dataServerCache;

    private RaftClient                  raftClient;
}
Refreshing is one of its important functions; it is used to obtain the Raft leader.

@Override
public PeerId getLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.getLeader();
    if (leader == null) {
        throw new RuntimeException(
            "[DefaultMetaServiceImpl] register MetaServer get no leader!");
    }
    return leader;
}

@Override
public PeerId refreshLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.refreshLeader();
    if (leader == null) {
        throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!");
    }
    return leader;
}
Another important function is reconnection.
getMetaServerMap performs the reconnection. It works as follows: it picks a random existing connection for the local data center, or connects to a random configured meta server if there is none, and reconnects if the chosen connection is not healthy. It then sends GetNodesRequest(NodeType.META) over that connection. From the NodeChangeResult returned for GetNodesRequest(NodeType.META) it extracts the meta nodes of the local data center; the caller wraps the returned map in a MetaServerChangeEvent and posts it to the EventCenter: eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
The code is:

@Override
public Map<String, Set<String>> getMetaServerMap() {
    HashMap<String, Set<String>> map = new HashMap<>();
    Set<String> set = dataServerConfig.getMetaServerIpAddresses();

    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    Connection connection = null;
    try {
        if (connectionMap.isEmpty()) {
            // no connection yet: connect to a random configured meta server
            List<String> list = new ArrayList<>(set);
            Collections.shuffle(list);
            connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
                .next(), dataServerConfig.getMetaServerPort()))).getConnection();
        } else {
            // reuse a random existing connection, reconnecting if it is unhealthy
            List<Connection> connections = new ArrayList<>(connectionMap.values());
            Collections.shuffle(connections);
            connection = connections.iterator().next();
            if (!connection.isFine()) {
                connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection
                    .getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection();
            }
        }

        GetNodesRequest request = new GetNodesRequest(NodeType.META);
        final Connection finalConnection = connection;
        Object obj = metaNodeExchanger.request(new Request() {
            @Override
            public Object getRequestBody() {
                return request;
            }

            @Override
            public URL getRequestUrl() {
                return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort());
            }
        }).getResult();
        if (obj instanceof NodeChangeResult) {
            NodeChangeResult<MetaNode> result = (NodeChangeResult<MetaNode>) obj;

            Map<String, Map<String, MetaNode>> metaNodesMap = result.getNodes();
            if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                    .getLocalDataCenter());
                if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                    map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                }
            }
        }
    } catch (Exception e) {
        // the original error handling is not shown in this excerpt; rethrowing is a placeholder
        throw new RuntimeException(e);
    }
    return map;
}
The configured MetaServer addresses themselves come from DataServerConfig.getMetaServerIpAddresses:

@ConfigurationProperties(prefix = DataServerConfig.PRE_FIX)
public class DataServerConfig {

    /**
     * Getter method for property <tt>metaServerIpAddress</tt>.
     *
     * @return property value of metaServerIpAddress
     */
    public Set<String> getMetaServerIpAddresses() {
        // resolved once, then cached in metaIps
        if (metaIps != null && !metaIps.isEmpty()) {
            return metaIps;
        }
        metaIps = new HashSet<>();
        if (commonConfig != null) {
            Map<String, Collection<String>> metaMap = commonConfig.getMetaNode();
            if (metaMap != null && !metaMap.isEmpty()) {
                String localDataCenter = commonConfig.getLocalDataCenter();
                if (localDataCenter != null && !localDataCenter.isEmpty()) {
                    Collection<String> metas = metaMap.get(localDataCenter);
                    if (metas != null && !metas.isEmpty()) {
                        metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain)
                            .collect(Collectors.toSet());
                    }
                }
            }
        }
        return metaIps;
    }
}
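As this article has shown, a MetaServerChangeEvent can also be turned into a DataServerChangeEvent and posted to the EventCenter,
as in steps 2 and 4 of the earlier diagram. This is because the push from Meta Server may really be telling the data server: "hi, the data servers have changed too". The next article therefore covers how DataServerChangeEvent is handled.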