本文主要研究一下Elasticsearch RestClient的RequestLoggerjava
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RequestLogger.javanode
final class RequestLogger {
private static final Log tracer = LogFactory.getLog("tracer");
private RequestLogger() {
}
/**
* Logs a request that yielded a response
*/
static void logResponse(Log logger, HttpUriRequest request, HttpHost host, HttpResponse httpResponse) {
if (logger.isDebugEnabled()) {
logger.debug("request [" + request.getMethod() + " " + host + getUri(request.getRequestLine()) +
"] returned [" + httpResponse.getStatusLine() + "]");
}
if (logger.isWarnEnabled()) {
Header[] warnings = httpResponse.getHeaders("Warning");
if (warnings != null && warnings.length > 0) {
logger.warn(buildWarningMessage(request, host, warnings));
}
}
if (tracer.isTraceEnabled()) {
String requestLine;
try {
requestLine = buildTraceRequest(request, host);
} catch(IOException e) {
requestLine = "";
tracer.trace("error while reading request for trace purposes", e);
}
String responseLine;
try {
responseLine = buildTraceResponse(httpResponse);
} catch(IOException e) {
responseLine = "";
tracer.trace("error while reading response for trace purposes", e);
}
tracer.trace(requestLine + '\n' + responseLine);
}
}
/**
* Logs a request that failed
*/
static void logFailedRequest(Log logger, HttpUriRequest request, Node node, Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("request [" + request.getMethod() + " " + node.getHost() + getUri(request.getRequestLine()) + "] failed", e);
}
if (tracer.isTraceEnabled()) {
String traceRequest;
try {
traceRequest = buildTraceRequest(request, node.getHost());
} catch (IOException e1) {
tracer.trace("error while reading request for trace purposes", e);
traceRequest = "";
}
tracer.trace(traceRequest);
}
}
static String buildWarningMessage(HttpUriRequest request, HttpHost host, Header[] warnings) {
StringBuilder message = new StringBuilder("request [").append(request.getMethod()).append(" ").append(host)
.append(getUri(request.getRequestLine())).append("] returned ").append(warnings.length).append(" warnings: ");
for (int i = 0; i < warnings.length; i++) {
if (i > 0) {
message.append(",");
}
message.append("[").append(warnings[i].getValue()).append("]");
}
return message.toString();
}
/**
* Creates curl output for given request
*/
static String buildTraceRequest(HttpUriRequest request, HttpHost host) throws IOException {
String requestLine = "curl -iX " + request.getMethod() + " '" + host + getUri(request.getRequestLine()) + "'";
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) request;
if (enclosingRequest.getEntity() != null) {
requestLine += " -d '";
HttpEntity entity = enclosingRequest.getEntity();
if (entity.isRepeatable() == false) {
entity = new BufferedHttpEntity(enclosingRequest.getEntity());
enclosingRequest.setEntity(entity);
}
requestLine += EntityUtils.toString(entity, StandardCharsets.UTF_8) + "'";
}
}
return requestLine;
}
/**
* Creates curl output for given response
*/
static String buildTraceResponse(HttpResponse httpResponse) throws IOException {
StringBuilder responseLine = new StringBuilder();
responseLine.append("# ").append(httpResponse.getStatusLine());
for (Header header : httpResponse.getAllHeaders()) {
responseLine.append("\n# ").append(header.getName()).append(": ").append(header.getValue());
}
responseLine.append("\n#");
HttpEntity entity = httpResponse.getEntity();
if (entity != null) {
if (entity.isRepeatable() == false) {
entity = new BufferedHttpEntity(entity);
}
httpResponse.setEntity(entity);
ContentType contentType = ContentType.get(entity);
Charset charset = StandardCharsets.UTF_8;
if (contentType != null && contentType.getCharset() != null) {
charset = contentType.getCharset();
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) {
String line;
while( (line = reader.readLine()) != null) {
responseLine.append("\n# ").append(line);
}
}
}
return responseLine.toString();
}
private static String getUri(RequestLine requestLine) {
if (requestLine.getUri().charAt(0) != '/') {
return "/" + requestLine.getUri();
}
return requestLine.getUri();
}
}
複製代碼
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.javagit
public class RestClient implements Closeable {
//......
public Response performRequest(Request request) throws IOException {
InternalRequest internalRequest = new InternalRequest(request);
return performRequest(nextNodes(), internalRequest, null);
}
private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
final InternalRequest request,
Exception previousException) throws IOException {
RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
HttpResponse httpResponse;
try {
httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
} catch(Exception e) {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
onFailure(context.node);
Exception cause = extractAndWrapCause(e);
addSuppressedException(previousException, cause);
if (nodeTuple.nodes.hasNext()) {
return performRequest(nodeTuple, request, cause);
}
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
}
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
return responseOrResponseException.response;
}
addSuppressedException(previousException, responseOrResponseException.responseException);
if (nodeTuple.nodes.hasNext()) {
return performRequest(nodeTuple, request, responseOrResponseException.responseException);
}
throw responseOrResponseException.responseException;
}
private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
int statusCode = httpResponse.getStatusLine().getStatusCode();
Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse);
if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
onResponse(node);
if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) {
throw new WarningFailureException(response);
}
return new ResponseOrResponseException(response);
}
ResponseException responseException = new ResponseException(response);
if (isRetryStatus(statusCode)) {
//mark host dead and retry against next one
onFailure(node);
return new ResponseOrResponseException(responseException);
}
//mark host alive and don't retry, as the error should be a request problem onResponse(node); throw responseException; } private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple, final InternalRequest request, final FailureTrackingResponseListener listener) { final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse httpResponse) { try { ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { listener.onSuccess(responseOrResponseException.response); } else { if (nodeTuple.nodes.hasNext()) { listener.trackFailure(responseOrResponseException.responseException); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void failed(Exception failure) { try { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { listener.trackFailure(failure); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(failure); } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void cancelled() { listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); } }); } //...... } 複製代碼