以前咱們經過三篇文章初步分析了 MetaServer 的基本架構,MetaServer 這三篇文章爲咱們接下來的工做作了堅實的鋪墊。java
本系列咱們接着分析 Data Server,順帶會涉及一些 Session Server。由於 DataServer 和 MetaServer 代碼實現和架構的基本套路相似,因此咱們主要關心差別點和DataServer的特色。node
本文會分析DataServer程序的基本架構。算法
前面文章專一於系統業務自己,本系列文章會換一種思路,重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。編程
具體學習方法是:bootstrap
學習時注意點是:緩存
由於會從多個維度來分析設計,好比業務維度和架構維度,所以在本系列中,可能有的文章會集中在模式的總結提取,有的文章會集中在業務實現,有的文章會集中在具體知識點的運用,也會出現 某一個業務模塊或者代碼段由於業務和實現 在不一樣文章中被說起的現象,但願你們事先有所瞭解。服務器
首先,咱們要回憶下SOFARegistry 整體架構網絡
應用服務器集羣。Client 層是應用層,每一個應用系統經過依賴註冊中心相關的客戶端 jar 包,經過編程方式來使用服務註冊中心的服務發佈和服務訂閱能力。session
Session 服務器集羣。顧名思義,Session 層是會話層,經過長鏈接和 Client 層的應用服務器保持通信,負責接收 Client 的服務發佈和服務訂閱請求。該層只在內存中保存各個服務的發佈訂閱關係,對於具體的服務信息,只在 Client 層和 Data 層之間透傳轉發。Session 層是無狀態的,能夠隨着 Client 層應用規模的增加而擴容。架構
數據服務器集羣。Data 層經過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的惟一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用。下文的重點也在於隨着數據規模的增加,Data 層如何在不影響業務的前提下實現平滑的擴縮容。
元數據服務器集羣。這個集羣管轄的範圍是 Session 服務器集羣和 Data 服務器集羣的服務器信息,其角色就至關於 SOFARegistry 架構內部的服務註冊中心,只不過 SOFARegistry 做爲服務註冊中心是服務於廣大應用服務層,而 Meta 集羣是服務於 SOFARegistry 內部的 Session 集羣和 Data 集羣,Meta 層可以感知到 Session 節點和 Data 節點的變化,並通知集羣的其它節點。
對於一個程序來講,什麼樣纔算是優秀的架構,其實沒有一個放之四海而皆準的標準,關於這方面的書或者文章也有不少,因此咱們就從最簡單直接的角度,即從結果來想:即靜態和動態兩方面。
好比,假設你程序是基於SpringBoot,那麼Bean的構建和分類就很是重要,若是Bean處理得很好,對你整理動態架構是很是有幫助。
下面就開始分析DataServer程序的基本架構。
目錄結構以下,咱們能夠看出來SOFAReistry大體思路,固然由於業務和架構耦合,因此個人分類不必定徹底恰當,也有其餘分類的方式,具體取決於你本身的思考方式。
程序基礎業務功能:
業務功能:
具體目錄以下:
. ├── DataApplication.java ├── bootstrap ├── cache ├── change ├── datasync │ └── sync ├── event │ └── handler ├── executor ├── node ├── remoting │ ├── dataserver │ │ ├── handler │ │ └── task │ ├── handler │ ├── metaserver │ │ ├── handler │ │ ├── provideData │ │ │ └── processor │ │ └── task │ └── sessionserver │ ├── disconnect │ ├── forward │ └── handler ├── renew ├── resource └── util複製代碼
依然是相似MetaServer的路數,使用SpringBoot框架來進行整體搭建。
@EnableDataServer@SpringBootApplicationpublic class DataApplication {public static void main(String[] args) { SpringApplication.run(DataApplication.class, args); } }複製代碼
EnableDataServer這個註解將引入基本配置 DataServerBeanConfiguration。
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(DataServerBeanConfiguration.class)public @interface EnableDataServer { }複製代碼
DataServer是SpringBoot程序。因此大量使用Bean。
DataServerBeanConfiguration 的做用是構建各類相關配置,從其中能夠看出來DataServer相關模塊和功能。
系統初始化時的 bean 都在 DataServerBeanConfiguration 裏面經過 JavaConfig 來註冊,主要以以下幾個配置類體現(配置類會有變動,具體內容能夠參照源碼實現):
部分Bean的功能以下:
縮減版代碼以下 :
@Configuration@Import(DataServerInitializer.class)@EnableConfigurationPropertiespublic class DataServerBeanConfiguration {@Bean@ConditionalOnMissingBeanpublic DataServerBootstrap dataServerBootstrap() {}@Configurationprotected static class DataServerBootstrapConfigConfiguration {}@Configurationpublic static class DataServerStorageConfiguration {}@Configurationpublic static class LogTaskConfigConfiguration {}@Configurationpublic static class SessionRemotingConfiguration {}@Configurationpublic static class DataServerNotifyBeanConfiguration {}@Configurationpublic static class DataServerSyncBeanConfiguration {}@Configurationpublic static class DataServerEventBeanConfiguration {}@Configurationpublic static class DataServerRemotingBeanConfiguration {}@Configurationpublic static class ResourceConfiguration {}@Configurationpublic static class ExecutorConfiguration {}@Configurationpublic static class DataProvideDataConfiguration {} }複製代碼
DataServer 模塊啓動入口類爲 DataServerInitializer,該類不禁 JavaConfig 管理配置,而是繼承了 SmartLifecycle 接口,在啓動時由 Spring 框架調用其 start 方法。其簡略版代碼以下:
public class DataServerInitializer implements SmartLifecycle {@Autowiredprivate DataServerBootstrap dataServerBootstrap;@Overridepublic void start() { dataServerBootstrap.start();this.isRunning = true; } }複製代碼
該方法中調用了 DataServerBootstrap#start 方法,用於啓動一系列的初始化服務。
public void start() {try { openDataServer(); openDataSyncServer(); openHttpServer(); startRaftClient(); fetchProviderData(); startScheduler(); Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); } }複製代碼
DataServerBootstrap負責程序的啓動,具體以下:
@EnableConfigurationPropertiespublic class DataServerBootstrap {// 節點間的 bolt 通訊組件以及其配置@Autowiredprivate DataServerConfig dataServerConfig; @Resource(name = "serverHandlers")private Collection<AbstractServerHandler> serverHandlers;@Resource(name = "serverSyncHandlers")private Collection<AbstractServerHandler> serverSyncHandlers; @Autowiredprivate Exchange boltExchange;private Server server;private Server dataSyncServer; // 用於控制的Http 通訊組件以及其配置@Autowiredprivate ApplicationContext applicationContext; @Autowiredprivate ResourceConfig jerseyResourceConfig;@Autowiredprivate Exchange jerseyExchange; private Server httpServer; // JVM 內部的事件通訊組件以及其配置@Autowiredprivate EventCenter eventCenter; // MetaServer Raft相關組件@Autowiredprivate IMetaServerService metaServerService; @Autowiredprivate DatumLeaseManager datumLeaseManager; // 定時器組件以及其配置@Autowiredprivate Scheduler syncDataScheduler;@Autowiredprivate CacheDigestTask cacheDigestTask;/** * start dataserver */public void start() { openDataServer(); // 節點間的 bolt 通訊組件以及其配置openDataSyncServer(); openHttpServer(); // 用於控制的Http 通訊組件以及其配置startRaftClient(); // MetaServer Raft相關組件fetchProviderData(); startScheduler(); // 定時器組件以及其配置Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); }// 節點間的 bolt 通訊組件以及其配置private void openDataServer() {if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } }private void openDataSyncServer() {if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } }// 用於控制的Http 通訊組件以及其配置private void openHttpServer() {if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } }// MetaServer Raft相關組件private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }private void fetchProviderData() { ProvideData provideData = metaServerService .fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE);boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData() .getObject()); datumLeaseManager.setRenewEnable(enableDataDatumExpire); }// 定時器組件以及其配置private void startScheduler() {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } }複製代碼
DataServer 的核心啓動類是 DataServerBootstrap,對於其內部模塊分類,官方博客主要說起其主要組件 :
該類主要包含了三類組件:節點間的 bolt 通訊組件、JVM 內部的事件通訊組件、定時器組件。
我這裏劃分的更加細緻,把組件劃分爲以下:
dataServer 負責數據相關服務,好比數據服務,獲取數據的推送,服務上下線通知等;
DataServer是基於Bolt進行通信。
private void openDataServer() {try {if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } } }複製代碼
其響應函數爲serverHandlers
@Bean(name = "serverHandlers")public Collection<AbstractServerHandler> serverHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(clientOffHandler()); list.add(getDataVersionsHandler()); list.add(publishDataProcessor()); list.add(sessionServerRegisterHandler()); list.add(unPublishDataHandler()); list.add(dataServerConnectionHandler()); list.add(renewDatumHandler()); list.add(datumSnapshotHandler());return list; }複製代碼
其具體功能以下 :
dataSyncServer 主要是處理一些數據同步相關的服務;也是基於Bolt進行通信。
private void openDataSyncServer() {try {if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }複製代碼
其響應函數爲serverSyncHandlers。
@Bean(name = "serverSyncHandlers")public Collection<AbstractServerHandler> serverSyncHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler());return list; }複製代碼
其具體功能以下 :
HttpServer 是 Http 通訊組件,提供一系列 REST 接口,用於 dashboard 管理、數據查詢等。
其基於Jersey進行通信。
private void openHttpServer() {try {if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } } }複製代碼
各 Handler 具體做用如圖 3 所示:
圖 各 Handler 做用
Raft相關的是:
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }複製代碼
這個模塊輔助各類按期任務,具體做用是:
private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } }複製代碼
啓動了versionCheckExecutor和scheduler,具體會調用LocalAcceptorStore中的函數進行按期檢測。
public class Scheduler {public final ExecutorService versionCheckExecutor;private final ScheduledExecutorService scheduler;private final ThreadPoolExecutor expireCheckExecutor;@Autowiredprivate AcceptorStore localAcceptorStore;/** * constructor */public Scheduler() { scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler")); expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck")); versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(), new NamedThreadFactory("SyncDataScheduler-versionChangeCheck")); }/** * start scheduler */public void startScheduler() { scheduler.schedule(new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3, TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),30, TimeUnit.SECONDS); versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck()); }/** * stop scheduler */public void stopScheduler() {if (scheduler != null && !scheduler.isShutdown()) { scheduler.shutdown(); }if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) { versionCheckExecutor.shutdown(); } } }複製代碼
StartTaskEventHandler內部有一個ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一個StartTaskEvent,就會按期調用tasks中的task執行;
@Bean(name = "tasks")public List<AbstractTask> tasks() { List<AbstractTask> list = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask());return list; }複製代碼
具體代碼以下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask> tasks;private ScheduledExecutorService executor = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class); }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) { getExecutor(); }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } }private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }複製代碼
這裏專門就StartTaskEventHandler作簡要說明,其就是針對 tasks Bean 裏面聲明的task,進行啓動。
可是具體啓動哪些task,則須要依據event裏面的設置決定,下面代碼中的循環就是看看tasks和event中如何匹配。
for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit()); } }複製代碼
具體代碼以下:
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {@Resource(name = "tasks")private List<AbstractTask> tasks;private ScheduledExecutorService executor = null;@Overridepublic List<Class<? extends StartTaskEvent>> interest() {return Lists.newArrayList(StartTaskEvent.class); }@Overridepublic void doHandle(StartTaskEvent event) {if (executor == null || executor.isShutdown()) { getExecutor(); }for (AbstractTask task : tasks) {if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } }private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }複製代碼
對應的beans,一共三個task。
@Bean(name = "tasks")public List<AbstractTask> tasks() { List<AbstractTask> list = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask());return list; }複製代碼
對應了StartTaskTypeEnum中的枚舉,其中VersionCompareTask沒實現。
public enum StartTaskTypeEnum {/** * ConnectionRefreshMetaTask */CONNECT_META,/** * ConnectionRefreshDataTask */CONNECT_DATA,/** * RenewNodeTask */RENEW,/** * VersionCompareTask */VERSION_COMPARE }複製代碼
咱們用 StartTaskEvent 舉例,這裏使用Set來指定本消息適用什麼task處理。
public class StartTaskEvent implements Event {private final Set<StartTaskTypeEnum> suitableTypes;public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {this.suitableTypes = suitableTypes; }public Set<StartTaskTypeEnum> getSuitableTypes() {return suitableTypes; } }複製代碼
在 MetaServerChangeEventHandler 之中,則啓動了renew task。
if (obj instanceof NodeChangeResult) { NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj; Map<String, Long> versionMap = result.getDataCenterListVersions();//send renew after first register dataNodeSet<StartTaskTypeEnum> set = new HashSet<>(); set.add(StartTaskTypeEnum.RENEW); eventCenter.post(new StartTaskEvent(set)); eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap, DataServerChangeEvent.FromType.REGISTER_META));break; }複製代碼
在啓動時候,post了event,可是指定了啓動非RENEW task。
private void startScheduler() {try {if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler();// start all startTask except correction taskeventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet())));//start dump logcacheDigestTask.start(); } } catch (Exception e) { schedulerStarted.set(false);throw new RuntimeException("Data Scheduler start error!", e); } }複製代碼
最後動態架構以下,咱們也大體知道,DataServer就是一個SpringBoot程序,有幾個Server,有若干Bean,有若干定時服務,具體有一些其餘業務模塊等等,這對咱們接下來的理解有幫助。
+---------------------------------------------------------------------------+ | [DataServerBootstrap] | | | | | | +------------------------------------------+ +------------------------+ | | | [Bolt related] | | [http relatged] | | | | | | | | | | DataServerConfig | | httpServer | | | | | | | | | | boltExchange | | jerseyExchange | | | | | | | | | | server +-----------> serverHandlers | | applicationContext | | | | | | | | | | dataSyncServer+----> serverSyncHandlers | | jerseyResourceConfig | | | | | | | | | +------------------------------------------+ +------------------------+ | | +---------------------+ +----------------+ +------------------------+ | | |[meta related] | |[JVM related] | |[Timer related] | | | | | | | | | | | | metaServerService | | | | syncDataScheduler | | | | | | EventCenter | | | | | | datumLeaseManager | | | | CacheDigestTask | | | +---------------------+ +----------------+ | | | | +------------------------+ | +---------------------------------------------------------------------------+複製代碼
由於從問題出發更有幫助,因此咱們總結出一些問題列表,這些咱們指望在之後的分析中陸續解決。