源碼分析-分佈式鏈路追蹤:Skywalking存儲插件能力-elasticsearch

如上爲Skywalking的總體領域概念設計,基於領域模型設計,咱們能夠獲取不少信息:java

  • 存儲插件化git

  • 存儲模塊化es6

  • 存儲能力多樣性面試

總體源碼結構以下:算法

存儲能力主要包括:express

  • elasticsearchapache

  • influxdb微信

  • jaeger架構

  • jdbc-hikaricp併發

  • zipkin

這裏只是簡單分析elasticsearch7存儲的源碼,也是很是概要的分析,爲何呢主要是想帶着你們分析,讓你們也具有源碼分析的能力,並熱愛分析各類框架的源碼。

首先看storage-elasticsearch7-plugin目錄下的resources/META-INF.services目錄下的org.apache.skywalking.oap.server.library.module.ModuleProvider文件,這個就是模塊化設計

## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.##
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider


其次看StorageModuleElasticsearch7Provider

  • prepare()方法

@Overridepublic void prepare() throws ServiceNotProvidedException { if (!StringUtil.isEmpty(config.getNameSpace())) { //獲取配置中心關於Elasticsearch7的配置-es的命名空間 config.setNameSpace(config.getNameSpace().toLowerCase()); } if (!StringUtil.isEmpty(config.getSecretsManagementFile())) { MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor( 10, readableContents -> { final byte[] secretsFileContent = readableContents.get(0); if (secretsFileContent == null) { return; } Properties secrets = new Properties(); secrets.load(new ByteArrayInputStream(secretsFileContent)); config.setUser(secrets.getProperty("user", null)); config.setPassword(secrets.getProperty("password", null)); config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearch7Client == null) { //In the startup process, we just need to change the username/password } else { // The client has connected, updates the config and connects again. elasticSearch7Client.setUser(config.getUser()); elasticSearch7Client.setPassword(config.getPassword()); elasticSearch7Client.setTrustStorePass(config.getTrustStorePass()); elasticSearch7Client.connect(); } }, config.getSecretsManagementFile(), config.getTrustStorePass()); /** * By leveraging the sync update check feature when startup. */ monitor.start(); }
//初始化客戶端,包括es集羣節點、es協議以及信任的存儲路徑 elasticSearch7Client = new ElasticSearch7Client( config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config .getTrustStorePass(), config.getUser(), config.getPassword(), indexNameConverters(config.getNameSpace()) );
//註冊各類DAO客戶端,完成基於DAO插件模塊的設計的初始化 this.registerServiceImplementation( IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests() )); this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client)); this.registerServiceImplementation( INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO( elasticSearch7Client, config.getResultWindowMaxSize() )); this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize())); this.registerServiceImplementation( IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize())); this.registerServiceImplementation( IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation( IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearch7Client));}
  • start()方法

@Overridepublic void start() throws ModuleStartException { MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); //開啓健康檢查 elasticSearch7Client.registerChecker(healthChecker); try { //開啓鏈接 elasticSearch7Client.connect();  //完成es在OAP端的安裝(由於要區分es6和es7),因此就作了這麼一個模塊 StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); }}


源碼分析-分佈式鏈路追蹤:Skywalking 往期文章:

Skywalking、SpringCloudGateway以及SpringWebFlux如何融合


源碼分析Nacos往期文章:

Nacos源碼分析系列之Naming模塊-集羣篇-初級版

Nacos源碼分析系列之Naming模塊-集羣篇-理論概念

Nacos源碼分析系列之Naming模塊-如何運行篇

Nacos源碼分析系列之總體分層架構


歡迎加入做者的知識星球,裏面會有不少精彩的內容:


歡迎加我的微信號,交流技術:


做者近期文章運營計劃:

  • 已經開通gitchat帳戶,並持續輸出了不少高質量的文章

  • 我的微信公衆號-架構師玄學之路已經和開源中國原創計劃綁定,會實時推送到開源中國

  • 已經開通infoq平臺帳號,而且會持續輸出關於架構師的經典文章

  • 開通知識星球,開始試運行,歡迎你們來關注


gitchat精彩文章列表:

  • 從高性能、高可用及高併發角度剖析 RocketMQ

  • 一篇文章就能搞定基礎面試:Java 併發包(JUC)及應用場景,助你能夠反擼面試官

  • 調侃面試官,分佈式選舉算法 Raft 在 Nacos 中的應用

  • 你所不知道的 RocketMQ 的集羣管理:副本機制

  • 分佈式鏈路追蹤:Skywalking 底層存儲模型設計

  • 分佈式鏈路追蹤:Skywalking 的鏈路模型設計

  • 分佈式鏈路追蹤:Skywalking 探針模型設計

  • 分佈式鏈路追蹤:集羣管理設計

  • 分佈式鏈路追蹤 SkyWalking:配置管理設計

  • 分佈式鏈路追蹤 Skywalking:底層通訊設計

  • 分佈式鏈路追蹤 Skywalking:告警和度量架構設計

  • 分佈式鏈路追蹤 Skywalking:插件化和模塊化架構設計

  • SkyWalking 分佈式鏈路追蹤:最新 Kafka 通訊模型設計

  • 分佈式鏈路追蹤:Spring-Cloud-Sleuth 探針模型設計

  • Spring Cloud Sleuth:分佈式鏈路追蹤之通訊模型設計


做者-遊俠,一名對技術、架構、業務和管理如何融合,並孜孜不倦的高級碼農。

本文分享自微信公衆號 - 架構師玄學之路(andy_aty)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索