本文主要研究一下Elasticsearch RestClient的RequestLoggerjava
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RequestLogger.javanode
final class RequestLogger { private static final Log tracer = LogFactory.getLog("tracer"); private RequestLogger() { } /** * Logs a request that yielded a response */ static void logResponse(Log logger, HttpUriRequest request, HttpHost host, HttpResponse httpResponse) { if (logger.isDebugEnabled()) { logger.debug("request [" + request.getMethod() + " " + host + getUri(request.getRequestLine()) + "] returned [" + httpResponse.getStatusLine() + "]"); } if (logger.isWarnEnabled()) { Header[] warnings = httpResponse.getHeaders("Warning"); if (warnings != null && warnings.length > 0) { logger.warn(buildWarningMessage(request, host, warnings)); } } if (tracer.isTraceEnabled()) { String requestLine; try { requestLine = buildTraceRequest(request, host); } catch(IOException e) { requestLine = ""; tracer.trace("error while reading request for trace purposes", e); } String responseLine; try { responseLine = buildTraceResponse(httpResponse); } catch(IOException e) { responseLine = ""; tracer.trace("error while reading response for trace purposes", e); } tracer.trace(requestLine + '\n' + responseLine); } } /** * Logs a request that failed */ static void logFailedRequest(Log logger, HttpUriRequest request, Node node, Exception e) { if (logger.isDebugEnabled()) { logger.debug("request [" + request.getMethod() + " " + node.getHost() + getUri(request.getRequestLine()) + "] failed", e); } if (tracer.isTraceEnabled()) { String traceRequest; try { traceRequest = buildTraceRequest(request, node.getHost()); } catch (IOException e1) { tracer.trace("error while reading request for trace purposes", e); traceRequest = ""; } tracer.trace(traceRequest); } } static String buildWarningMessage(HttpUriRequest request, HttpHost host, Header[] warnings) { StringBuilder message = new StringBuilder("request [").append(request.getMethod()).append(" ").append(host) .append(getUri(request.getRequestLine())).append("] returned ").append(warnings.length).append(" warnings: "); for (int i = 0; i < warnings.length; i++) { if (i > 0) { message.append(","); } message.append("[").append(warnings[i].getValue()).append("]"); } return message.toString(); } /** * Creates curl output for given request */ static String buildTraceRequest(HttpUriRequest request, HttpHost host) throws IOException { String requestLine = "curl -iX " + request.getMethod() + " '" + host + getUri(request.getRequestLine()) + "'"; if (request instanceof HttpEntityEnclosingRequest) { HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) request; if (enclosingRequest.getEntity() != null) { requestLine += " -d '"; HttpEntity entity = enclosingRequest.getEntity(); if (entity.isRepeatable() == false) { entity = new BufferedHttpEntity(enclosingRequest.getEntity()); enclosingRequest.setEntity(entity); } requestLine += EntityUtils.toString(entity, StandardCharsets.UTF_8) + "'"; } } return requestLine; } /** * Creates curl output for given response */ static String buildTraceResponse(HttpResponse httpResponse) throws IOException { StringBuilder responseLine = new StringBuilder(); responseLine.append("# ").append(httpResponse.getStatusLine()); for (Header header : httpResponse.getAllHeaders()) { responseLine.append("\n# ").append(header.getName()).append(": ").append(header.getValue()); } responseLine.append("\n#"); HttpEntity entity = httpResponse.getEntity(); if (entity != null) { if (entity.isRepeatable() == false) { entity = new BufferedHttpEntity(entity); } httpResponse.setEntity(entity); ContentType contentType = ContentType.get(entity); Charset charset = StandardCharsets.UTF_8; if (contentType != null && contentType.getCharset() != null) { charset = contentType.getCharset(); } try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) { String line; while( (line = reader.readLine()) != null) { responseLine.append("\n# ").append(line); } } } return responseLine.toString(); } private static String getUri(RequestLine requestLine) { if (requestLine.getUri().charAt(0) != '/') { return "/" + requestLine.getUri(); } return requestLine.getUri(); } }
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.javagit
public class RestClient implements Closeable { //...... public Response performRequest(Request request) throws IOException { InternalRequest internalRequest = new InternalRequest(request); return performRequest(nextNodes(), internalRequest, null); } private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple, final InternalRequest request, Exception previousException) throws IOException { RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); HttpResponse httpResponse; try { httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); } catch(Exception e) { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); Exception cause = extractAndWrapCause(e); addSuppressedException(previousException, cause); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } if (cause instanceof IOException) { throw (IOException) cause; } if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); } ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { return responseOrResponseException.response; } addSuppressedException(previousException, responseOrResponseException.responseException); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, responseOrResponseException.responseException); } throw responseOrResponseException.responseException; } private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); int statusCode = httpResponse.getStatusLine().getStatusCode(); Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { onResponse(node); if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { throw new WarningFailureException(response); } return new ResponseOrResponseException(response); } ResponseException responseException = new ResponseException(response); if (isRetryStatus(statusCode)) { //mark host dead and retry against next one onFailure(node); return new ResponseOrResponseException(responseException); } //mark host alive and don't retry, as the error should be a request problem onResponse(node); throw responseException; } private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple, final InternalRequest request, final FailureTrackingResponseListener listener) { final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse httpResponse) { try { ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { listener.onSuccess(responseOrResponseException.response); } else { if (nodeTuple.nodes.hasNext()) { listener.trackFailure(responseOrResponseException.responseException); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void failed(Exception failure) { try { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { listener.trackFailure(failure); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(failure); } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void cancelled() { listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); } }); } //...... }