[從源碼學設計]螞蟻金服SOFARegistry 之 程序基本架構

0x00 摘要

以前咱們經過三篇文章初步分析了 MetaServer 的基本架構,MetaServer 這三篇文章爲咱們接下來的工做作了堅實的鋪墊。java

本系列咱們接着分析 Data Server,順帶會涉及一些 Session Server。由於 DataServer 和 MetaServer 代碼實現和架構的基本套路相似,因此咱們主要關心差別點和DataServer的特色。node

本文會分析DataServer程序的基本架構。算法

0x01 思路

前面文章專一於系統業務自己,本系列文章會換一種思路,重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。編程

具體學習方法是:bootstrap

  • 先自行設想業務場景,存在問題以及解決方案。必定要密切聯繫業務,一切脫離業務談設計,都是耍流氓;
  • 而後看螞蟻金服源碼,看看他們是怎麼解決問題的,和本身的方案有何區別。由於螞蟻金服的當前代碼,不必定是在目前應用場景下的最理想方案(好比hash的解決方案),但確定是在各類力量博弈後的產物,是經歷了金融級別錘鍊下的最佳實踐。
  • 由於本身的設想確定和現實差距很大,因此在研究代碼以後,須要調整本身的思路,再次思考。
  • 而後看看阿里的解決方案在將來有沒有能夠改進的地方;
  • 最後看看是否能夠從源碼中提煉出共同點或者是能夠複用的辦法或者模式。

學習時注意點是:緩存

  • 架構設計的本質之一是平衡和妥協。一個系統在不一樣的時期、不一樣的數據環境、不一樣的應用場景下會選擇不一樣的架構,在選擇時本質上是在平衡一些重要的點;
  • 重點關注算法以及其餘相關邏輯在時間和空間上的關係——這樣一種邏輯上的架構關係。在一個系統中,這些維度(空間和時間)縱橫交錯,使得複雜度很是高。咱們學習的目的就是分離這些維度,簡化之間的交互。
  • 不光要看表面的,還要看底層的思路和邏輯。不光要看代碼註釋中提到的,更要挖掘代碼註釋中沒有提到的;
  • 咱們既要深刻研究一個個孤立的功能/組件/模塊,也要在架構的角度和業務場景下從新審視這些模塊,這樣能夠對組件之間的關係有更加深刻的理解,能夠從全局角度來看這個系統;
  • 思惟方式的轉變纔是你最應該在乎的部分;

由於會從多個維度來分析設計,好比業務維度和架構維度,所以在本系列中,可能有的文章會集中在模式的總結提取,有的文章會集中在業務實現,有的文章會集中在具體知識點的運用,也會出現 某一個業務模塊或者代碼段由於業務和實現 在不一樣文章中被說起的現象,但願你們事先有所瞭解。服務器

0x02 基本架構&準則

2.1 SOFARegistry 整體架構

首先,咱們要回憶下SOFARegistry 整體架構網絡

  • Client 層

應用服務器集羣。Client 層是應用層,每一個應用系統經過依賴註冊中心相關的客戶端 jar 包,經過編程方式來使用服務註冊中心的服務發佈和服務訂閱能力。session

  • Session 層

Session 服務器集羣。顧名思義,Session 層是會話層,經過長鏈接和 Client 層的應用服務器保持通信,負責接收 Client 的服務發佈和服務訂閱請求。該層只在內存中保存各個服務的發佈訂閱關係,對於具體的服務信息,只在 Client 層和 Data 層之間透傳轉發。Session 層是無狀態的,能夠隨着 Client 層應用規模的增加而擴容。架構

  • Data 層

數據服務器集羣。Data 層經過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的惟一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用。下文的重點也在於隨着數據規模的增加,Data 層如何在不影響業務的前提下實現平滑的擴縮容。

  • Meta 層

元數據服務器集羣。這個集羣管轄的範圍是 Session 服務器集羣和 Data 服務器集羣的服務器信息,其角色就至關於 SOFARegistry 架構內部的服務註冊中心,只不過 SOFARegistry 做爲服務註冊中心是服務於廣大應用服務層,而 Meta 集羣是服務於 SOFARegistry 內部的 Session 集羣和 Data 集羣,Meta 層可以感知到 Session 節點和 Data 節點的變化,並通知集羣的其它節點。

2.2 準則

對於一個程序來講,什麼樣纔算是優秀的架構,其實沒有一個放之四海而皆準的標準,關於這方面的書或者文章也有不少,因此咱們就從最簡單直接的角度,即從結果來想:即靜態和動態兩方面。

  • 靜態 :這個角度就是當你拿到一個新代碼,你首先會看其目錄結構。若是這個程序的目錄結構清晰,只看目錄結構就能讓你能把這個代碼邏輯整理出來,只從文件名字就能知道它應該屬於什麼目錄,什麼模塊,不會出現某一個文件讓你以爲其實應該放到另外目錄的衝動,那麼這個程序從靜態角度講,其架構就是優秀的。
  • 動態 :這個角度就是當你只是大概瀏覽了代碼,你閉眼以後,本身可以在腦子中把程序運行模塊構建出來,可以知道程序分紅幾個功能模塊,清晰的知道程序的入口,能構架出來其基本功能的流程和內部模塊交互邏輯,那麼這個程序從動態角度講,其架構就是優秀的。

好比,假設你程序是基於SpringBoot,那麼Bean的構建和分類就很是重要,若是Bean處理得很好,對你整理動態架構是很是有幫助。

下面就開始分析DataServer程序的基本架構。

0x03 目錄結構

目錄結構以下,咱們能夠看出來SOFAReistry大體思路,固然由於業務和架構耦合,因此個人分類不必定徹底恰當,也有其餘分類的方式,具體取決於你本身的思考方式。

  • 程序主體:DataApplication;
  • 程序入口以及Bean:bootstrap;

程序基礎業務功能:

  • 網絡:remoting;
  • 輔助:utils;
  • http:resource;
  • 緩存:cache;
  • 線程:executor;

業務功能:

  • renew;
  • datasync;
  • change;
  • event;
  • node;

具體目錄以下:

.
├── DataApplication.java
├── bootstrap
├── cache
├── change
├── datasync
│   └── sync
├── event
│   └── handler
├── executor
├── node
├── remoting
│   ├── dataserver
│   │   ├── handler
│   │   └── task
│   ├── handler
│   ├── metaserver
│   │   ├── handler
│   │   ├── provideData
│   │   │   └── processor
│   │   └── task
│   └── sessionserver
│       ├── disconnect
│       ├── forward
│       └── handler
├── renew
├── resource
└── util複製代碼

0x04 基本架構

依然是相似MetaServer的路數,使用SpringBoot框架來進行整體搭建。

@EnableDataServer@SpringBootApplicationpublic class DataApplication {public static void main(String[] args) {
        SpringApplication.run(DataApplication.class, args);
    }
}複製代碼

EnableDataServer這個註解將引入基本配置 DataServerBeanConfiguration。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(DataServerBeanConfiguration.class)public @interface EnableDataServer {
}複製代碼

0x05 配置Bean

DataServer是SpringBoot程序。因此大量使用Bean。

DataServerBeanConfiguration 的做用是構建各類相關配置,從其中能夠看出來DataServer相關模塊和功能。

系統初始化時的 bean 都在 DataServerBeanConfiguration 裏面經過 JavaConfig 來註冊,主要以以下幾個配置類體現(配置類會有變動,具體內容能夠參照源碼實現):

  • DataServerBootstrap
  • DataServerBootstrapConfigConfiguration,
    • CommonConfig
    • DataServerConfig
    • DataNodeStatus
    • PropertySplitter
  • DataServerStorageConfiguration
    • DatumCache
    • LocalDatumStorage
  • LogTaskConfigConfiguration
    • CacheDigestTask
  • SessionRemotingConfiguration
    • jerseyExchange
    • boltExchange
    • MetaNodeExchanger
    • DataNodeExchanger
    • DataServerCache
    • ForwardService
    • SessionServerConnectionFactory
    • DataServerConnectionFactory
    • MetaServerConnectionFactory
    • serverHandlers
    • serverSyncHandlers
    • dataClientHandlers
    • metaClientHandlers
    • AfterWorkingProcessHandler
    • DatumLeaseManager
    • DisconnectEventHandler
  • DataServerNotifyBeanConfiguration
    • DataChangeHandler
    • SessionServerNotifier
    • TempPublisherNotifier
    • BackUpNotifier
    • SnapshotBackUpNotifier
  • DataServerSyncBeanConfiguration
    • SyncDataService
    • LocalAcceptorStore
    • syncDataScheduler
    • StoreServiceFactory
  • DataServerEventBeanConfiguration
    • DataServerChangeEventHandler
    • LocalDataServerChangeEventHandler
    • MetaServerChangeEventHandler
    • StartTaskEventHandler
    • LocalDataServerCleanHandler
    • GetSyncDataHandler
    • EventCenter
    • DataChangeEventCenter
  • DataServerRemotingBeanConfiguration
    • ConnectionRefreshTask
    • ConnectionRefreshMetaTask
    • RenewNodeTask
    • List tasks,包括上面三個Bean
    • DefaultMetaServiceImpl
  • ResourceConfiguration
    • jerseyResourceConfig
    • HealthResource
    • DataDigestResource
  • ExecutorConfiguration
    • publishProcessorExecutor
    • renewDatumProcessorExecutor
    • getDataProcessorExecutor
  • DataProvideDataConfiguration
    • ProvideDataProcessorManager
    • datumExpireProvideDataProcessor

部分Bean的功能以下:

  • DataServerBootstrapConfigConfiguration:該配置類主要做用是提供一些 DataServer 服務啓動時基本的 Bean,好比 DataServerConfig 基礎配置 Bean、DataNodeStatus 節點狀態 Bean、DatumCache 緩存 Bean 等;
  • LogTaskConfigConfiguration:該配置類主要用於提供一些日誌處理相關的 Bean;
  • SessionRemotingConfiguration:該配置類主要做用是提供一些與 SessionServer 相互通訊的 Bean,以及鏈接過程當中的一些請求處理 Bean。好比 BoltExchange、JerseyExchange 等用於啓動服務的 Bean,還有節點上下線、數據發佈等的 Bean,爲關鍵配置類;
  • DataServerNotifyBeanConfiguration:該配置類中配置的 Bean 主要用於進行事件通知,如用於處理數據變動的 DataChangeHandler 等;
  • DataServerSyncBeanConfiguration:該配置類中配置的 Bean 主要用於數據同步操做;
  • DataServerEventBeanConfiguration:該配置類中配置的 Bean 主要用於處理與數據節點相關的事件,如事件中心 EventCenter、數據變化事件中心 DataChangeEventCenter 等;
  • DataServerRemotingBeanConfiguration:該配置類中配置的 Bean 主要用於 DataServer 的鏈接管理;
  • ResourceConfiguration:該配置類中配置的 Bean 主要用於提供一些 Rest 接口資源;
  • AfterWorkingProcessConfiguration:該配置類中配置一些後處理 Handler Bean,用於處理一些業務邏輯結束後的後處理動做;
  • ExecutorConfiguration:該配置類主要配置一些線程池 Bean,用於執行不一樣的任務;

縮減版代碼以下 :

@Configuration@Import(DataServerInitializer.class)@EnableConfigurationPropertiespublic class DataServerBeanConfiguration {@Bean@ConditionalOnMissingBeanpublic DataServerBootstrap dataServerBootstrap() {}@Configurationprotected static class DataServerBootstrapConfigConfiguration {}@Configurationpublic static class DataServerStorageConfiguration {}@Configurationpublic static class LogTaskConfigConfiguration {}@Configurationpublic static class SessionRemotingConfiguration {}@Configurationpublic static class DataServerNotifyBeanConfiguration {}@Configurationpublic static class DataServerSyncBeanConfiguration {}@Configurationpublic static class DataServerEventBeanConfiguration {}@Configurationpublic static class DataServerRemotingBeanConfiguration {}@Configurationpublic static class ResourceConfiguration {}@Configurationpublic static class ExecutorConfiguration {}@Configurationpublic static class DataProvideDataConfiguration {}
}複製代碼

0x06 啓動

6.1 入口

DataServer 模塊啓動入口類爲 DataServerInitializer,該類不禁 JavaConfig 管理配置,而是繼承了 SmartLifecycle 接口,在啓動時由 Spring 框架調用其 start 方法。其簡略版代碼以下:

public class DataServerInitializer implements SmartLifecycle {@Autowiredprivate DataServerBootstrap dataServerBootstrap;@Overridepublic void start() {
        dataServerBootstrap.start();this.isRunning = true;
    }
}複製代碼

該方法中調用了 DataServerBootstrap#start 方法,用於啓動一系列的初始化服務。

public void start() {try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
        fetchProviderData();
        startScheduler();
        Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
    } 
}複製代碼

6.2 啓動業務

DataServerBootstrap負責程序的啓動,具體以下:

@EnableConfigurationPropertiespublic class DataServerBootstrap {// 節點間的 bolt 通訊組件以及其配置@Autowiredprivate DataServerConfig                  dataServerConfig;  @Resource(name = "serverHandlers")private Collection<AbstractServerHandler> serverHandlers;@Resource(name = "serverSyncHandlers")private Collection<AbstractServerHandler> serverSyncHandlers;  
  @Autowiredprivate Exchange                          boltExchange;private Server                            server;private Server                            dataSyncServer;  
  // 用於控制的Http 通訊組件以及其配置@Autowiredprivate ApplicationContext                applicationContext;    
  @Autowiredprivate ResourceConfig                    jerseyResourceConfig;@Autowiredprivate Exchange                          jerseyExchange; private Server                            httpServer; 
  // JVM 內部的事件通訊組件以及其配置@Autowiredprivate EventCenter                       eventCenter;  // MetaServer Raft相關組件@Autowiredprivate IMetaServerService                metaServerService;  @Autowiredprivate DatumLeaseManager                 datumLeaseManager;  // 定時器組件以及其配置@Autowiredprivate Scheduler                         syncDataScheduler;@Autowiredprivate CacheDigestTask                   cacheDigestTask;/**
     * start dataserver
     */public void start() {
            openDataServer(); // 節點間的 bolt 通訊組件以及其配置openDataSyncServer();

            openHttpServer(); // 用於控制的Http 通訊組件以及其配置startRaftClient(); // MetaServer Raft相關組件fetchProviderData(); 

            startScheduler(); // 定時器組件以及其配置Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
    }// 節點間的 bolt 通訊組件以及其配置private void openDataServer() {if (serverForSessionStarted.compareAndSet(false, true)) {
                server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getPort()), serverHandlers
                    .toArray(new ChannelHandler[serverHandlers.size()]));
            }
    }private void openDataSyncServer() {if (serverForDataSyncStarted.compareAndSet(false, true)) {
                dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                    .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                    .toArray(new ChannelHandler[serverSyncHandlers.size()]));
            }
    }// 用於控制的Http 通訊組件以及其配置private void openHttpServer() {if (httpServerStarted.compareAndSet(false, true)) {
                bindResourceConfig();
                httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                        .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
            }
    }// MetaServer Raft相關組件private void startRaftClient() { 
        metaServerService.startRaftClient();
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }private void fetchProviderData() {
        ProvideData provideData = metaServerService
            .fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE);boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }// 定時器組件以及其配置private void startScheduler() {if (schedulerStarted.compareAndSet(false, true)) {
                syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent(
                        Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                                .collect(Collectors.toSet())));//start dump logcacheDigestTask.start();
            }
    }
}複製代碼

6.2 核心組件

DataServer 的核心啓動類是 DataServerBootstrap,對於其內部模塊分類,官方博客主要說起其主要組件 :

該類主要包含了三類組件:節點間的 bolt 通訊組件、JVM 內部的事件通訊組件、定時器組件。

我這裏劃分的更加細緻,把組件劃分爲以下:

  • 節點間的 bolt 通訊組件以及其配置
    • DataServerConfig。配置
    • boltExchange。bolt組件通信組件,用來給server和dataSyncServer提供通信服務;
    • server。dataServer 則負責數據相關服務,好比數據服務,獲取數據的推送,服務上下線通知等;
    • dataSyncServer。dataSyncServer 主要是處理一些數據同步相關的服務;
    • serverHandlers。服務響應函數
    • serverSyncHandlers。服務響應函數,從其註冊的 handler 來看,dataSyncServer 和 dataSever 的職責有部分重疊;
  • 用於控制的Http 通訊組件以及其配置,提供一系列 REST 接口,用於 dashboard 管理、數據查詢等;
    • jerseyResourceConfig。配置
    • jerseyExchange。jersey組件通信組件,提供服務;
    • applicationContext。註冊服務所需;
    • httpServer 主要提供一系列 http 接口,用於 dashboard 管理、數據查詢等;
  • MetaServer相關組件
    • metaServerService,用來與MetaServer進行交互,基於raft和Bolt;
    • datumLeaseManager,用來維護具體數據;
  • JVM 內部的事件通訊組件以及其配置
    • EventCenter。DataServer 內部邏輯主要是經過事件驅動機制來實現的,一個事件每每會有多個投遞源,很是適合用 EventCenter 來解耦事件投遞和事件處理之間的邏輯;
  • 定時器組件以及其配置
    • syncDataScheduler,主要啓動了expireCheckExecutor,versionCheckExecutor,即例如定時檢測節點信息、定時檢測數據版本信息;
    • CacheDigestTask,用來定時分析;

6.3 Server組件

6.3.1 DataServer

dataServer 負責數據相關服務,好比數據服務,獲取數據的推送,服務上下線通知等;

DataServer是基於Bolt進行通信。

private void openDataServer() {try {if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                dataServerConfig.getPort()), serverHandlers
                .toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } 
}複製代碼

其響應函數爲serverHandlers

@Bean(name = "serverHandlers")public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());return list;
}複製代碼

其具體功能以下 :

  • getDataHandler:從當前Data節點中獲取註冊信息數據,若當前節點不處於工做狀態,則改成下個節點;
  • clientOffHandler:服務訂閱者下線;
  • getDataVersionsHandler:獲取數據版本號;
  • publishDataProcessor:服務註冊信息發佈;
  • sessionServerRegisterHandler:sessionServer會話註冊;
  • unPublishDataHandler :服務下線處理;
  • dataServerConnectionHandler:鏈接管理;
  • renewDatumHandler:數據續約管理;
  • datumSnapshotHandler:數據快照管理;

6.3.2 DataSyncServer

dataSyncServer 主要是處理一些數據同步相關的服務;也是基於Bolt進行通信。

private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}複製代碼

其響應函數爲serverSyncHandlers。

@Bean(name = "serverSyncHandlers")public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());return list;
}複製代碼

其具體功能以下 :

  • getDataHandler:獲取Data節點註冊信息數據;
  • publishDataProcessor :服務註冊信息發佈;
  • unPublishDataHandler :服務下線處理;
  • notifyFetchDatumHandler :對比版本號,抓去最新服務註冊數據;
  • notifyOnlineHandler:檢查Data節點是否在線;
  • syncDataHandler:數據同步;
  • dataSyncServerConnectionHandler:鏈接管理;

6.3.3 HttpServer

HttpServer 是  Http 通訊組件,提供一系列 REST 接口,用於 dashboard 管理、數據查詢等。

其基於Jersey進行通信。

private void openHttpServer() {try {if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                    .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    }
}複製代碼

6.3.4 Handler

各 Handler 具體做用如圖 3 所示:

圖 3 各 Handler 做用

圖 各 Handler 做用

6.3.5 Raft

Raft相關的是:

  • 啓動Raft客戶端,保證分佈式一致性;
  • 向 EventCenter 投放MetaServerChangeEvent;
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}複製代碼

6.3.6 Scheduler

這個模塊輔助各類按期任務,具體做用是:

  • 啓動數據同步任務;
    • 定時檢測數據接受者節點變化狀況,下線過時節點;
    • 啓動數據變動輪詢;
  • 向EventCenter投放消息,以便由這些消息對應的響應函數處理,包括:
    • CONNECT_META,具體由 ConnectionRefreshMetaTask處理;
    • CONNECT_DATA,具體由 ConnectionRefreshTask 處理;
    • VERSION_COMPARE,這個目前沒有處理;
    • 須要注意的是,RENEW 類型消息在系統啓動時候沒有投放,而是在 MetaServerChangeEventHandler  .   registerMetaServer 之中,當註冊以後,纔會進行投放,以此按期Renew;
  • 啓動dump log任務;
private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) {
            syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent(
                    Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                            .collect(Collectors.toSet())));//start dump logcacheDigestTask.start();
        }
    } 
}複製代碼
6.3.6.1 startScheduler

啓動了versionCheckExecutor和scheduler,具體會調用LocalAcceptorStore中的函數進行按期檢測。

public class Scheduler {public final ExecutorService           versionCheckExecutor;private final ScheduledExecutorService scheduler;private final ThreadPoolExecutor       expireCheckExecutor;@Autowiredprivate AcceptorStore                  localAcceptorStore;/**
     * constructor
     */public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(), new NamedThreadFactory("SyncDataScheduler-versionChangeCheck"));

    }/**
     * start scheduler
     */public void startScheduler() {

        scheduler.schedule(new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),30, TimeUnit.SECONDS);


        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

    }/**
     * stop scheduler
     */public void stopScheduler() {if (scheduler != null && !scheduler.isShutdown()) {
            scheduler.shutdown();
        }if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
            versionCheckExecutor.shutdown();
        }
    }
}複製代碼
6.3.6.2 StartTaskEventHandler

StartTaskEventHandler內部有一個ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一個StartTaskEvent,就會按期調用tasks中的task執行;

@Bean(name = "tasks")public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());return list;
}複製代碼

具體代碼以下:

public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask>       tasks;private ScheduledExecutorService executor     = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class);
    }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) {
            getExecutor();
        }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}複製代碼

6.4 處理Task

這裏專門就StartTaskEventHandler作簡要說明,其就是針對 tasks Bean 裏面聲明的task,進行啓動。

可是具體啓動哪些task,則須要依據event裏面的設置決定,下面代碼中的循環就是看看tasks和event中如何匹配。

        for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
            }
        }複製代碼

具體代碼以下:

public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask>       tasks;private ScheduledExecutorService executor     = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class);
    }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) {
            getExecutor();
        }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());

            }
        }
    }private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}複製代碼

6.4.1 beans

對應的beans,一共三個task。

@Bean(name = "tasks")public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());return list;
}複製代碼

對應了StartTaskTypeEnum中的枚舉,其中VersionCompareTask沒實現。

public enum StartTaskTypeEnum {/**
     * ConnectionRefreshMetaTask
     */CONNECT_META,/**
     * ConnectionRefreshDataTask
     */CONNECT_DATA,/**
     * RenewNodeTask
     */RENEW,/**
     * VersionCompareTask
     */VERSION_COMPARE
}複製代碼

6.4.2 解耦

咱們用 StartTaskEvent 舉例,這裏使用Set來指定本消息適用什麼task處理。

public class StartTaskEvent implements Event {private final Set<StartTaskTypeEnum> suitableTypes;public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {this.suitableTypes = suitableTypes;
    }public Set<StartTaskTypeEnum> getSuitableTypes() {return suitableTypes;
    }
}複製代碼

在 MetaServerChangeEventHandler 之中,則啓動了renew task。

if (obj instanceof NodeChangeResult) {
    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
    Map<String, Long> versionMap = result.getDataCenterListVersions();//send renew after first register dataNodeSet<StartTaskTypeEnum> set = new HashSet<>();
    set.add(StartTaskTypeEnum.RENEW);
    eventCenter.post(new StartTaskEvent(set));

    eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
            DataServerChangeEvent.FromType.REGISTER_META));break;
}複製代碼

在啓動時候,post了event,可是指定了啓動非RENEW task。

private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) {
            syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent(
                    Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                            .collect(Collectors.toSet())));//start dump logcacheDigestTask.start();
        }
    } catch (Exception e) {
        schedulerStarted.set(false);throw new RuntimeException("Data Scheduler start error!", e);
    }
}複製代碼

0x07 動態結構

最後動態架構以下,咱們也大體知道,DataServer就是一個SpringBoot程序,有幾個Server,有若干Bean,有若干定時服務,具體有一些其餘業務模塊等等,這對咱們接下來的理解有幫助。

+---------------------------------------------------------------------------+
| [DataServerBootstrap]                                                     |
|                                                                           |
|                                                                           |
| +------------------------------------------+  +------------------------+  |
| | [Bolt related]                           |  | [http relatged]        |  |
| |                                          |  |                        |  |
| |            DataServerConfig              |  |      httpServer        |  |
| |                                          |  |                        |  |
| |              boltExchange                |  |    jerseyExchange      |  |
| |                                          |  |                        |  |
| |    server +-----------> serverHandlers   |  |   applicationContext   |  |
| |                                          |  |                        |  |
| | dataSyncServer+----> serverSyncHandlers  |  | jerseyResourceConfig   |  |
| |                                          |  |                        |  |
| +------------------------------------------+  +------------------------+  |
| +---------------------+   +----------------+  +------------------------+  |
| |[meta related]       |   |[JVM related]   |  |[Timer related]         |  |
| |                     |   |                |  |                        |  |
| |  metaServerService  |   |                |  |   syncDataScheduler    |  |
| |                     |   |  EventCenter   |  |                        |  |
| |  datumLeaseManager  |   |                |  |    CacheDigestTask     |  |
| +---------------------+   +----------------+  |                        |  |
|                                               +------------------------+  |
+---------------------------------------------------------------------------+複製代碼

0x08 問題列表

由於從問題出發更有幫助,因此咱們總結出一些問題列表,這些咱們指望在之後的分析中陸續解決。

  • 問題:Datacenter到底是什麼概念?
  • 問題:DataServer應該當成什麼系統來看?
  • 問題:DataServer應該實現什麼功能?如何實現?
  • 問題:如何維持高可用?
  • 問題:如何負載均衡?
  • 問題:DataServer之間如何同步?實現數據一致性?
  • 問題:SessionServer如何尋址DataServer?
  • 問題:客戶端如何知道應該聯繫哪一個SessionServer?
  • 問題:SessionServer在DataServer內部如何表示,有緩存嘛?
  • 問題:hash路由表是什麼樣的?
  • 問題:DataServer如何把信息推送給全部SessionServer?
  • 問題:DataServer如何同步給其餘DataServer?
  • 問題:dataSyncServer 主要是處理一些數據同步相關的服務;dataServer 則負責數據相關服務;二者有什麼區別?
  • 問題:EventCenter的機制,裏面有幾種Event?
  • 問題:如何輪詢MetaServer?
  • 問題:如何判斷當前機房節點?
  • 問題:DataServer集羣內部如何數據遷移?
  • 問題:SessionServer 和 DataServer 之間的通訊,是基於推拉結合的機制?
  • 問題:爲何 DataServerBootstrap 之中還有  startRaftClient?
  • 問題:MetaServerChangeEventHandler怎麼啓動,誰來控制,用來作啥?
  • 問題:DatumLeaseManager 的做用?
  • 問題:SessionServer從DataServer拉取什麼?
  • 問題:DataServer如何向MetaServer 來renew本身?是否認期?
  • 問題:DataServer如何知道,保存其餘 DataServer?其餘地方用到了嗎?
  • 問題:須要考慮 DataServer 須要保存什麼?
  • 問題:版本號用來作什麼?
  • 問題:DatumCache 用來作什麼?
  • 問題:爲何要有 AfterWorkingProcess?
  • 問題:bolt怎麼維護connection?
相關文章
相關標籤/搜索