本文主要研究一下elasticsearch的TransportProxyClient
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportProxyClient.java
/**
 * Dispatches an {@link Action} to the {@link TransportActionNodeProxy} registered for it.
 * The node each request is sent to is supplied by the {@link TransportClientNodesService}.
 */
final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    /**
     * Builds one node proxy per entry of {@code actions} that is an {@link Action};
     * entries of any other type are skipped. The resulting map is read-only.
     */
    TransportProxyClient(Settings settings, TransportService transportService,
                         TransportClientNodesService nodesService, List<GenericAction> actions) {
        Map<Action, TransportActionNodeProxy> registered = new HashMap<>();
        for (GenericAction candidate : actions) {
            if (!(candidate instanceof Action)) {
                continue;
            }
            registered.put((Action) candidate, new TransportActionNodeProxy(settings, candidate, transportService));
        }
        this.proxies = unmodifiableMap(registered);
        this.nodesService = nodesService;
    }

    /**
     * Looks up the proxy for {@code action} and hands the nodes service a callback that
     * runs {@code request} through that proxy on whichever node the service passes in.
     *
     * @param action   the action to run; a proxy is expected to exist for it (checked by assertion only)
     * @param request  the request to send
     * @param listener receives the response or the failure
     */
    public <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void execute(final Action<Request, Response, RequestBuilder> action,
                 final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        assert proxy != null : "no proxy found for action: " + action;
        nodesService.execute((node, nodeListener) -> proxy.execute(node, request, nodeListener), listener);
    }
}
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/action/TransportActionNodeProxy.java
/**
 * Sends the request of a single {@link GenericAction} to a given node through the
 * {@link TransportService}, after validating the request locally.
 */
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {

    private final TransportService transportService;
    private final GenericAction<Request, Response> action;
    private final TransportRequestOptions transportOptions;

    /**
     * @param settings         passed to the superclass and used to resolve the action's transport options
     * @param action           the action whose name and response factory are used when sending
     * @param transportService the service used to send requests
     */
    public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
        super(settings);
        this.action = action;
        this.transportService = transportService;
        this.transportOptions = action.transportOptions(settings);
    }

    /**
     * Validates {@code request}; a validation error is reported to {@code listener} and nothing
     * is sent. Otherwise the request is sent to {@code node} under the action's name, and the
     * response (or failure) is delivered to {@code listener}.
     */
    public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
        final ActionRequestValidationException invalid = request.validate();
        if (invalid == null) {
            transportService.sendRequest(node, action.name(), request, transportOptions,
                new ActionListenerResponseHandler<>(listener, action::newResponse));
        } else {
            listener.onFailure(invalid);
        }
    }
}
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
final class TransportClientNodesService extends AbstractComponent implements Closeable { private final TimeValue nodesSamplerInterval; private final long pingTimeout; private final ClusterName clusterName; private final TransportService transportService; private final ThreadPool threadPool; private final Version minCompatibilityVersion; // nodes that are added to be discovered private volatile List<DiscoveryNode> listedNodes = Collections.emptyList(); private final Object mutex = new Object(); private volatile List<DiscoveryNode> nodes = Collections.emptyList(); // Filtered nodes are nodes whose cluster name does not match the configured cluster name private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList(); private final AtomicInteger tempNodeIdGenerator = new AtomicInteger(); private final NodeSampler nodesSampler; private volatile ScheduledFuture nodesSamplerFuture; private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt()); private final boolean ignoreClusterName; private volatile boolean closed; private final TransportClient.HostFailureListener hostFailureListener; //...... public TransportClientNodesService addTransportAddresses(TransportAddress... 
transportAddresses) { synchronized (mutex) { if (closed) { throw new IllegalStateException("transport client is closed, can't add an address"); } List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length); for (TransportAddress transportAddress : transportAddresses) { boolean found = false; for (DiscoveryNode otherNode : listedNodes) { if (otherNode.getAddress().equals(transportAddress)) { found = true; logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode); break; } } if (!found) { filtered.add(transportAddress); } } if (filtered.isEmpty()) { return this; } List<DiscoveryNode> builder = new ArrayList<>(listedNodes); for (TransportAddress transportAddress : filtered) { DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion); logger.debug("adding address [{}]", node); builder.add(node); } listedNodes = Collections.unmodifiableList(builder); nodesSampler.sample(); } return this; } public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) { synchronized (mutex) { if (closed) { throw new IllegalStateException("transport client is closed, can't remove an address"); } List<DiscoveryNode> listNodesBuilder = new ArrayList<>(); for (DiscoveryNode otherNode : listedNodes) { if (!otherNode.getAddress().equals(transportAddress)) { listNodesBuilder.add(otherNode); } else { logger.debug("removing address [{}] from listed nodes", otherNode); } } listedNodes = Collections.unmodifiableList(listNodesBuilder); List<DiscoveryNode> nodesBuilder = new ArrayList<>(); for (DiscoveryNode otherNode : nodes) { if (!otherNode.getAddress().equals(transportAddress)) { nodesBuilder.add(otherNode); } else { logger.debug("disconnecting from node with address [{}]", otherNode); transportService.disconnectFromNode(otherNode); } } nodes = 
Collections.unmodifiableList(nodesBuilder); nodesSampler.sample(); } return this; } //...... }
(通常是通過配置文件指定的clusterNodes);nodesSampler.sample()方法會對listedNodes進行進一步檢測,比如將clusterName不是當前配置的clusterName的放到filteredNodes,剩下的再進行連接的建立,成功的放到nodes裏頭
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClient.java
public abstract class TransportClient extends AbstractClient { private final TransportClientNodesService nodesService; private final TransportProxyClient proxy; //...... /** * Returns the current connected transport nodes that this client will use. * <p> * The nodes include all the nodes that are currently alive based on the transport * addresses provided. */ public List<DiscoveryNode> connectedNodes() { return nodesService.connectedNodes(); } /** * The list of filtered nodes that were not connected to, for example, due to * mismatch in cluster name. */ public List<DiscoveryNode> filteredNodes() { return nodesService.filteredNodes(); } /** * Returns the listed nodes in the transport client (ones added to it). */ public List<DiscoveryNode> listedNodes() { return nodesService.listedNodes(); } /** * Adds a transport address that will be used to connect to. * <p> * The Node this transport address represents will be used if its possible to connect to it. * If it is unavailable, it will be automatically connected to once it is up. * <p> * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}. */ public TransportClient addTransportAddress(TransportAddress transportAddress) { nodesService.addTransportAddresses(transportAddress); return this; } /** * Adds a list of transport addresses that will be used to connect to. * <p> * The Node this transport address represents will be used if its possible to connect to it. * If it is unavailable, it will be automatically connected to once it is up. * <p> * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}. */ public TransportClient addTransportAddresses(TransportAddress... transportAddress) { nodesService.addTransportAddresses(transportAddress); return this; } /** * Removes a transport address from the list of transport addresses that are used to connect to. 
*/ public TransportClient removeTransportAddress(TransportAddress transportAddress) { nodesService.removeTransportAddress(transportAddress); return this; } //...... }
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
/**
 * Reads the sampler-related client settings, picks the sampler implementation
 * (sniffing when {@code CLIENT_TRANSPORT_SNIFF} is set, simple otherwise) and schedules
 * the first {@link ScheduledNodeSampler} run on the generic thread pool.
 */
TransportClientNodesService(Settings settings, TransportService transportService,
                            ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
    super(settings);
    this.transportService = transportService;
    this.threadPool = threadPool;
    this.hostFailureListener = hostFailureListener;
    this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
    this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

    this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
    this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
    this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);

    if (logger.isDebugEnabled()) {
        logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
    }

    this.nodesSampler = TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)
        ? new SniffNodesSampler()
        : new SimpleNodeSampler();
    this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
}

//......

/**
 * Self-rescheduling task: samples once, then, unless the service is closed, schedules
 * itself again after {@code nodesSamplerInterval} and stores the new future.
 * If sampling or rescheduling throws, the exception is logged and this run schedules nothing further.
 */
class ScheduledNodeSampler implements Runnable {
    @Override
    public void run() {
        try {
            nodesSampler.sample();
            if (closed) {
                return;
            }
            nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
        } catch (Exception e) {
            logger.warn("failed to sample", e);
        }
    }
}

//......
(默認是false)來判斷是創建SniffNodesSampler還是SimpleNodeSampler,通過threadPool註冊一個調度任務,每隔nodesSamplerInterval執行ScheduledNodeSampler;ScheduledNodeSampler實現了Runnable接口,其run方法主要是調用nodesSampler.sample(),之後只要TransportClientNodesService沒有close,則會繼續註冊調度任務,並更新nodesSamplerFuture
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
/**
 * Base class of the node samplers. {@link #sample()} serializes sampling on the enclosing
 * service's {@code mutex} and does nothing once the service is closed.
 */
abstract class NodeSampler {

    /** Runs {@link #doSample()} under {@code mutex} unless the service has been closed. */
    public void sample() {
        synchronized (mutex) {
            if (!closed) {
                doSample();
            }
        }
    }

    /** Performs one sampling round; invoked with {@code mutex} held. */
    protected abstract void doSample();

    /**
     * Connects to every node in {@code nodes} that the transport service does not report as
     * connected. A node whose connection attempt throws is removed from {@code nodes}
     * (the set is modified in place).
     *
     * @param nodes candidate nodes; must support removal through its iterator
     * @return an unmodifiable snapshot of the nodes left in the set
     */
    List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
        final Iterator<DiscoveryNode> remaining = nodes.iterator();
        while (remaining.hasNext()) {
            final DiscoveryNode node = remaining.next();
            if (transportService.nodeConnected(node)) {
                continue;
            }
            try {
                logger.trace("connecting to node [{}]", node);
                transportService.connectToNode(node);
            } catch (Exception e) {
                remaining.remove();
                logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e);
            }
        }
        return Collections.unmodifiableList(new ArrayList<>(nodes));
    }
}
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
class SimpleNodeSampler extends NodeSampler { @Override protected void doSample() { HashSet<DiscoveryNode> newNodes = new HashSet<>(); ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>(); for (DiscoveryNode listedNode : listedNodes) { try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){ final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>( new FutureTransportResponseHandler<LivenessResponse>() { @Override public LivenessResponse read(StreamInput in) throws IOException { LivenessResponse response = new LivenessResponse(); response.readFrom(in); return response; } }); transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(), TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(), handler); final LivenessResponse livenessResponse = handler.txGet(); if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) { logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); newFilteredNodes.add(listedNode); } else { // use discovered information but do keep the original transport address, // so people can control which address is exactly used. 
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode(); newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(), nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion())); } } catch (ConnectTransportException e) { logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e); hostFailureListener.onNodeDisconnected(listedNode, e); } catch (Exception e) { logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e); } } nodes = establishNodeConnections(newNodes); filteredNodes = Collections.unmodifiableList(newFilteredNodes); } }
(重試一次)最後賦值給nodes
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
/**
 * Sampler that asks every listed node and every currently known node for its local cluster
 * state, in parallel, and rebuilds {@code nodes} from the data nodes those states report.
 */
class SniffNodesSampler extends NodeSampler {

    /**
     * Pings all candidates on the MANAGEMENT executor, waits for every ping to finish
     * (response, failure, or exception), then replaces {@code nodes} and {@code filteredNodes}.
     * If the waiting thread is interrupted, the interrupt flag is restored and neither
     * field is updated.
     */
    @Override
    protected void doSample() {
        // the nodes we are going to ping include the core listed nodes that were added
        // and the last round of discovered nodes
        Set<DiscoveryNode> nodesToPing = new HashSet<>();
        for (DiscoveryNode node : listedNodes) {
            nodesToPing.add(node);
        }
        for (DiscoveryNode node : nodes) {
            nodesToPing.add(node);
        }

        // One count per pinged node; each ping counts down exactly once through onDone().
        final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
        try {
            for (final DiscoveryNode nodeToPing : nodesToPing) {
                threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

                    /**
                     * we try to reuse existing connections but if needed we will open a temporary connection
                     * that will be closed at the end of the execution.
                     */
                    Transport.Connection connectionToClose = null;

                    // Closes the temporary connection (if one was opened) and always releases the latch.
                    void onDone() {
                        try {
                            IOUtils.closeWhileHandlingException(connectionToClose);
                        } finally {
                            latch.countDown();
                        }
                    }

                    // Invoked when doRun() throws: release the latch first, then log and,
                    // for connection failures, notify the host failure listener.
                    @Override
                    public void onFailure(Exception e) {
                        onDone();
                        if (e instanceof ConnectTransportException) {
                            logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                        } else {
                            logger.info(() -> new ParameterizedMessage(
                                "failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
                        }
                    }

                    @Override
                    protected void doRun() throws Exception {
                        Transport.Connection pingConnection = null;
                        // Reuse the existing connection only for nodes already in the current nodes list.
                        if (nodes.contains(nodeToPing)) {
                            try {
                                pingConnection = transportService.getConnection(nodeToPing);
                            } catch (NodeNotConnectedException e) {
                                // will use a temp connection
                            }
                        }
                        if (pingConnection == null) {
                            logger.trace("connecting to cluster node [{}]", nodeToPing);
                            connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                            pingConnection = connectionToClose;
                        }
                        // Request the node's local cluster state, cleared down to the nodes section only.
                        transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                            Requests.clusterStateRequest().clear().nodes(true).local(true),
                            TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                .withTimeout(pingTimeout).build(),
                            new TransportResponseHandler<ClusterStateResponse>() {

                                @Override
                                public ClusterStateResponse newInstance() {
                                    return new ClusterStateResponse();
                                }

                                @Override
                                public String executor() {
                                    return ThreadPool.Names.SAME;
                                }

                                // Record the response before releasing the latch so the waiter sees it.
                                @Override
                                public void handleResponse(ClusterStateResponse response) {
                                    clusterStateResponses.put(nodeToPing, response);
                                    onDone();
                                }

                                // Log, notify the failure listener, and release the latch even if the listener throws.
                                @Override
                                public void handleException(TransportException e) {
                                    logger.info(() -> new ParameterizedMessage(
                                        "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                    try {
                                        hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                    } finally {
                                        onDone();
                                    }
                                }
                            });
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and leave nodes/filteredNodes untouched.
            Thread.currentThread().interrupt();
            return;
        }

        HashSet<DiscoveryNode> newNodes = new HashSet<>();
        HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
        for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
            // A responder from a different cluster is recorded as filtered and contributes no nodes.
            if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                logger.warn("node {} not part of the cluster {}, ignoring...",
                    entry.getValue().getState().nodes().getLocalNode(), clusterName);
                newFilteredNodes.add(entry.getKey());
                continue;
            }
            // Only the data nodes of the reported cluster state are collected.
            for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                newNodes.add(cursor.value);
            }
        }

        nodes = establishNodeConnections(newNodes);
        filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
    }
}
(重試一次)最後賦值給nodes
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
/**
 * Excerpt: request execution with node selection and retry on connection failures.
 */
final class TransportClientNodesService extends AbstractComponent implements Closeable {

    // Counter used to pick the starting node for each request; seeded with a random int.
    private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

    //......

    /**
     * Runs {@code callback} against one of the current nodes, wrapping {@code listener} in a
     * {@link RetryListener} that moves on to the next node when a connection-level failure is reported.
     *
     * @param callback invoked with the chosen node and the retry-aware listener
     * @param listener receives the final response or failure
     * @throws IllegalStateException    if the service is closed
     * @throws NoNodeAvailableException if the current node list is empty
     */
    public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
        // we first read nodes before checking the closed state; this
        // is because otherwise we could be subject to a race where we
        // read the state as not being closed, and then the client is
        // closed and the nodes list is cleared, and then a
        // NoNodeAvailableException is thrown
        // it is important that the order of first setting the state of
        // closed and then clearing the list of nodes is maintained in
        // the close method
        final List<DiscoveryNode> nodes = this.nodes;
        if (closed) {
            throw new IllegalStateException("transport client is closed");
        }
        ensureNodesAreAvailable(nodes);
        int index = getNodeNumber();
        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
        DiscoveryNode node = retryListener.getNode(0);
        try {
            callback.doWithNode(node, retryListener);
        } catch (Exception e) {
            // A synchronous failure goes to the original listener (no retry); the node-failure
            // check still runs even if the listener itself throws.
            try {
                //this exception can't come from the TransportService as it doesn't throw exception at all
                listener.onFailure(e);
            } finally {
                retryListener.maybeNodeFailed(node, e);
            }
        }
    }

    /**
     * Throws {@link NoNodeAvailableException}, naming the listed nodes, when {@code nodes} is empty.
     */
    private void ensureNodesAreAvailable(List<DiscoveryNode> nodes) {
        if (nodes.isEmpty()) {
            String message = String.format(Locale.ROOT, "None of the configured nodes are available: %s", this.listedNodes);
            throw new NoNodeAvailableException(message);
        }
    }

    /**
     * Returns the next value of the node counter. A negative value (e.g. after int overflow
     * or a negative random seed) resets the counter and the result to 0.
     */
    private int getNodeNumber() {
        int index = randomNodeGenerator.incrementAndGet();
        if (index < 0) {
            index = 0;
            randomNodeGenerator.set(0);
        }
        return index;
    }

    /**
     * Listener that forwards responses unchanged and, when a failure unwraps to a
     * {@link ConnectTransportException}, re-runs the callback on the next node of the
     * snapshot until every node has been tried.
     */
    public static class RetryListener<Response> implements ActionListener<Response> {
        private final NodeListenerCallback<Response> callback;
        private final ActionListener<Response> listener;
        // Snapshot of the node list taken when the request started.
        private final List<DiscoveryNode> nodes;
        // Starting offset into the snapshot.
        private final int index;
        private final TransportClient.HostFailureListener hostFailureListener;

        // Number of nodes already tried after the first one.
        private volatile int i;

        RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
                      List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
            this.callback = callback;
            this.listener = listener;
            this.nodes = nodes;
            this.index = index;
            this.hostFailureListener = hostFailureListener;
        }

        @Override
        public void onResponse(Response response) {
            listener.onResponse(response);
        }

        /**
         * Retries on the next node for connection-level failures; any other failure is passed
         * straight to the wrapped listener. Once all nodes of the snapshot have been tried, the
         * wrapped listener receives a {@link NoNodeAvailableException} whose cause is {@code e}.
         */
        @Override
        public void onFailure(Exception e) {
            Throwable throwable = ExceptionsHelper.unwrapCause(e);
            if (throwable instanceof ConnectTransportException) {
                // Report the node that just failed before advancing to the next one.
                maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable);
                int i = ++this.i;
                if (i >= nodes.size()) {
                    listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
                } else {
                    try {
                        callback.doWithNode(getNode(i), this);
                    } catch(final Exception inner) {
                        inner.addSuppressed(e);
                        // this exception can't come from the TransportService as it doesn't throw exceptions at all
                        listener.onFailure(inner);
                    }
                }
            } else {
                listener.onFailure(e);
            }
        }

        /** Returns the node at offset {@code i} from the starting index, wrapping around the snapshot. */
        final DiscoveryNode getNode(int i) {
            return nodes.get((index + i) % nodes.size());
        }

        /**
         * Notifies the host failure listener only when {@code ex} is a
         * {@link NodeDisconnectedException} or a {@link NodeNotConnectedException}.
         */
        final void maybeNodeFailed(DiscoveryNode node, Exception ex) {
            if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) {
                hostFailureListener.onNodeDisconnected(node, ex);
            }
        }
    }

    //......
}
(默認是false)來判斷是創建SniffNodesSampler還是SimpleNodeSampler,通過threadPool註冊一個調度任務,每隔nodesSamplerInterval執行ScheduledNodeSampler;ScheduledNodeSampler實現了Runnable接口,其run方法主要是調用nodesSampler.sample(),之後只要TransportClientNodesService沒有close,則會繼續註冊調度任務,並更新nodesSamplerFuture
(重試一次)最後賦值給nodes
(重試一次)最後賦值給nodes