聊聊skywalking的RemoteClientManager

本文主要研究一下skywalking的RemoteClientManagerjava

RemoteClientManager

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.javanode

public class RemoteClientManager implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);

    private final ModuleDefineHolder moduleDefineHolder;
    private ClusterNodesQuery clusterNodesQuery;
    private volatile List<RemoteClient> usingClients;
    private GaugeMetrics gauge;
    private int remoteTimeout;

    /**
     * Initial the manager for all remote communication clients.
     *
     * @param moduleDefineHolder for looking up other modules
     * @param remoteTimeout      for cluster internal communication, in second unit.
     */
    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
        this.moduleDefineHolder = moduleDefineHolder;
        this.usingClients = ImmutableList.of();
        this.remoteTimeout = remoteTimeout;
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server
     * orderly because of each of the server will send stream data to each other by hash code.
     */
    void refresh() {
        if (gauge == null) {
            gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
                .createGauge("cluster_size", "Cluster size of current oap node",
                    MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        }
        try {
            if (Objects.isNull(clusterNodesQuery)) {
                synchronized (RemoteClientManager.class) {
                    if (Objects.isNull(clusterNodesQuery)) {
                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Refresh remote nodes collection.");
            }

            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
            instanceList = distinct(instanceList);
            Collections.sort(instanceList);

            gauge.setValue(instanceList.size());

            if (logger.isDebugEnabled()) {
                instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString()));
            }

            if (!compare(instanceList)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ReBuilding remote clients.");
                }
                reBuildRemoteClients(instanceList);
            }

            printRemoteClientList();
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    //......
}
  • RemoteClientManager提供了getRemoteClient方法獲取usingClients,它還提供了start方法,該方法註冊一個定時任務每隔5秒執行一次refresh;refresh方法經過clusterNodesQuery.queryRemoteNodes()獲取instanceList列表,而後根據Address去重一下再排序,而後跟本地的RemoteClient列表進行對比,若是有發現變動則觸發reBuildRemoteClients操做,最後在執行printRemoteClientList

reBuildRemoteClients

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.javagit

public class RemoteClientManager implements Service {

    //......

    private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
            .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));

        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
            .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));

        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());

        unChangeAddresses.stream()
            .filter(remoteClientCollection::containsKey)
            .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged));

        // make the latestRemoteClients including the new clients only
        unChangeAddresses.forEach(latestRemoteClients::remove);
        remoteClientCollection.putAll(latestRemoteClients);

        final List<RemoteClient> newRemoteClients = new LinkedList<>();
        remoteClientCollection.forEach((address, clientAction) -> {
            switch (clientAction.getAction()) {
                case Unchanged:
                    newRemoteClients.add(clientAction.getRemoteClient());
                    break;
                case Create:
                    if (address.isSelf()) {
                        RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                        newRemoteClients.add(client);
                    } else {
                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                        client.connect();
                        newRemoteClients.add(client);
                    }
                    break;
            }
        });

        //for stable ordering for rolling selector
        Collections.sort(newRemoteClients);
        this.usingClients = ImmutableList.copyOf(newRemoteClients);

        remoteClientCollection.values()
            .stream()
            .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
            .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
    }

    //......
}
  • reBuildRemoteClients方法先構建remoteClientCollection及latestRemoteClients,而後取交集獲得unChangeAddresses,而後從latestRemoteClients移除unChangeAddresses,最後再把latestRemoteClients添加到remoteClientCollection;以後遍歷remoteClientCollection,對於action爲Create的區分爲SelfRemoteClient及GRPCRemoteClient,對於GRPCRemoteClient的還執行一下connect操做;最後對newRemoteClients進行排序,而後從新賦值給usingClients;最後對於action爲close的remoteClient執行close操做

RemoteSenderService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.javagithub

public class RemoteSenderService implements Service {
    private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);

    private final ModuleManager moduleManager;
    private final HashCodeSelector hashCodeSelector;
    private final ForeverFirstSelector foreverFirstSelector;
    private final RollingSelector rollingSelector;

    public RemoteSenderService(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
        this.hashCodeSelector = new HashCodeSelector();
        this.foreverFirstSelector = new ForeverFirstSelector();
        this.rollingSelector = new RollingSelector();
    }

    public void send(String nextWorkName, StreamData streamData, Selector selector) {
        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
        RemoteClient remoteClient = null;

        List<RemoteClient> clientList = clientManager.getRemoteClient();
        if (clientList.size() == 0) {
            logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
            return;
        }
        switch (selector) {
            case HashCode:
                remoteClient = hashCodeSelector.select(clientList, streamData);
                break;
            case Rolling:
                remoteClient = rollingSelector.select(clientList, streamData);
                break;
            case ForeverFirst:
                remoteClient = foreverFirstSelector.select(clientList, streamData);
                break;
        }
        remoteClient.push(nextWorkName, streamData);
    }
}
  • RemoteSenderService提供了send方法,該方法從clientManager.getRemoteClient()獲取clientList,而後根據selector類型從中選取一個remoteClient執行remoteClient.push(nextWorkName, streamData)

RemoteClientSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.javaapache

public interface RemoteClientSelector {
    RemoteClient select(List<RemoteClient> clients, StreamData streamData);
}
  • RemoteClientSelector定義了select方法

HashCodeSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.javaide

public class HashCodeSelector implements RemoteClientSelector {

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        int selectIndex = Math.abs(streamData.remoteHashCode()) % size;
        return clients.get(selectIndex);
    }
}
  • HashCodeSelector實現了RemoteClientSelector接口,它經過Math.abs(streamData.remoteHashCode()) % size來選擇selectIndex

RollingSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.javaui

public class RollingSelector implements RemoteClientSelector {

    private int index = 0;

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        index++;
        int selectIndex = Math.abs(index) % size;

        if (index == Integer.MAX_VALUE) {
            index = 0;
        }
        return clients.get(selectIndex);
    }
}
  • RollingSelector實現了RemoteClientSelector接口,它經過每次遞增index而後根據Math.abs(index) % size選擇selectIndex

ForeverFirstSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.javathis

public class ForeverFirstSelector implements RemoteClientSelector {

    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        if (logger.isDebugEnabled()) {
            logger.debug("clients size: {}", clients.size());
        }
        return clients.get(0);
    }
}
  • ForeverFirstSelector實現了RemoteClientSelector接口,它始終返回第一個client

小結

RemoteClientManager提供了getRemoteClient方法獲取usingClients,它還提供了start方法,該方法註冊一個定時任務每隔5秒執行一次refresh;refresh方法經過clusterNodesQuery.queryRemoteNodes()獲取instanceList列表,而後根據Address去重一下再排序,而後跟本地的RemoteClient列表進行對比,若是有發現變動則觸發reBuildRemoteClients操做,最後在執行printRemoteClientListdebug

doc

相關文章
相關標籤/搜索