本文來自於個人公衆號 程序猿天璇:Nacos 1.3.0-BETA 即未來襲,此次來波大的!,轉載請保留連接 ;)java
本次1.3.0-BETA的改動程度很大,涉及兩個模塊的修改以及新增一個核心模塊。node
在1.3.0-BETA以前,nacos的naming模塊以及config模塊存在各自的集羣成員節點列表管理任務。爲了統一nacos集羣下成員列表的尋址模式,將集羣節點管理的實現從naming模塊以及config模塊剝離出來,統一下沉到了core模塊的尋址模式,同時新增命令參數-Dnacos.member.list
進行設置nacos集羣節點列表,該參數能夠看做是cluster.conf
文件的一個替代。 前nacos的尋址模式類別以下:mysql
b. 集羣模式:git
AddressServerMemberLookupgithub
邏輯圖以下:web
本次還新增成員節點元數據信息,如site、raft_port、adweight、weight,以支持未來在成員節點之間作相應的負載均衡或者其餘操做,所以cluster.conf 件中配置集羣成員節點列表的格式以下:sql
1172.20.10.7:7001?raft_port=8001&site=unknown&adweight=0&weight=1
該格式徹底兼容本來的cluster.conf
格式,用戶在使用 1.3.0-BETA版本時, 無需改動cluster.conf
文件的內容。shell
接下來介紹除了單機模式下的尋址模式的其餘三種尋址模式數據庫
該尋址模式是基於cluster.conf
文件進行管理的,每一個節點會讀取各${nacos.home}/conf
下的cluster.conf
件內的成員節點列表,而後組成一個集羣。而且在首次讀取完${nacos.home}/conf
下的cluster.conf
文件後,會自動向操做系統的inotify機制註冊一個目錄監聽器,監聽${nacos.home}/conf
目錄下的全部文件變更(注意,這裏只會監聽文件,對於目錄下的文件變更沒法監聽),當須要進行集羣節點擴縮容時,須要手動去修改每一個節點各自${nacos.home}/conf
下的cluster.conf
的成員節點列表內容。數組
private FileWatcher watcher = new FileWatcher() { @Override public void onChange(FileChangeEvent event) { readClusterConfFromDisk(); } @Override public boolean interest(String context) { return StringUtils.contains(context, "cluster.conf"); } }; @Override public void run() throws NacosException { readClusterConfFromDisk(); if (memberManager.getServerList().isEmpty()) { throw new NacosException(NacosException.SERVER_ERROR, "Failed to initialize the member node, is empty" ); } // Use the inotify mechanism to monitor file changes and automat ically // trigger the reading of cluster.conf try { WatchFileCenter.registerWatcher(ApplicationUtils.getConfFile Path(), watcher); } catch (Throwable e) { Loggers.CLUSTER.error("An exception occurred in the launch f ile monitor : {}", e); } }
首次啓動時直接讀取cluster.conf
文件內的節點列表信息,而後向WatchFileCenter
註冊一個目錄監聽器,當cluster.conf
文件發生變更時自動觸發readClusterConfFromDisk()
從新讀取cluster.conf
文件。
該尋址模式是基於一個額外的web服務器來管理cluster.conf
,每一個節點按期向該web服務器請求cluster.conf
的文件內容,而後實現集羣節點間的尋址,以及擴縮容。
當須要進行集羣擴縮容時,只須要修改cluster.conf
文件便可,而後每一個節點向地址服務器請求時會自動獲得最新的cluster.conf
文件內容。
public void init(ServerMemberManager memberManager) throws NacosExce ption { super.init(memberManager); initAddressSys(); this.maxFailCount =Integer.parseInt(ApplicationUtils.getProperty("maxHealthCheckFailCount", "12")); } private void initAddressSys() { String envDomainName = System.getenv("address_server_domain"); if (StringUtils.isBlank(envDomainName)) { domainName = System.getProperty("address.server.domain", "jm env.tbsite.net"); } else { domainName = envDomainName; } String envAddressPort = System.getenv("address_server_port"); if (StringUtils.isBlank(envAddressPort)) { addressPort = System.getProperty("address.server.port", "8080"); } else { addressPort = envAddressPort; } addressUrl = System.getProperty("address.server.url", memberManager.getContextPath() + "/" + "serverlist"); addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env"; Loggers.CORE.info("ServerListService address-server port:" + addressPort); Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl); } @SuppressWarnings("PMD.UndefineMagicConstantRule") @Override public void run() throws NacosException { // With the address server, you need to perform a synchronous me mber node pull at startup // Repeat three times, successfully jump out boolean success = false; Throwable ex = null; int maxRetry = ApplicationUtils.getProperty("nacos.core.address-server.retry", Integer.class, 5); for (int i = 0; i < maxRetry; i ++) { try { syncFromAddressUrl(); success = true; break; } catch (Throwable e) { ex = e; Loggers.CLUSTER.error("[serverlist] exception, error : {}", ex); } } if (!success) { throw new NacosException(NacosException.SERVER_ERROR, ex);; } task = new AddressServerSyncTask(); GlobalExecutor.scheduleSyncJob(task, 5_000L); }
在初始化時,會主動去向地址服務器同步當前的集羣成員列表信息,若是失敗則進行重試,其最大重試次數可經過設置nacos.core.address-server.retry
來控制,默認是5次,而後成功以後,將建立定時任務去向地址服務器同步集羣成員節點信息。
該尋址模式是新增的集羣節點發現模式,該模式須要cluster.conf
或者-Dnacos.member.list
提供初始化集羣節點列表,假設已有集羣cluster-one中有A、B、C三個節點,新節點D要加集羣,那麼只須要節點D在啓動時的集羣節點列表存在A、B、C三個中的一個便可,而後節點之間會相互同步各自知道的集羣節點列表,在必定的是時間內,A、B、C、D四個節點知道的集羣節點成員列表都會是[A、B、C、D]在執行集羣節點列表同步時,會隨機選取K個處於UP狀態的節點進行同步。
Collection<Member> members = MemberUtils.kRandom(memberManager, membe r -> { // local node or node check failed will not perform task processing if (memberManager.isSelf(member) || !member.check ()) { return false; } NodeState state = member.getState(); return !(state == NodeState.DOWN || state == Node State.SUSPICIOUS); });
經過一個簡單的流程圖看下DiscoveryMemberLookup是怎麼工做的
圖片正在加載中。。。
因爲未來Nacos會對總體通訊通道作升級,採用GRPC優化nacos-server之間,nacos-client與nacos-server之間的通訊,同時爲了兼容目前已有的HTTP協議接口,那麼勢必會帶來這個問題,本機用於RPC協議的端口如何讓其餘節點知道?這裏有兩個解決方案。
以前的cluster.conf格式
ip[:port] ip[:port] ip[:port]
因爲nacos默認端口是8848,所以在端口未被修改的狀況下,能夠直接寫IP列表
新的cluster.conf
ip[:port][:RPC_PORT] ip[:port][:RPC_PORT] ip[:port][:RPC_PORT]
對於以前的cluster.conf
是徹底支持的,由於nacos內部能夠經過一些計算來約定RPC_PORT
的端口值,也能夠經過顯示的設置來約定。經過計算來約定RPC_PORT的代碼以下:
// member port int port = Member.getPort(); // Set the default Raft port information for security int rpcPort = port + 1000 >= 65535 ? port + 1 : port + 1000;
可是這樣會有一個問題,即若是用戶手動設置了RPC_PORT
的話,那麼對於客戶端、服務端來講,感知新的RPC_PORT
就要修改對應的配置文件或者初始化參數。所以但願說可以讓用戶無感知的過渡到RPC_PORT
通訊通道,即用戶須要對RPC協議使用的端口無需本身在進行設置。
端口協商即利用目前已有的HTTP接口,將RPC協議佔用的端口經過HTTP接口進行查詢返回,這樣不管是客戶端仍是服務端,都無需修改目前已有的初始化參數或者cluster.conf
文件,其大體時序圖以下:
經過一個額外的端口獲取HTTP接口,直接在內部實現RPC端口的協商,而且只會在初始化時進行拉取,這樣,未來nacos新增任何一種協議的端口都無需修改相應的配置信息,自動完成協議端口的感知。
從nacos的將來的總體架構圖能夠看出,一致性協議層將是做爲nacos的最爲核心的模塊,將服務於構建在core模塊之上的各個功能模塊,或者服務與core模塊自己。而一致性協議由於分區容錯性的存在,須要在可用性與一致性之間作選擇,所以就存在兩大類一致性:最終一致性和強一致性。在nacos中,這兩類致性協議都是可能用到的,好比naming模塊,對於服務實例的數據管理分別用到了AP以及CP,而對於config模塊,將會涉及使用CP。同時還有以下幾個功能需求點:
根據一致協議以及上述功能需求點,本次作了一個抽象的一致協議層以及相關的接口。
所謂一致性,即多個副本之間是否可以保持一致性的特性,而副本的本質就是數據,對數據的操做,不是獲取就是修改。同時,一致協議實際上是針對分佈式狀況的,而這必然涉及多個節點,所以,須要有相應的接口可以調整一致性協議的協同工做節點。若是咱們要觀察一致性協議運行的狀況,該怎麼辦?好比Raft協議,咱們但願得知當前集羣中的Leader是誰,任期的狀況,當前集羣中的成員節點有誰?所以,還須要提供一個一致性協議元數據獲取。
綜上所述,ConsistencyProtcol的大體設計能夠出來了
/** * Has nothing to do with the specific implementation of the consist ency protocol * Initialization sequence: init(Config) * * <ul> * <li>{@link Config} : Relevant configuration information requi red by the consistency protocol, * for example, the Raft protocol needs to set the election time out time, the location where * the Log is stored, and the snapshot task execution interval</ li> * <li>{@link ConsistencyProtocol#protocolMetaData()} : Returns metadata information of the consistency * protocol, such as leader, term, and other metadata informatio n in the Raft protocol</li> * </ul> * * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> */ public interface ConsistencyProtocol<T extends Config> extends CommandOperations { /** * Consistency protocol initialization: perform initialization o perations based on the incoming Config * 一致性協議初始化,根據 Config 實現類 * * @param config {@link Config} */ void init(T config); /** * Copy of metadata information for this consensus protocol * 該一致性協議的元數據信息 * * @return metaData {@link ProtocolMetaData} */ ProtocolMetaData protocolMetaData(); /** * Obtain data according to the request * 數據獲取操做,根據GetRequest中的請求上下文進行查詢相應的數據 * * @param request request * @return data {@link GetRequest} * @throws Exception */ GetResponse getData(GetRequest request) throws Exception; /** * Data operation, returning submission results synchronously * 同步數據提交,在 Datum 中已攜帶相應的數據操做信息 * * @param data {@link Log} * @return submit operation result * @throws Exception */ LogFuture submit(Log data) throws Exception; /** * Data submission operation, returning submission results async hronously * 異步數據提交,在 Datum 中已攜帶相應的數據操做信息,返回一個Future,自行操做,提交發 的異常會在CompleteFuture中 * * @param data {@link Log} * @return {@link CompletableFuture<LogFuture>} submit result * @throws Exception when submit throw Exception */ CompletableFuture<LogFuture> submitAsync(Log data); /** * New member list * 新的成員節點列表,一致性協議處理相應的成員節點是加入仍是離開 * * @param addresses [ip:port, ip:port, ...] */ void memberChange(Set<String> addresses); /** * Consistency agreement service shut down * 一致性協議服務關閉 */ void shutdown(); }
針對CP協議,因爲存在Leader的概念,所以須要提供一個方法用於獲取CP協議當前的Leader是誰
public interface CPProtocol<C extends Config> extends ConsistencyPro tocol<C> { /** * Returns whether this node is a leader node * * @param group business module info * @return is leader * @throws Exception */ boolean isLeader(String group) throws Exception; }
上面說到,一致性協議實際上是對於數據操做而言的,數據操做基本分爲兩大類:數據查詢以及數據修改,同時還要知足不一樣功能模塊之間的數據進行隔離。所以這裏針對數據修改操做以及數據查詢操做分別闡述。
1. 數據修改
綜上,能夠得出Log對象的設計以下:
message Log { // 功能模塊分組信息 string group = 1; // 摘要或者標識 string key = 2; // 具體請求數據 bytes data = 3; // 數據類型 string type = 4; // 更爲具體的數據操做 string operation = 5; // 額外信息 map<string, string> extendInfo = 6; }
2. 數據查詢
綜上,能夠得出GetRequest對象的設計以下
message GetRequest { // 功能模塊分組信息 string group = 1; // 具體請求數據 bytes data = 2; // 額外信息 map<string, string> extendInfo = 3; }
當數據操做經過一致性協議進行submit以後,每一個節點須要去處理這個Log或者GetRequest對象,所以,咱們須要抽象出一個Log、GetRequest對象的Processor,不一樣的功能模塊經過實現該處理器,ConsistencyProtocol內部會根據Log、GetRequest的group屬性,將Log、GetRequest對象路由到具體的Processor,固然,Processor也須要代表本身是屬於哪個功能模塊的。
public abstract class LogProcessor { /** * get data by key * * @param request request {@link GetRequest} * @return target type data */ public abstract GetResponse getData(GetRequest request); /** * Process Submitted Log * * @param log {@link Log} * @return {@link boolean} */ public abstract LogFuture onApply(Log log); /** * Irremediable errors that need to trigger business price cuts * * @param error {@link Throwable} */ public void onError(Throwable error) { } /** * In order for the state machine that handles the transaction to be able to route * the Log to the correct LogProcessor, the LogProcessor needs to have an identity * information * * @return Business unique identification name */ public abstract String group(); }
針對CP協議,好比Raft協議,存在快照的設計,所以咱們須要針對CP協議單獨擴展出一個方法。
public abstract class LogProcessor4CP extends LogProcessor { /** * Discovery snapshot handler * It is up to LogProcessor to decide which SnapshotOperate shou ld be loaded and saved by itself * * @return {@link List <SnapshotOperate>} */ public List<SnapshotOperation> loadSnapshotOperate() { return Collections.emptyList(); } }
咱們能夠經過一個時序圖看看,一致性協議層的大體工做流程以下:
一致性協議層抽象好以後,剩下就是具體一致性協議實現的選擇了,這裏咱們選擇了螞蟻服開源的JRaft,那麼咱們如何將JRaft做爲CP協議的一個Backend呢?下面的簡單流程圖描述了當JRaft做爲CP協議的一個Backend時的初始化流程。
圖片正在加載中。。。
JRaftProtocol是當JRaft做爲CP協議的Backend時的一個ConsistencyProtocol的具體實現,其內部有一個JRaftServer成員屬性,JRaftServer分裝了JRaft的各類API操做,好比數據操做的提交,數據的查詢,成員節點的變動,Leader節點的查詢等等。
注意事項:JRaft運行期間產生的數據在
${nacos.home}/protocol/raft
文件目錄下。不一樣的業務模塊有不一樣的文件分組,若是當節點出現crash或者異常關閉時,清空該目錄下的文件,重啓節點便可。
因爲JRaft實現了raft group的概念,所以,徹底能夠利用 raft group的設計,爲每一個功能模塊單首創建個raft group。這裏給出部分代碼,該代碼體現瞭如何將LogProcessor嵌入到狀態機中併爲每一個LogPrcessor建立一個Raft Group。
synchronized void createMultiRaftGroup(Collection<LogProcessor4CP> processors) { // There is no reason why the LogProcessor cannot be processed b ecause of the synchronization if (!this.isStarted) { this.processors.addAll(processors); return; } final String parentPath = Paths.get(ApplicationUtils.getNacosHome(), "protocol/raft").toString(); for (LogProcessor4CP processor : processors) { final String groupName = processor.group(); if (alreadyRegisterBiz.contains(groupName)) { throw new DuplicateRaftGroupException(groupName); } alreadyRegisterBiz.add(groupName); final String logUri = Paths.get(parentPath, groupName, "log").toString(); final String snapshotUri = Paths.get(parentPath, groupName,"snapshot").toString(); final String metaDataUri = Paths.get(parentPath, groupName,"meta-data").toString(); // Initialize the raft file storage path for different services try { DiskUtils.forceMkdir(new File(logUri)); DiskUtils.forceMkdir(new File(snapshotUri)); DiskUtils.forceMkdir(new File(metaDataUri)); } catch (Exception e) { Loggers.RAFT.error("Init Raft-File dir have some error : {}", e); throw new RuntimeException(e); } // Ensure that each Raft Group has its own configuration and NodeOptions Configuration configuration = conf.copy(); NodeOptions copy = nodeOptions.copy(); // Here, the LogProcessor is passed into StateMachine, and when the StateMachine // triggers onApply, the onApply of the LogProcessor is actually called NacosStateMachine machine = new NacosStateMachine(this, processor); copy.setLogUri(logUri); copy.setRaftMetaUri(metaDataUri); copy.setSnapshotUri(snapshotUri); copy.setFsm(machine); copy.setInitialConf(configuration); // Set snapshot interval, default 1800 seconds int doSnapshotInterval = ConvertUtils.toInt(raftConfig.getVal(RaftSysConstants.RAFT_SNAPSHOT_INTERVAL_SECS), RaftSysConstants.DEFAULT_RAFT_SNAPSHOT_INTERVAL_ SECS); // If the business module does not implement a snapshot processor, cancel the snapshot doSnapshotInterval = CollectionUtils.isEmpty(processor.loadS napshotOperate()) ? 0 : doSnapshotInterval; copy.setSnapshotIntervalSecs(doSnapshotInterval); Loggers.RAFT.info("create raft group : {}", groupName); RaftGroupService raftGroupService = new RaftGroupService(gro upName, localPeerId, copy, rpcServer, true); // Because RpcServer has been started before, it is not allo wed to start again here Node node = raftGroupService.start(false); machine.setNode(node); RouteTable.getInstance().updateConfiguration(groupName, conf iguration); // Turn on the leader auto refresh for this group Random random = new Random(); long period = nodeOptions.getElectionTimeoutMs() + random.ne xtInt(5 * 1000); RaftExecutor.scheduleRaftMemberRefreshJob(() -> refreshRoute Table(groupName), period, period, TimeUnit.MILLISECONDS); // Save the node instance corresponding to the current group multiRaftGroup.put(groupName, new RaftGroupTuple(node, proce ssor, raftGroupService)); } }
或許有的人會有疑問,爲何要建立多個raft group,既然以前已經設計出了LogProcessor,徹底能夠利用一個Raft
Group,在狀態機appl時,根據Log的group屬性進行路由到不一樣的LogProcessor便可,每一個功能模塊就建立一個raft group,不是會消耗大量的資源嗎?
正如以前所說,咱們但願獨工做的模塊之間相互不存在影響,好比A模塊處理Log由於存在Block操做可能使得apply的速度緩慢,亦或者可能中途發生異常,對於Raft協議來講,當日志apply失敗時,狀態機將不可以繼續向前推動,由於若是繼續向前推動的話,因爲上一步的apply失敗,後面的全部apply均可能失敗,將會致使這個節點的數據與其餘節點的數據永遠不一致。若是說咱們將全部獨立工做的模塊,對於數據操做的請求處理放在同一個raft group,即一個狀態機中,就不可避免的會出現上述所說的問題,某個模塊在apply 日誌發生不可控的因素時,會影響其餘模塊的正常工做。
爲了使用者可以對JRaft進行相關簡單的運維,如Leader的切換,重置當前Raft集羣成員,觸發某個節點進行Snapshot操做等等,提供了一個簡單的HTTP接口進行操做,而且該接口有必定的限制,即每次只會執行一條運維指令。
1.切換某一個Raft Group的Leader節點
POST /nacos/v1/core/ops/raft { "groupId": "xxx", "transferLeader": "ip:{raft_port}" }
2.重置某一個Raft Group的集羣成員
POST /nacos/v1/core/ops/raft { "groupId": "xxx", "resetRaftCluster": "ip:{raft_port},ip:{raft_port},ip:{raft_por t},ip:{raft_port}" }
3.觸發某一個Raft Group執行快照操做
POST /nacos/v1/core/ops/raft { "groupId": "xxx", "doSnapshot": "ip:{raft_port}" }
### Sets the Raft cluster election timeout, default value is 5 second nacos.core.protocol.raft.data.election_timeout_ms=5000 ### Sets the amount of time the Raft snapshot will execute periodica lly, default is 30 minute nacos.core.protocol.raft.data.snapshot_interval_secs=30 ### Requested retries, default value is 1 nacos.core.protocol.raft.data.request_failoverRetries=1 ### raft internal worker threads nacos.core.protocol.raft.data.core_thread_num=8 ### Number of threads required for raft business request processing nacos.core.protocol.raft.data.cli_service_thread_num=4 ### raft linear read strategy, defaults to index nacos.core.protocol.raft.data.read_index_type=ReadOnlySafe ### rpc request timeout, default 5 seconds nacos.core.protocol.raft.data.rpc_request_timeout_ms=5000 ### Maximum size of each file RPC (snapshot copy) request between me mbers, default is 128 K nacos.core.protocol.raft.data.max_byte_count_per_rpc=131072 ### Maximum number of logs sent from leader to follower, default is 1024 nacos.core.protocol.raft.data.max_entries_size=1024 ### Maximum body size for sending logs from leader to follower, defa ult is 512K nacos.core.protocol.raft.data.max_body_size=524288 ### Maximum log storage buffer size, default 256K nacos.core.protocol.raft.data.max_append_buffer_size=262144 ### Election timer interval will be a random maximum outside the spe cified time, default is 1 second nacos.core.protocol.raft.data.max_election_delay_ms=1000 ### Specify the ratio between election timeout and heartbeat interval. Heartbeat interval is equal to ### electionTimeoutMs/electionHeartbeatFactor,One tenth by default. nacos.core.protocol.raft.data.election_heartbeat_factor=10 ### The tasks submitted to the leader accumulate the maximum batch s ize of a batch flush log storage. The default is 32 tasks. nacos.core.protocol.raft.data.apply_batch=32 ### Call fsync when necessary when writing logs and meta informatio n, usually should be true nacos.core.protocol.raft.data.sync=true ### Whether to write snapshot / raft meta-information to call fsync. The default is false. When sync is true, it is preferred to respect sync. nacos.core.protocol.raft.data.sync_meta=false ### Internal disruptor buffer size. For applications with high write throughput, you need to increase this value. The default value is 16384. nacos.core.protocol.raft.data.disruptor_buffer_size=16384 ### Whether to enable replication of pipeline request optimization, which is enabled by default nacos.core.protocol.raft.data.replicator_pipeline=true ### Maximum number of in-flight requests with pipeline requests enab led, default is 256 nacos.core.protocol.raft.data.max_replicator_inflight_msgs=256 ### Whether to enable LogEntry checksum nacos.core.protocol.raft.data.enable_log_entry_checksum=false
nacos內嵌的分佈式ID爲Snakeflower,dataCenterId默認爲1,workerId的值計算方式以下:
InetAddress address; try { address = InetAddress.getLocalHost(); } catch (final UnknownHostException e) { throw new IllegalStateException( "Cannot get LocalHost InetAddress, please ch eck your network!"); } byte[] ipAddressByteArray = address.getAddress(); workerId = (((ipAddressByteArray[ipAddressByteArray.length - 2] & 0B 11) << Byte.SIZE) + (ipAddressByteArray[ipAddressByteArray.length - 1] & 0xFF));
若是須要手動指定dataCenterId以及workerId,則在application.properties或者啓動時添加命令 參數
### set the dataCenterID manually # nacos.core.snowflake.data-center= ### set the WorkerID manually # nacos.core.snowflake.worker-id=
將一次請求操做涉及的SQL上下文按順序保存起來。而後經過一致協議層將本次請求涉及的SQL上下 文進行同步,而後每一個節點將其解析並從新按順序在一次數據庫會話中執行。
當使用者開啓1.3.0-BETA的新特性——內嵌分佈式關係型數據存儲時,全部的寫操做請求都將路由到Leader節點進行處理;可是,因爲Raft狀態機的特性,當某一個節點在apply數據庫操做請求時發生非SQL邏輯錯誤引起的異常時,將致使狀態機沒法繼續正常進行工做,此時將會觸發配置管理模塊的降級操做。
private void registerSubscribe() { NotifyCenter.registerSubscribe(new SmartSubscribe() { @Override public void onEvent(Event event) { if (event instanceof RaftDBErrorRecoverEvent) { downgrading = false; return; } if (event instanceof RaftDBErrorEvent) { downgrading = true; } } @Override public boolean canNotify(Event event) { return (event instanceof RaftDBErrorEvent) || (event instanceof RaftDBErrorRecoverEvent); } }); }
所以,綜上所述,能夠經過活動圖來理解下,什麼狀況下須要將請求進行轉發呢?
數據庫的DML語句是select、insert、update、delete,根據SQL語句對於數據操做的性質,能夠分爲兩類:query以及update,select語句對應的是數據查詢,insert、update、delete語句對應的是數據修改。同時在進行數據庫操做時,爲了不SQL入注,使用的是PreparedStatement,所以須要SQL語句+參數,所以能夠獲得兩個關於數據庫操做的Request對象。
public class SelectRequest implements Serializable { private static final long serialVersionUID = 2212052574976898602L; // 查詢類別,由於 前使 的是JdbcTemplate,查詢單個、查詢多個,是否使 RowM apper轉爲對象 5private byte queryType; // sql語句 // select * from config_info where private String sql; private Object[] args; private String className; }
public class ModifyRequest implements Serializable { private static final long serialVersionUID = 4548851816596520564L; private int executeNo; private String sql; private Object[] args; }
配置發佈操做涉及三個事務:
這三個事務都在配置發佈這個大事務下,若是說咱們對每一個事務操做進行一個Raft協議提交,假設一、2兩個事務經過Raft提交後都成功Apply了,第三個事務在進行Raft提交後apply失敗,那麼對於這個配置發佈的大事務來講,是須要總體回滾的,不然就會違反原子性,那麼可能須要說將事務回滾操做又進行一次Raft提交,那麼總體的複雜程度上升,而且直接引了分佈式事務的管理,所以爲了不這個問題,咱們將這三個事務涉及的SQL上下文進行整合成一個大的SQL上下文,對這大的SQL上下文進行Raft協議提交。保證了三個子事務在同一次數據庫會話當中,成功解決原子性的問題,同時因爲Raft協議對於事務日誌的處理是串行執行的,所以至關於將數據庫的事務隔離級別調整爲串行化。
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) { try { // 同過雪花ID獲取一個ID值 long configId = idGeneratorManager.nextId(configInfoId); long configHistoryId = idGeneratorManager.nextId(this.configHistoryId); // 配置插入 addConfigInfoAtomic(configId, srcIp, srcUser, configInfo, time, configAdvanceInfo); String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags"); // 配置與標籤信息關聯操做 addConfigTagsRelation(configId, configTags, configInfo.getD ataId(), configInfo.getGroup(), configInfo.getTenant()); // 配置歷史插入 insertConfigHistoryAtomic(configHistoryId, configInfo, srcI p, srcUser, time, "I"); boolean result = databaseOperate.smartUpdate(); if (!result) { throw new NacosConfigException("Config add failed"); } if (notify) { EventDispatcher.fireEvent( new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant(), time.getTime())); } } finally { SqlContextUtils.cleanCurrentSqlContext(); } } public long addConfigInfoAtomic(final long id, final String srcIp, final String srcUser, final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo) { ... // 參數處理 ... final String sql = "INSERT INTO config_info(id, data_id, group_id, tenant_id, app_name, content, md5, src_ip, src_user, gmt_create," + "gmt_modified, c_desc, c_use, effect, type, c_schema) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; final Object[] args = new Object[] { id, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5Tmp, srcIp, srcUser, time, time, desc, use, effect, type, schema, }; SqlContextUtils.addSqlContext(sql, args); return id; } public void addConfigTagRelationAtomic(long configId, String tagName, String dataId, String group, String tenant) { final String sql = "INSERT INTO config_tags_relation(id,tag_name,tag_type,data_id,group_id,tenant_id) " + "VALUES(?,?,?,?,?,?)"; final Object[] args = new Object[] { configId, tagName, null, d ataId, group, tenant }; SqlContextUtils.addSqlContext(sql, args); } public void insertConfigHistoryAtomic(long configHistoryId, ConfigI nfo configInfo, String srcIp, String srcUser, final Timestamp time, String ops) { ... // 參數處理 ... final String sql = "INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5," + "src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)"; final Object[] args = new Object[] { configHistoryId, configInf o.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5Tmp, srcIp, srcUser, time, ops}; SqlContextUtils.addSqlContext(sql, args); } /** * Temporarily saves all insert, update, and delete statements under * a transaction in the order in which they occur * * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> */ public class SqlContextUtils { private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new); public static void addSqlContext(String sql, Object... args) { ArrayList<ModifyRequest> requests = SQL_CONTEXT.get(); ModifyRequest context = new ModifyRequest(); context.setExecuteNo(requests.size()); context.setSql(sql); context.setArgs(args); requests.add(context); SQL_CONTEXT.set(requests); } public static List<ModifyRequest> getCurrentSqlContext() { return SQL_CONTEXT.get(); } public static void cleanCurrentSqlContext() { SQL_CONTEXT.remove(); } }
經過一個時序圖來更加直觀的理解
#*************** Embed Storage Related Configurations *************** # ### This value is true in stand-alone mode and false in cluster mode ### If this value is set to true in cluster mode, nacos's distributed storage engine is turned on embeddedStorage=true
是否啓用內嵌的分佈式關係型存儲的活動圖
直接查詢每一個節點的derby存儲的數據
GET /nacos/v1/cs/ops/derby?sql=select * from config_info return List<Map<String, Object>>
將於Apache Derby官方一塊兒嘗試基於Raft實現BingLog的同步複製操做,從底層實現數據庫同步能力。