RheaKV takes a while to cover, so I am splitting it across several chapters. This chapter looks at what RheaKV does during initialization.
Let's start from an example:
public static void main(final String[] args) throws Exception {
final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
.withFake(true) // use a fake pd
.config();
final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
.withStorageType(StorageType.RocksDB)
.withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
.withRaftDataPath(Configs.RAFT_DATA_PATH)
.withServerAddress(new Endpoint("127.0.0.1", 8181))
.config();
final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
.withClusterName(Configs.CLUSTER_NAME) //
.withInitialServerList(Configs.ALL_NODE_ADDRESSES)
.withStoreEngineOptions(storeOpts) //
.withPlacementDriverOptions(pdOpts) //
.config();
System.out.println(opts);
final Node node = new Node(opts);
node.start();
Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
System.out.println("server1 start OK");
}
To keep the logic simple, this example uses a setup without a PD (placement driver).
The implementation of Node:
public class Node {
private final RheaKVStoreOptions options;
private RheaKVStore rheaKVStore;
public Node(RheaKVStoreOptions options) {
this.options = options;
}
public void start() {
this.rheaKVStore = new DefaultRheaKVStore();
this.rheaKVStore.init(this.options);
}
public void stop() {
this.rheaKVStore.shutdown();
}
public RheaKVStore getRheaKVStore() {
return rheaKVStore;
}
}
So this creates a DefaultRheaKVStore and calls its init method to initialize it.
All of DefaultRheaKVStore's initialization happens inside init, so we go straight to DefaultRheaKVStore's init method.
public synchronized boolean init(final RheaKVStoreOptions opts) {
// 1. If already started, return immediately
if (this.started) {
LOG.info("[DefaultRheaKVStore] already started.");
return true;
}
this.opts = opts;
// init placement driver
// 2. Set up the PD client from the PD options
final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
final String clusterName = opts.getClusterName();
Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
Requires.requireNonNull(clusterName, "opts.clusterName");
// Set the cluster's initial server list
if (Strings.isBlank(pdOpts.getInitialServerList())) {
// if blank, extends parent's value
pdOpts.setInitialServerList(opts.getInitialServerList());
}
// For the no-PD scenario, RheaKV provides a fake PD client
if (pdOpts.isFake()) {
this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
} else {
this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
}
// Initialize the PD client
if (!this.pdClient.init(pdOpts)) {
LOG.error("Fail to init [PlacementDriverClient].");
return false;
}
// init store engine
// 3. Initialize the store engine
final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
if (stOpts != null) {
stOpts.setInitialServerList(opts.getInitialServerList());
this.storeEngine = new StoreEngine(this.pdClient);
// Run the store engine's own init
if (!this.storeEngine.init(stOpts)) {
LOG.error("Fail to init [StoreEngine].");
return false;
}
}
// Get this node's IP and port
final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
final RpcOptions rpcOpts = opts.getRpcOptions();
Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
// 4. Create an RpcService and override its getLeader method
this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
@Override
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
final Endpoint leader = getLeaderByRegionEngine(regionId);
if (leader != null) {
return leader;
}
return super.getLeader(regionId, forceRefresh, timeoutMillis);
}
};
if (!this.rheaKVRpcService.init(rpcOpts)) {
LOG.error("Fail to init [RheaKVRpcService].");
return false;
}
// Number of failover retries, 2 by default
this.failoverRetries = opts.getFailoverRetries();
// Future timeout in milliseconds, 5000 by default
this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
// Whether to read only from the leader, true by default
this.onlyLeaderRead = opts.isOnlyLeaderRead();
// 5. Initialize kvDispatcher; useParallelKVExecutor defaults to true
if (opts.isUseParallelKVExecutor()) {
// Number of CPUs on this machine
final int numWorkers = Utils.cpus();
// Shift left by 4 bits, i.e. multiply by 16
final int bufSize = numWorkers << 4;
final String name = "parallel-kv-executor";
final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
// Choose whether to use a thread-affinity ThreadFactory
? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
// Initialize the dispatcher
this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT,
threadFactory);
}
this.batchingOpts = opts.getBatchingOptions();
// allowBatching is true by default
if (this.batchingOpts.isAllowBatching()) {
// Not yet clear what these batching objects are for; to be analyzed when they are used
this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
new GetBatchingHandler("get", false));
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
new GetBatchingHandler("get_only_safe", true));
this.putBatching = new PutBatching(KVEvent::new, "put_batching",
new PutBatchingHandler("put"));
}
LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
return this.started = true;
}
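Step 4 above overrides getLeader so that a locally known leader is preferred and the parent's lookup is only a fallback. Here is a minimal sketch of that pattern. The class names and the string endpoints are hypothetical stand-ins, not the SOFAJRaft types, and the assumption that getLeaderByRegionEngine consults local state is inferred from its name only.

```java
import java.util.HashMap;
import java.util.Map;

final class LeaderLookupSketch {
    // Hypothetical stand-in for the RPC service's default (remote) leader lookup.
    static class RemoteLookup {
        String getLeader(long regionId) {
            return "remote-leader-for-" + regionId;
        }
    }

    // Prefers a locally recorded leader and falls back to the parent lookup,
    // similar in shape to the anonymous subclass created in init.
    static class LocalFirstLookup extends RemoteLookup {
        private final Map<Long, String> localLeaders = new HashMap<>();

        void recordLocalLeader(long regionId, String endpoint) {
            localLeaders.put(regionId, endpoint);
        }

        @Override
        String getLeader(long regionId) {
            String local = localLeaders.get(regionId);
            return local != null ? local : super.getLeader(regionId);
        }
    }

    public static void main(String[] args) {
        LocalFirstLookup lookup = new LocalFirstLookup();
        lookup.recordLocalLeader(1L, "127.0.0.1:8181");
        System.out.println(lookup.getLeader(1L)); // served from local state
        System.out.println(lookup.getLeader(2L)); // falls back to the parent lookup
    }
}
```

The point of the design is that a node hosting the region can usually answer without a round trip, while nodes that do not host it still get an answer from the fallback.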
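The actual initialization work is done in StoreEngine's init method, so let's look at it directly. This method initializes the core objects and its logic is fairly involved, so please bear with it:
StoreEngine#init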
public synchronized boolean init(final StoreEngineOptions opts) {
if (this.started) {
LOG.info("[StoreEngine] already started.");
return true;
}
this.storeOpts = Requires.requireNonNull(opts, "opts");
Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
// Get the IP and port
final int port = serverAddress.getPort();
final String ip = serverAddress.getIp();
// If no IP was given (or it is the wildcard address), use the local canonical host name for serverAddress
if (ip == null || Utils.IP_ANY.equals(ip)) {
serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
opts.setServerAddress(serverAddress);
}
// Get the metrics report period
final long metricsReportPeriod = opts.getMetricsReportPeriod();
// init region options
List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
// 1. If no RegionEngineOptions are configured, create a default one
if (rOptsList == null || rOptsList.isEmpty()) {
// -1 region
final RegionEngineOptions rOpts = new RegionEngineOptions();
rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
rOptsList = Lists.newArrayList();
rOptsList.add(rOpts);
opts.setRegionEngineOptionsList(rOptsList);
}
// Get the cluster name
final String clusterName = this.pdClient.getClusterName();
// 2. Iterate over rOptsList and fill in each RegionEngineOptions
for (final RegionEngineOptions rOpts : rOptsList) {
// The RaftGroupId is clusterName + "-" + regionId
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
rOpts.setInitialServerList(opts.getInitialServerList());
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
.getCommonNodeOptions().copy());
}
// If the region has no metrics report period of its own, inherit the store-level one
if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
// extends store opts 300
rOpts.setMetricsReportPeriod(metricsReportPeriod);
}
}
// init store
// 3. Initialize the Store and the regions inside it
final Store store = this.pdClient.getStoreMetadata(opts);
if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
LOG.error("Empty store metadata: {}.", store);
return false;
}
this.storeId = store.getId();
// init executors
// 4. Initialize the executors
if (this.readIndexExecutor == null) {
this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
}
if (this.raftStateTrigger == null) {
this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
}
if (this.snapshotExecutor == null) {
this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads());
}
// init rpc executors; useSharedRpcExecutor defaults to false
final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
// 5. Initialize the RPC executors that run the RpcServer's processors
if (!useSharedRpcExecutor) {
if (this.cliRpcExecutor == null) {
this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
}
if (this.raftRpcExecutor == null) {
this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
}
if (this.kvRpcExecutor == null) {
this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
}
}
// init metrics
// Start the metric reporters
startMetricReporters(metricsReportPeriod);
// init rpc server
// 6. Initialize the rpcServer that other services call
this.rpcServer = new RpcServer(port, true, true);
// Register the various processors on the server
RaftRpcServerFactory.addRaftRequestProcessors(this.rpcServer, this.raftRpcExecutor, this.cliRpcExecutor);
StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
if (!this.rpcServer.start()) {
LOG.error("Fail to init [RpcServer].");
return false;
}
// init db store
// 7. Pick the DB implementation according to the storage type
if (!initRawKVStore(opts)) {
return false;
}
// init all region engine
// 8. Initialize a RegionEngine for each region
if (!initAllRegionEngine(opts, store)) {
LOG.error("Fail to init all [RegionEngine].");
return false;
}
// heartbeat sender
// If the cluster is self-managed (a remote PD is in use), a heartbeat sender must be initialized
if (this.pdClient instanceof RemotePlacementDriverClient) {
HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
if (heartbeatOpts == null) {
heartbeatOpts = new HeartbeatOptions();
}
this.heartbeatSender = new HeartbeatSender(this);
if (!this.heartbeatSender.init(heartbeatOpts)) {
LOG.error("Fail to init [HeartbeatSender].");
return false;
}
}
this.startTime = System.currentTimeMillis();
LOG.info("[StoreEngine] start successfully: {}.", this);
return this.started = true;
}
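Steps 1 and 2 of StoreEngine#init can be sketched in isolation: fall back to a single default region when none is configured, then derive each region's Raft group id from the cluster name and region id. The class below is a simplified illustration with hypothetical names. The value -1 for the default region id is taken from the "-1 region" comment in the code above, and the id format follows the comment on the getJRaftGroupId call.

```java
import java.util.ArrayList;
import java.util.List;

final class RegionOptionsDefaultsSketch {
    // Assumed value, based on the "-1 region" comment next to Constants.DEFAULT_REGION_ID.
    static final long DEFAULT_REGION_ID = -1L;

    // Hypothetical, stripped-down stand-in for RegionEngineOptions.
    static class RegionOpts {
        final long regionId;
        String raftGroupId;

        RegionOpts(long regionId) {
            this.regionId = regionId;
        }
    }

    // Returns the configured options, or one default region if none were given,
    // with raftGroupId filled in as clusterName + "-" + regionId.
    static List<RegionOpts> withDefaults(String clusterName, List<RegionOpts> configured) {
        List<RegionOpts> result = new ArrayList<>();
        if (configured != null) {
            result.addAll(configured);
        }
        if (result.isEmpty()) {
            result.add(new RegionOpts(DEFAULT_REGION_ID));
        }
        for (RegionOpts opts : result) {
            opts.raftGroupId = clusterName + "-" + opts.regionId;
        }
        return result;
    }

    public static void main(String[] args) {
        for (RegionOpts opts : withDefaults("demo", null)) {
            System.out.println(opts.regionId + " => " + opts.raftGroupId);
        }
    }
}
```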
Let's walk through the numbered steps in the code above.
Step 3 calls pdClient's getStoreMetadata method to initialize the store. Here we look at the FakePlacementDriverClient implementation: FakePlacementDriverClient#getStoreMetadata
public Store getStoreMetadata(final StoreEngineOptions opts) {
// Instantiate the store
final Store store = new Store();
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
store.setId(-1);
store.setEndpoint(opts.getServerAddress());
for (final RegionEngineOptions rOpts : rOptsList) {
// Build a Region from rOpts and add it to regionList
regionList.add(getLocalRegionMetadata(rOpts));
}
store.setRegions(regionList);
return store;
}
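This method instantiates a Store and then iterates over rOptsList. For each RegionEngineOptions in the list it calls getLocalRegionMetadata to instantiate a Region, which is then added to regionList. Note that rOptsList and regionList correspond one-to-one by index; the code further down relies on this.
At this point the store/region diagram should make more sense: each Store contains many Regions.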
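The index correspondence between the two lists is easy to miss, so here is a tiny sketch of it. The types are hypothetical placeholders rather than the real RegionEngineOptions and Region classes; the only thing illustrated is that building one list by iterating over the other in order keeps position i in both lists pointing at the same region.

```java
import java.util.ArrayList;
import java.util.List;

final class StoreMetadataSketch {
    // Hypothetical placeholder for RegionEngineOptions.
    static class RegionOpts {
        final long regionId;

        RegionOpts(long regionId) {
            this.regionId = regionId;
        }
    }

    // Hypothetical placeholder for Region.
    static class Region {
        final long id;

        Region(long id) {
            this.id = id;
        }
    }

    // Builds one Region per options entry, preserving order.
    static List<Region> buildRegions(List<RegionOpts> optsList) {
        List<Region> regions = new ArrayList<>(optsList.size());
        for (RegionOpts opts : optsList) {
            regions.add(new Region(opts.regionId));
        }
        return regions;
    }

    public static void main(String[] args) {
        List<RegionOpts> optsList = new ArrayList<>();
        optsList.add(new RegionOpts(1L));
        optsList.add(new RegionOpts(2L));
        List<Region> regions = buildRegions(optsList);
        // Position i in both lists refers to the same region.
        for (int i = 0; i < optsList.size(); i++) {
            System.out.println(optsList.get(i).regionId + " <-> " + regions.get(i).id);
        }
    }
}
```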
Next, let's see how a Region is initialized. This is done by getLocalRegionMetadata in AbstractPlacementDriverClient, the parent class of FakePlacementDriverClient: AbstractPlacementDriverClient#getLocalRegionMetadata
protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
+ Region.MIN_ID_WITH_MANUAL_CONF);
Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
+ Region.MAX_ID_WITH_MANUAL_CONF);
final byte[] startKey = opts.getStartKeyBytes();
final byte[] endKey = opts.getEndKeyBytes();
final String initialServerList = opts.getInitialServerList();
// Instantiate the region
final Region region = new Region();
final Configuration conf = new Configuration();
// region
region.setId(regionId);
region.setStartKey(startKey);
region.setEndKey(endKey);
region.setRegionEpoch(new RegionEpoch(-1, -1));
// peers
Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
// Parse the cluster's IPs and ports into peers
conf.parse(initialServerList);
// Every region keeps the cluster's peer information
region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
this.regionRouteTable.addOrUpdateRegion(region);
return region;
}
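A Region is the smallest unit of KV data and can be thought of as a data partition or shard. Each Region covers a left-closed, right-open interval [startKey, endKey); in this example both keys are initialized to null. Regions can be split automatically, and their replicas moved automatically, based on metrics such as request traffic, load, and data size. A Region has multiple replicas that form a Raft group and are stored on different Store nodes; Raft log replication keeps the data in sync across all nodes in the same group.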
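To make the half-open interval concrete, here is a small sketch of a membership check for [startKey, endKey). It is not RheaKV code. It assumes that keys are ordered as unsigned bytes, that a null startKey means "unbounded below", and that a null endKey means "unbounded above", which is how the single default region with two null keys would cover the whole key space.

```java
import java.nio.charset.StandardCharsets;

final class RegionRangeSketch {
    // Unsigned lexicographic comparison of two byte arrays.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return a.length - b.length;
    }

    // True if key lies in [startKey, endKey); null bounds are treated as open-ended (assumption).
    static boolean contains(byte[] startKey, byte[] endKey, byte[] key) {
        boolean atOrAfterStart = startKey == null || compare(key, startKey) >= 0;
        boolean beforeEnd = endKey == null || compare(key, endKey) < 0;
        return atOrAfterStart && beforeEnd;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(contains(bytes("a"), bytes("m"), bytes("a"))); // start is inclusive
        System.out.println(contains(bytes("a"), bytes("m"), bytes("m"))); // end is exclusive
        System.out.println(contains(null, null, bytes("zzz")));           // unbounded region
    }
}
```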
Finally, the region is stored in regionRouteTable:
public void addOrUpdateRegion(final Region region) {
Requires.requireNonNull(region, "region");
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
final long regionId = region.getId();
final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.writeLock();
try {
this.regionTable.put(regionId, region.copy());
this.rangeTable.put(startKey, regionId);
} finally {
stampedLock.unlockWrite(stamp);
}
}
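This method stores a copy of the region in regionTable keyed by regionId, and stores the regionId in rangeTable keyed by startKey (a null startKey is converted to an empty byte array first).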
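Why keep two tables? One plausible reading is that rangeTable exists so a key can be mapped to its region by finding the greatest startKey that is not larger than the key. The sketch below illustrates that idea with a TreeMap and floorEntry. The lookup method is my assumption, since the article does not show the real lookup code, and the locking from the original is left out for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class RouteTableSketch {
    // Unsigned lexicographic order over byte arrays.
    static final Comparator<byte[]> BYTES_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return a.length - b.length;
    };

    // regionId -> region description (a String stands in for the Region object).
    private final Map<Long, String> regionTable = new HashMap<>();
    // startKey -> regionId, sorted by key.
    private final TreeMap<byte[], Long> rangeTable = new TreeMap<>(BYTES_ORDER);

    void addOrUpdateRegion(long regionId, byte[] startKey, String description) {
        byte[] key = startKey == null ? new byte[0] : startKey; // null becomes the smallest key
        regionTable.put(regionId, description);
        rangeTable.put(key, regionId);
    }

    // Hypothetical lookup: the region whose startKey is the greatest one <= key.
    Long findRegionId(byte[] key) {
        Map.Entry<byte[], Long> entry = rangeTable.floorEntry(key);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.addOrUpdateRegion(1L, null, "region-1");
        table.addOrUpdateRegion(2L, "m".getBytes(StandardCharsets.UTF_8), "region-2");
        System.out.println(table.findRegionId("apple".getBytes(StandardCharsets.UTF_8)));
        System.out.println(table.findRegionId("melon".getBytes(StandardCharsets.UTF_8)));
    }
}
```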
The RegionEngines are initialized in initAllRegionEngine: StoreEngine#initAllRegionEngine
private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
Requires.requireNonNull(opts, "opts");
Requires.requireNonNull(store, "store");
// Get the base directory
String baseRaftDataPath = opts.getRaftDataPath();
if (Strings.isNotBlank(baseRaftDataPath)) {
try {
FileUtils.forceMkdir(new File(baseRaftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
return false;
}
} else {
baseRaftDataPath = "";
}
final Endpoint serverAddress = opts.getServerAddress();
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = store.getRegions();
// regionList was built from rOptsList, so check that the two sizes match
Requires.requireTrue(rOptsList.size() == regionList.size());
for (int i = 0; i < rOptsList.size(); i++) {
// Fetch the RegionEngineOptions and its matching region by index
final RegionEngineOptions rOpts = rOptsList.get(i);
final Region region = regionList.get(i);
// If the region has no raft data path, derive one
if (Strings.isBlank(rOpts.getRaftDataPath())) {
final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
}
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
// Initialize a RegionEngine from the Region
final RegionEngine engine = new RegionEngine(region, this);
if (engine.init(rOpts)) {
// Request-handling service on the KV server side
// Each RegionKVService maps to one Region and only handles requests within that Region
final RegionKVService regionKVService = new DefaultRegionKVService(engine);
// Put it into regionKVServiceTable
registerRegionKVService(regionKVService);
// Record the region id -> engine mapping
this.regionEngineTable.put(region.getId(), engine);
} else {
LOG.error("Fail to init [RegionEngine: {}].", region);
return false;
}
}
return true;
}
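This method first sets up baseRaftDataPath as the base directory. It then takes rOptsList and regionList, iterates over rOptsList, and looks up the region that matches each RegionEngineOptions. For each region it instantiates a RegionEngine and wraps the engine in a RegionKVService. Finally the RegionKVService goes into the regionKVServiceTable map, and the engine goes into the regionEngineTable map keyed by region id.
RegionKVService is the request-handling service on the KV server side. A StoreEngine contains many RegionKVServices; each one corresponds to a single Region and only handles requests that fall within that Region.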
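The per-region raft data path derivation is worth isolating, because it determines where each region's data ends up on disk. A minimal sketch follows, using plain strings and java.nio.file.Paths. The helper name regionDataPath is hypothetical, while the "raft_data_region_" naming scheme comes from the code above.

```java
import java.nio.file.Paths;

final class RaftDataPathSketch {
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Keeps an explicitly configured path; otherwise derives
    // <base>/raft_data_region_<regionId>_<port>, with a blank base treated as "".
    static String regionDataPath(String base, String configured, long regionId, int port) {
        if (!isBlank(configured)) {
            return configured;
        }
        String root = isBlank(base) ? "" : base;
        String childPath = "raft_data_region_" + regionId + "_" + port;
        return Paths.get(root, childPath).toString();
    }

    public static void main(String[] args) {
        System.out.println(regionDataPath("raft_data", null, -1L, 8181));
        System.out.println(regionDataPath(null, null, 2L, 8182));
        System.out.println(regionDataPath("raft_data", "custom_dir", 2L, 8182));
    }
}
```

Including the port in the directory name means several nodes started on one machine with the same base directory do not write into each other's region data.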
Initialization in RegionEngine#init:
public synchronized boolean init(final RegionEngineOptions opts) {
if (this.started) {
LOG.info("[RegionEngine: {}] already started.", this.region);
return true;
}
this.regionOpts = Requires.requireNonNull(opts, "opts");
// Instantiate the state machine
this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);
// node options
NodeOptions nodeOpts = opts.getNodeOptions();
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}
// If the metrics report period is greater than zero, enable metrics
final long metricsReportPeriod = opts.getMetricsReportPeriod();
if (metricsReportPeriod > 0) {
// metricsReportPeriod > 0 means enable metrics
nodeOpts.setEnableMetrics(true);
}
// Initialize the cluster configuration
nodeOpts.setInitialConf(new Configuration(JRaftHelper.toJRaftPeerIdList(this.region.getPeers())));
nodeOpts.setFsm(this.fsm);
// Set up the paths for the log, meta, and snapshot data
final String raftDataPath = opts.getRaftDataPath();
try {
FileUtils.forceMkdir(new File(raftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
return false;
}
if (Strings.isBlank(nodeOpts.getLogUri())) {
final Path logUri = Paths.get(raftDataPath, "log");
nodeOpts.setLogUri(logUri.toString());
}
if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
final Path meteUri = Paths.get(raftDataPath, "meta");
nodeOpts.setRaftMetaUri(meteUri.toString());
}
if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
nodeOpts.setSnapshotUri(snapshotUri.toString());
}
LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
final Endpoint serverAddress = opts.getServerAddress();
final PeerId serverId = new PeerId(serverAddress, 0);
final RpcServer rpcServer = this.storeEngine.getRpcServer();
this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
// Initialize the raft node
this.node = this.raftGroupService.start(false);
RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
if (this.node != null) {
final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
// RaftRawKVStore is RheaKV's RawKVStore implementation built on the Raft replicated state machine KVStoreStateMachine
// It is RheaKV's entry point into Raft; the Raft flow starts here
this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
// Intercepts requests to record metrics
this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
// metrics config
if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
if (metricRegistry != null) {
final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
// start raft node metrics reporter
this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
.prefixedWith("region_" + this.region.getId()) //
.withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
.outputTo(LOG) //
.scheduleOn(scheduler) //
.shutdownExecutorOnStop(scheduler != null) //
.build();
this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
}
}
this.started = true;
LOG.info("[RegionEngine] start successfully: {}.", this);
}
return this.started;
}
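At this point we run into several objects that should look familiar from the first article in this series. If they do not, it is worth going back to it: "1. SOFAJRaft Source Code Analysis: What Does SOFAJRaft Do at Startup?" Here the state machine is instantiated as a KVStoreStateMachine. LogUri, RaftMetaUri, and SnapshotUri are assigned manually when they are not already set, and the rpcServer is fetched from storeEngine. raftGroupService is then started and returns the initialized node. After that, raftRawKVStore is instantiated. This instance is RheaKV's entry point into Raft: the Raft flow starts here, and all RheaKV data goes through it.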
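The way metricsRawKVStore wraps raftRawKVStore, which in turn wraps rawKVStore, looks like a classic decorator chain. The sketch below shows that layering with deliberately simplified, hypothetical classes. In particular, the "logged" layer just appends to an in-memory list before applying the write, which is a stand-in for Raft replication and does no consensus at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KvStoreLayersSketch {
    // Hypothetical minimal KV interface shared by every layer.
    interface KvStore {
        void put(String key, String value);

        String get(String key);
    }

    // Innermost layer: plain in-memory storage (stands in for the raw store).
    static class MemoryKvStore implements KvStore {
        private final Map<String, String> data = new HashMap<>();

        public void put(String key, String value) {
            data.put(key, value);
        }

        public String get(String key) {
            return data.get(key);
        }
    }

    // Middle layer: records each write in a log before applying it (no real replication).
    static class LoggedKvStore implements KvStore {
        final List<String> log = new ArrayList<>();
        private final KvStore delegate;

        LoggedKvStore(KvStore delegate) {
            this.delegate = delegate;
        }

        public void put(String key, String value) {
            log.add("put " + key);
            delegate.put(key, value);
        }

        public String get(String key) {
            return delegate.get(key);
        }
    }

    // Outermost layer: counts calls, then forwards them.
    static class MeteredKvStore implements KvStore {
        long calls;
        private final KvStore delegate;

        MeteredKvStore(KvStore delegate) {
            this.delegate = delegate;
        }

        public void put(String key, String value) {
            calls++;
            delegate.put(key, value);
        }

        public String get(String key) {
            calls++;
            return delegate.get(key);
        }
    }

    public static void main(String[] args) {
        LoggedKvStore logged = new LoggedKvStore(new MemoryKvStore());
        MeteredKvStore store = new MeteredKvStore(logged);
        store.put("hello", "world");
        System.out.println(store.get("hello") + ", calls=" + store.calls + ", log=" + logged.log);
    }
}
```

Because every layer implements the same interface, callers only ever hold the outermost store and do not need to know how many layers sit underneath.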
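RheaKV initialization covers a lot of ground. This article went through which components RheaKV initializes at startup, how Store and Region relate to each other, where JRaft is started, and where the state machine is set. It also shows that reading a well-designed architecture is a pleasure in itself.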