[從源碼學設計]螞蟻金服SOFARegistry 之 如何與Meta Server交互

[從源碼學設計]螞蟻金服SOFARegistry 之 如何與Meta Server交互

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。javascript

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。java

本文爲第十篇,主要是從業務角度進行梳理。看看DataServer如何與MetaServer交互。node

0x01 業務範疇

1.1 MetaServer的重要性

首先咱們要複習下MetaServer的重要性。spring

MetaServer元數據服務器集羣。這個集羣管轄的範圍是 Session 服務器集羣和 Data 服務器集羣的服務器信息,其角色就至關於 SOFARegistry 架構內部的服務註冊中心,只不過 SOFARegistry 做爲服務註冊中心是服務於廣大應用服務層,而 Meta 集羣是服務於 SOFARegistry 內部的 Session 集羣和 Data 集羣,Meta 層可以感知到 Session 節點和 Data 節點的變化,並通知集羣的其它節點。bootstrap

因此,若是想獲取節點的變化,DataServer就必須重點研究如何與MetaServer交互服務器

1.2 推拉模型

居於Bolt協議,DataServer在與Meta Server的交互中,使用了推拉模型。網絡

1.3 分析策略

咱們在這裏重點分析其設計策略以下:架構

  • 用什麼來確保交互的有效性。
  • 用什麼來解耦。
  • 用什麼來確保網絡交互的效率。

0x02 目錄結構

此模塊目錄結構以下,大體能夠推論,併發

  • DefaultMetaServiceImpl 是 Meta Server 相關模塊主體;異步

  • MetaServerConnectionFactory是鏈接管理;

  • ConnectionRefreshMetaTask 是按期循環task;

  • handler目錄下是三個響應函數;

  • provideData 目錄下是配置相關功能;

具體目錄結構以下:

│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task
│   │       └── ConnectionRefreshMetaTask.java

0x03 Bean

MetaServer相關組件以下:

  • metaServerService,用來與MetaServer進行交互,基於raft和Bolt;
  • datumLeaseManager,用來維護具體數據;

0x04 Raft協議

這裏有一個問題 :爲何 DataServerBootstrap 之中還有 startRaftClient,按說DataServer只用Http和Bolt就能夠了。

原來是用 raft 協議來獲取MetaServer集羣中leader的地址等信息raftClient.getLeader(); 好比 renewNodeTask 時候會用到。

Raft相關啓動是在startRaftClient,此函數的做用是:

  • 啓動Raft客戶端,保證分佈式一致性;
  • 向 EventCenter 投放MetaServerChangeEvent;

具體代碼是:

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

0x05 消息處理

前面提到了,當系統啓動以後,會主動發送一個MetaServerChangeEvent,咱們就看看其內容。

5.1 MetaServerChangeEvent

public class MetaServerChangeEvent implements Event {

    private Map<String, Set<String>> ipMap;

    /**
     * constructor
     * @param ipMap
     */
    public MetaServerChangeEvent(Map<String, Set<String>> ipMap) {
        this.ipMap = ipMap;
    }

    public Map<String, Set<String>> getIpMap() {
        return ipMap;
    }
}

其運行狀態以下:

event = {MetaServerChangeEvent@5991} 
 ipMap = {HashMap@5678}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007}  size = 1

5.2 消息來源

MetaServerChangeEvent有三種來源:啓動主動獲取,按期,推送。這三種具體以下:

  • 啓動主動獲取:這個主動查詢而且拉取的過程,這個過程基本上相似一個同步過程,體現爲客戶端一次查詢結果的同步返回。
  • 版本變動推送:爲了肯定服務發佈數據的變動,對於這個服務感興趣的全部客戶端訂閱方都須要推送,進行推送。因爲性能要求必須併發執行而且異步肯定推送成功。
  • 按期輪訓:這樣避免了某次變動通知沒有通知到全部訂閱方的狀況。

咱們先簡述來源:

5.2.1 啓動

這就是上面提到的,啓動時會從配置裏面讀取meta server配置,metaServerService.getMetaServerMap();據此構建MetaServerChangeEvent,投放到EventCenter之中。

當 DataServer 節點初始化成功後,會啓動任務自動去鏈接 MetaServer。即,該任務會往事件中心 EventCenter 註冊一個 DataServerChangeEvent 事件,該事件註冊後會被觸發,以後將對新增節點計算 Hash 值,同時進行納管分片。

具體啓動時,會從配置裏面讀取meta server配置,metaServerService.getMetaServerMap();據此構建MetaServerChangeEvent,投放到EventCenter之中。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

堆棧以下

register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver)
registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler)
post:56, EventCenter (com.alipay.sofa.registry.server.data.event)
startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap)
doStart:173, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:50, DefaultLifecycleProcessor (org.springframework.context.support)
start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:880, AbstractApplicationContext (org.springframework.context.support)
refresh:546, AbstractApplicationContext (org.springframework.context.support)
refresh:693, SpringApplication (org.springframework.boot)
refreshContext:360, SpringApplication (org.springframework.boot)
run:303, SpringApplication (org.springframework.boot)
run:1118, SpringApplication (org.springframework.boot)
run:1107, SpringApplication (org.springframework.boot)
main:41, DataApplication (com.alipay.sofa.registry.server.data)

5.2.2 定時

這部分是ConnectionRefreshMetaTask完成。ConnectionRefreshMetaTask 是按期 task,其在 Bean tasks 裏面配置。

StartTaskEventHandler 會調用到 tasks,當收到 StartTaskEvent 以後,會啓動 tasks裏面的幾個AbstractTask。

public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor     = null;

    @Override
    public List<Class<? extends StartTaskEvent>> interest() {
        return Lists.newArrayList(StartTaskEvent.class);
    }

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}

具體tasks以下:

@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}

ConnectionRefreshMetaTask 是按期task,會按期向EventCenter投放一個 MetaServerChangeEvent。

執行時候調用 metaServerService.getMetaServerMap();返回一個MetaServerChangeEvent,而且添加到EventCenter之中。

public class ConnectionRefreshMetaTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }
}

5.2.3 推送

ServerChangeHandler 是 metaClientHandler 的一部分,是MetaNodeExchanger 的響應函數。

ServerChangeHandler 繼承了AbstractClientHandler,在interest之中,配置了會響應NodeChangeResult。

若是Meta有推送,ServerChangeHandler這裏就有響應,這個會是 Meta Server 主動通知

在ServerChangeHandler之中,拿到了NodeChangeResult以後,會判斷變動節點類型,這裏會根據 Note 類型不一樣,決定產生 DataServerChangeEvent 仍是MetaServerChangeEvent。若是是NodeType.META,就發送消息給eventCenter,eventCenter.post(new MetaServerChangeEvent(map));,這就是MetaServerChangeEvent的來源之一。

public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter         eventCenter;

    @Autowired
    private DataServerConfig    dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
          
            if (request.getNodeType() == NodeType.DATA) {
              
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                        request.getDataCenterListVersions(), FromType.META_NOTIFY));
              
            } else if (request.getNodeType() == NodeType.META) {
              
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

此時邏輯圖以下,能夠看到三種MetaServerChangeEvent消息來源,ServerChangeHandler也會提供DataServerChangeEvent:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |
|                               |                         |
+-------------------------------+                         |
+-------------------------------+                         |
| [Timer]                       |                         |
|                               |                         |      +-------------+
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter |
|                               | MetaServerChangeEvent   |      +-------+-----+
+-------------------------------+                         |              ^
+-------------------------------+                         |              |
| [Push<NodeChangeResult>]      |                         |              |
|                               |                         |              |
|                               +-------------------------+              |
|                               |  MetaServerChangeEvent                 |
|      ServerChangeHandler      |                                        |
|                               +----------------------------------------+
+-------------------------------+      DataServerChangeEvent

0x06 MetaServerChangeEventHandler

MetaServerChangeEventHandler 用來響應 MetaServerChangeEvent 消息。由於其繼承了AbstractEventHandler,因此 MetaServerChangeEventHandler 已經註冊到了EventCenter之上。

注意,這裏有一個再次轉換DataServerChangeEvent的過程,即這裏又會主動和MetaServer交互,若是返回消息是NodeChangeResult,就轉換爲DataServerChangeEvent。

這是由於Meta Server的這個推送,也許是告訴data Server,"hi,目前data server也有變更,兄弟你再來拉取下"。

在處理時候,MetaServerChangeEventHandler會去與MetaServer交互,看看其有效性,若是有效,就註冊。

邏輯以下:

  • 在MetaServerChangeEventHandler之中,會遍歷MetaServerChangeEvent之中的 dataCenter, ip進行註冊,registerMetaServer(dataCenter, ip); 在registerMetaServer之中:
    • 獲取 meta server的 leader;
    • 使用 metaNodeExchanger.connect 對 IP,getMetaServerPort 進行鏈接;
    • 獲得Channel以後,註冊到 metaServerConnectionFactory 之中
    • 若是 ip不是meta leader,則再次調用metaNodeExchanger註冊本身 DataNode(new URL(DataServerConfig.IP), dataServerConfig .getLocalDataCenter());
    • 註冊成功以後,則給EventCenter發送 DataServerChangeEvent,內部繼續處理 ;

具體代碼以下:

public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private IMetaServerService          metaServerService;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private EventCenter                 eventCenter;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Override
    public List<Class<? extends MetaServerChangeEvent>> interest() {
        return Lists.newArrayList(MetaServerChangeEvent.class);
    }

    @Override
    public void doHandle(MetaServerChangeEvent event) {
        Map<String, Set<String>> ipMap = event.getIpMap();
        for (Entry<String, Set<String>> ipEntry : ipMap.entrySet()) {
            String dataCenter = ipEntry.getKey();
            Set<String> ips = ipEntry.getValue();
            if (!CollectionUtils.isEmpty(ips)) {
                for (String ip : ips) {
                    Connection connection = metaServerConnectionFactory.getConnection(dataCenter,
                        ip);
                    if (connection == null || !connection.isFine()) {
                        registerMetaServer(dataCenter, ip);
                    }
                }
                Set<String> ipSet = metaServerConnectionFactory.getIps(dataCenter);
                for (String ip : ipSet) {
                    if (!ips.contains(ip)) {
                        metaServerConnectionFactory.remove(dataCenter, ip);
                    }
                }
            } else {
                //remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty
                removeDataCenter(dataCenter);
            }
        }
        //remove connections of dataCenter if the dataCenter not exist in ipMap
        Set<String> dataCenters = metaServerConnectionFactory.getAllDataCenters();
        for (String dataCenter : dataCenters) {
            if (!ipMap.containsKey(dataCenter)) {
                removeDataCenter(dataCenter);
            }
        }
    }

    private void registerMetaServer(String dataCenter, String ip) {

        PeerId leader = metaServerService.getLeader();

        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getMetaServerPort()));
                //connect all meta server
                if (channel != null && channel.isConnected()) {
                    metaServerConnectionFactory.register(dataCenter, ip,
                        ((BoltChannel) channel).getConnection());
                }
              
                //register leader meta node
                if (ip.equals(leader.getIp())) {
                    Object obj = null;
                    try {
                        obj = metaNodeExchanger.request(new Request() {
                            @Override
                            public Object getRequestBody() {
                                return new DataNode(new URL(DataServerConfig.IP), dataServerConfig
                                    .getLocalDataCenter());
                            }

                            @Override
                            public URL getRequestUrl() {
                                return new URL(ip, dataServerConfig.getMetaServerPort());
                            }
                        }).getResult();
                    } 
                  
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();

                        //send renew after first register dataNode
                        Set<StartTaskTypeEnum> set = new HashSet<>();
                        set.add(StartTaskTypeEnum.RENEW);
                        eventCenter.post(new StartTaskEvent(set));

                        eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,DataServerChangeEvent.FromType.REGISTER_META));
                        break;
                    }
                }
        }
    }
}

此時邏輯圖以下:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |                         |  1   +-------+-----+         |
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |                         |              |          |    |
|                               +-------------------------+              |          |    |
|                               |  MetaServerChangeEvent                 |          |    |
|      ServerChangeHandler      |                               2        |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+      DataServerChangeEvent                        |    |
                                                                                    |    |
                                                                                    |    |
                                 MetaServerChangeEvent                              |    |
                                                                   3                |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+         DataServerChangeEvent            |
             |                                |                                   4      |
             |  MetaServerChangeEventHandler  +------------------------------------------+
             |                                |
             +--------------------------------+

手機以下:

6.1 鏈接管理

下面咱們講講dataServer如何管理metaServer的鏈接。

咱們知道,一次 tcp 請求大體分爲三個步驟:創建鏈接、通訊、關閉鏈接。每次創建新鏈接都會經歷三次握手,中間包含三次網絡傳輸,對於高併發的系統,這是一筆不小的負擔;關閉鏈接一樣如此。爲了減小每次網絡調用請求的開銷,對鏈接進行管理、複用,能夠極大的提升系統的性能。

爲了提升通訊效率,咱們須要考慮複用鏈接,減小 TCP 三次握手的次數,所以須要有鏈接管理的機制。

關於鏈接管理,SOFARegistry有兩個層次的鏈接管理,分別是 Connection 和 Node。

6.1.1 Connection管理

能夠用socket(localIp,localPort, remoteIp,remotePort )表明一個鏈接,在Netty中用Channel來表示,在sofa-bolt使用Connection對象來抽象一個鏈接,一個鏈接在client跟server端各用一個connection對象表示。

有了Connection這個抽象以後,天然的須要提供接口來管理Connection, 這個接口就是ConnectionFactory。

6.1.2 ConnectionFactory

不管是服務端仍是客戶端,其實本質都在作一件事情:建立 ConnectionEventHandler 實例並添加到 Netty 的 pipeline 中。
以後當有 ConnectionEvent 觸發時(不管是 Netty 定義的事件被觸發,仍是 SOFABolt 定義的事件被觸發),ConnectionEventHandler 會經過異步線程執行器通知 ConnectionEventListener,ConnectionEventListener 將消息派發給具體的 ConnectionEventProcessor 實現類。

6.1.3 MetaServerConnectionFactory

metaServerConnectionFactory 是用來存儲全部 meta Sever Connection,這是Bolt的機制應用,須要維持一個長鏈接。

MetaServerChangeEvent 內容是:dataCenter,以及其下面的Data Server ip 列表。對應MetaServerConnectionFactory 的 MAP 是:

Map< dataCenter : Map<ip, Connection> >

具體定義以下:

public class MetaServerConnectionFactory {

    private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();

    /**
     *
     * @param dataCenter
     * @param ip
     * @param connection
     */
    public void register(String dataCenter, String ip, Connection connection) {

        Map<String, Connection> connectionMap = MAP.get(dataCenter);
        if (connectionMap == null) {
            Map<String, Connection> newConnectionMap = new ConcurrentHashMap<>();
            connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap);
            if (connectionMap == null) {
                connectionMap = newConnectionMap;
            }
        }

        connectionMap.put(ip, connection);
    }

}

6.1.4 添加Connection

只是在 MetaServerChangeEventHandler . doHandle 函數中有添加操做,調用了metaServerConnectionFactory.register

因此在 doHandle 函數中,遍歷Event全部的 meta Server IP,這裏每個ip對應一個 data Center。對於每個ip作以下操做:

  • 重連registerMetaServer。
    • connect all meta server,就是把Connection放進MetaServerConnectionFactory
    • register leader meta node,就是從新向 leader meta node 發送一個 DataNode 請求;
    • 當收到請求結果時候,根據結果內容,往 EventCenter中插入DataServerChangeEvent,這個之後處理;
  • 若是MetaServerConnectionFactory中有在Event中不存在的 meta server ip,就從 MetaServerConnectionFactory 中移除。
  • 若是 MetaServerConnectionFactory 中有在Event中不存在的 data server ip,就removeDataCenter(dataCenter);

其中使用了metaNodeExchanger去鏈接metaServer。具體代碼以下:

private void registerMetaServer(String dataCenter, String ip) {

    PeerId leader = metaServerService.getLeader();

    for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
        try {
            Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                .getMetaServerPort()));
            //connect all meta server
            if (channel != null && channel.isConnected()) {
                metaServerConnectionFactory.register(dataCenter, ip,
                    ((BoltChannel) channel).getConnection());
            }
            //其餘操做
    }
}

MetaServerConnectionFactory在運行時以下:

metaServerConnectionFactory = {MetaServerConnectionFactory@5387} 
 MAP = {ConcurrentHashMap@6154}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap@6167}  size = 1

0x07 MetaNodeExchanger

dataServer和metaServer之間是推拉模型交互

MetaNodeExchanger 是 bolt Exchange,把metaServer相關的網絡操做集中在一塊兒。不管是MetaServerChangeEventHandler仍是DefaultMetaServiceImpl,都基於此與Meta Server交互。其中

  • connect 設置了響應函數metaClientHandlers

  • 而 request 時候,若是失敗了,則會 metaServerService.refreshLeader().getIp() 刷新地址,從新調用。

這裏會測試MetaServer有效性 。

public class MetaNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Autowired
    private DataServerConfig                  dataServerConfig;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        try {
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                    dataServerConfig.getRpcTimeout());
            return () -> result;
        } catch (Exception e) {
            //retry
            URL url = new URL(metaServerService.refreshLeader().getIp(),
                    dataServerConfig.getMetaServerPort());
            final Object result = client.sendSync(url, request.getRequestBody(),
                    request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }

        return channel;
    }
}

7.1 Client Handler

MetaNodeExchanger響應Handler以下,這部分是推模型,前面已經提到了,serverChangeHandler會響應推送。

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}

0x08 核心服務

DefaultMetaServiceImpl是Meta Server相關服務的核心實現。

8.1 DefaultMetaServiceImpl

其中,raftClient是raft的入口,metaNodeExchanger 是bolt的入口。metaServerConnectionFactory 保存目前全部的 meta server bolt connection。

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Autowired
    private DataServerCache             dataServerCache;

    private RaftClient                  raftClient;
}

8.2 刷新

刷新是重要功能之一,用來獲取raft leader。

@Override
public PeerId getLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.getLeader();
    if (leader == null) {
        throw new RuntimeException(
            "[DefaultMetaServiceImpl] register MetaServer get no leader!");
    }
    return leader;
}

@Override
public PeerId refreshLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.refreshLeader();
    if (leader == null) {
        throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!");
    }
    return leader;
}

8.3 重連

另一個重要功能是重連。

getMetaServerMap完成了重連,getMetaServerMap 的做用:

  • 獲取 Meta Server 的IP列表,放入set;
  • 獲取 Meta Server 的 Connection列表,放入connectionMap;
  • 若是 connectionMap 是空,則對於 set 中的 ip列表,進行重連;
  • 若是 connectionMap 非空,則對於 connectionMap 中的 ip列表,進行重連;
  • 拿到上面的 Connection 以後,進行調用 GetNodesRequest(NodeType.META)
  • 根據 GetNodesRequest(NodeType.META) 的結果 NodeChangeResult,構建一個 MetaServerChangeEvent,放入EventCenter。eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));

具體代碼以下:

@Override
public Map<String, Set<String>> getMetaServerMap() {
    HashMap<String, Set<String>> map = new HashMap<>();
    Set<String> set = dataServerConfig.getMetaServerIpAddresses();

    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    Connection connection = null;
    try {
        if (connectionMap.isEmpty()) {
            List<String> list = new ArrayList(set);
            Collections.shuffle(list);
            connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
                .next(), dataServerConfig.getMetaServerPort()))).getConnection();
        } else {
            List<Connection> connections = new ArrayList<>(connectionMap.values());
            Collections.shuffle(connections);
            connection = connections.iterator().next();
            if (!connection.isFine()) {
                connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection
                    .getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection();
            }
        }

        GetNodesRequest request = new GetNodesRequest(NodeType.META);
        final Connection finalConnection = connection;
        Object obj = metaNodeExchanger.request(new Request() {
            @Override
            public Object getRequestBody() {
                return request;
            }

            @Override
            public URL getRequestUrl() {
                return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort());
            }
        }).getResult();
        if (obj instanceof NodeChangeResult) {
            NodeChangeResult<MetaNode> result = (NodeChangeResult<MetaNode>) obj;

            Map<String, Map<String, MetaNode>> metaNodesMap = result.getNodes();
            if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                    .getLocalDataCenter());
                if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                    map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                } 
            }
        }
    } 
    return map;
}

其中,具體獲取MetaServer信息是在

@ConfigurationProperties(prefix = DataServerConfig.PRE_FIX)
public class DataServerConfig {
    /**
     * Getter method for property <tt>metaServerIpAddress</tt>.
     *
     * @return property value of metaServerIpAddress
     */
    public Set<String> getMetaServerIpAddresses() {
        if (metaIps != null && !metaIps.isEmpty()) {
            return metaIps;
        }
        metaIps = new HashSet<>();
        if (commonConfig != null) {
            Map<String, Collection<String>> metaMap = commonConfig.getMetaNode();
            if (metaMap != null && !metaMap.isEmpty()) {
                String localDataCenter = commonConfig.getLocalDataCenter();
                if (localDataCenter != null && !localDataCenter.isEmpty()) {
                    Collection<String> metas = metaMap.get(localDataCenter);
                    if (metas != null && !metas.isEmpty()) {
                        metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain).collect(Collectors.toSet());
                    }
                }
            }
        }
        return metaIps;
    }  
}

0x09 後續

在文中咱們能夠看到,MetaServerChangeEvent也會轉化爲 DataServerChangeEvent,投放到EventCenter。

前圖的2,4兩步。這是由於Meta Server的這個推送,也許是告訴data Server,"hi,目前data server也有變更"。因此下一期咱們介紹如何處理DataServerChangeEvent

前圖

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |                         |  1   +-------+-----+         |
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |                         |              |          |    |
|                               +-------------------------+              |          |    |
|                               |  MetaServerChangeEvent                 |          |    |
|      ServerChangeHandler      |                               2        |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+      DataServerChangeEvent                        |    |
                                                                                    |    |
                                                                                    |    |
                                 MetaServerChangeEvent                              |    |
                                                                   3                |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+         DataServerChangeEvent            |
             |                                |                                   4      |
             |  MetaServerChangeEventHandler  +------------------------------------------+
             |                                |
             +--------------------------------+

手機以下:

0xFF 參考

sofa-bolt學習

相關文章
相關標籤/搜索