聊聊skywalking的grpc-configuration-sync

本文主要研究一下skywalking的grpc-configuration-syncjava

configuration-service.proto

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.protogit

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.configuration.service";

service ConfigurationService {
    rpc call (ConfigurationRequest) returns (ConfigurationResponse) {
    }
}

message ConfigurationRequest {
    // Logic name of this cluster,
    // in case the remote configuration center implementation support
    // configuration management for multiple clusters.
    string clusterName = 1;
}

message ConfigurationResponse {
    // Include all config items.
    // All config name should be not empty,
    // the name is composed by "module name"."provider name"."item name".
    // Each watcher implementor provides this, and it will be notified when the value changed.
    //
    // If the config center wants to set the value to NULL or empty,
    // must set the name with empty value explicitly.
    repeated Config configTable = 1;
}

message Config {
    string name = 1;
    string value = 2;
}
  • configuration-service.proto定義了ConfigurationService,它有一個call方法,參數爲ConfigurationRequest,返回ConfigurationResponse

GRPCConfigurationProvider

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationProvider.javagithub

public class GRPCConfigurationProvider extends AbstractConfigurationProvider {
    private RemoteEndpointSettings settings;

    public GRPCConfigurationProvider() {
        settings = new RemoteEndpointSettings();
    }

    @Override public String name() {
        return "grpc";
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return settings;
    }

    @Override protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
        if (Strings.isNullOrEmpty(settings.getHost())) {
            throw new ModuleStartException("No host setting.");
        }
        if (settings.getPort() < 1) {
            throw new ModuleStartException("No port setting.");
        }

        return new GRPCConfigWatcherRegister(settings);
    }
}
  • GRPCConfigurationProvider繼承了AbstractConfigurationProvider,它的initConfigReader根據RemoteEndpointSettings建立GRPCConfigWatcherRegister

RemoteEndpointSettings

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/RemoteEndpointSettings.javaapache

@Setter
@Getter
public class RemoteEndpointSettings extends ModuleConfig {
    private String host;
    private int port;
    private String clusterName = "default";
    // Sync configuration per 60 seconds.
    private int period = 60;

    @Override public String toString() {
        return "RemoteEndpointSettings{" +
            "host='" + host + '\'' +
            ", port=" + port +
            ", clusterName='" + clusterName + '\'' +
            '}';
    }
}

public class RemoteEndpointSettings定義了host、port、clusterName、period屬性ide

GRPCConfigWatcherRegister

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.javaui

public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
    private static final Logger logger = LoggerFactory.getLogger(GRPCConfigWatcherRegister.class);

    private RemoteEndpointSettings settings;
    private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub;

    public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) {
        super(settings.getPeriod());
        this.settings = settings;
        stub = ConfigurationServiceGrpc.newBlockingStub(NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort()).usePlaintext().build());
    }

    @Override public ConfigTable readConfig(Set<String> keys) {
        ConfigTable table = new ConfigTable();
        try {
            ConfigurationResponse response = stub.call(ConfigurationRequest.newBuilder().setClusterName(settings.getClusterName()).build());
            response.getConfigTableList().forEach(config -> {
                final String name = config.getName();
                if (keys.contains(name)) {
                    table.add(new ConfigTable.ConfigItem(name, config.getValue()));
                }
            });
        } catch (Exception e) {
            logger.error("Remote config center [" + settings + "] is not available.", e);
        }
        return table;
    }
}
  • GRPCConfigWatcherRegister繼承了ConfigWatcherRegister,它的構造器根據RemoteEndpointSettings構造ConfigurationServiceGrpc.ConfigurationServiceBlockingStub;其readConfig經過stub.call方法獲取configTableList而後添加到table中

小結

GRPCConfigurationProvider繼承了AbstractConfigurationProvider,它的initConfigReader根據RemoteEndpointSettings建立GRPCConfigWatcherRegister;GRPCConfigWatcherRegister繼承了ConfigWatcherRegister,它的構造器根據RemoteEndpointSettings構造ConfigurationServiceGrpc.ConfigurationServiceBlockingStub;其readConfig經過stub.call方法獲取configTableList而後添加到table中this

doc

相關文章
相關標籤/搜索