elasticsearch提供了兩種方式TcpTransport和HttpServerTransport方便用戶進行各類操做(增,刪,改,查等)。node
Netty3HttpServerTransport
public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("openChannels", transport.serverOpenChannels); HttpRequestDecoder requestDecoder = new HttpRequestDecoder( (int) transport.maxInitialLineLength.getBytes(), (int) transport.maxHeaderSize.getBytes(), (int) transport.maxChunkSize.getBytes() ); if (transport.maxCumulationBufferCapacity.getBytes() >= 0) { if (transport.maxCumulationBufferCapacity.getBytes() > Integer.MAX_VALUE) { requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); } else { requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.getBytes()); } } if (transport.maxCompositeBufferComponents != -1) { requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); } pipeline.addLast("decoder", requestDecoder); pipeline.addLast("decoder_compress", new HttpContentDecompressor()); HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.getBytes()); if (transport.maxCompositeBufferComponents != -1) { httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); } pipeline.addLast("aggregator", httpChunkAggregator); pipeline.addLast("encoder", new ESNetty3HttpResponseEncoder()); if (transport.compression) { pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel)); } if (SETTING_CORS_ENABLED.get(transport.settings())) { pipeline.addLast("cors", new Netty3CorsHandler(transport.getCorsConfig())); } if (transport.pipelining) { pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents)); } pipeline.addLast("handler", requestHandler); return pipeline; }
上面是Netty3HttpServerTransport服務端pipline的配置代碼,咱們重點關注倒數第二行代碼pipline.addLast("handler",requestHandler)。這行代碼是HttpServerTransport中對request的處理方式,從requestHandler咱們也能夠大體推測其意義。app
public class Netty3HttpRequestHandler extends SimpleChannelUpstreamHandler { private final Netty3HttpServerTransport serverTransport; private final boolean httpPipeliningEnabled; private final boolean detailedErrorsEnabled; private final ThreadContext threadContext; public Netty3HttpRequestHandler(Netty3HttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) { this.serverTransport = serverTransport; this.httpPipeliningEnabled = serverTransport.pipelining; this.detailedErrorsEnabled = detailedErrorsEnabled; this.threadContext = threadContext; } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { HttpRequest request; OrderedUpstreamMessageEvent oue = null; if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) { oue = (OrderedUpstreamMessageEvent) e; request = (HttpRequest) oue.getMessage(); } else { request = (HttpRequest) e.getMessage(); } // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally // when reading, or using a cumulation buffer Netty3HttpRequest httpRequest = new Netty3HttpRequest(serverTransport.xContentRegistry, request, e.getChannel()); Netty3HttpChannel channel = new Netty3HttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext); serverTransport.dispatchRequest(httpRequest, channel); super.messageReceived(ctx, e); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { Netty3Utils.maybeDie(e.getCause()); serverTransport.exceptionCaught(ctx, e); } }
上面是requestHandler的具體實現類,咱們知道messageReceived是對收到的請求的處理方法。邏輯比較簡單,將請求從新封裝爲Netty3HttpRequest,並經過相關的channel進行處理,而後將請求進行分發(dispatchRequest),最後將其繼續往下傳遞(super.messageReceived->ctx.sendUpStream())。在這裏咱們重點關注方法serverTransport.dispatchRequest(httpRequest,channel)。咱們繼續往下探究: cors
protected void dispatchRequest(RestRequest request, RestChannel channel) { httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext()); }
public interface HttpServerAdapter { void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context); }
繼續探究HttpServerAdapter的實現類,在這裏咱們經過idea的Hierarchy能夠看到只有HttpServer實現了該接口,進入HttpServer類elasticsearch
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request, channel);
return;
}
RestChannel responseChannel = channel;
try {
int contentLength = request.content().length();
if (restController.canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
restController.dispatchRequest(request, responseChannel, client, threadContext);
} catch (Exception e) {
try {
responseChannel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
上述爲HttpServer裏對dispatchRequest的具體實現方法,一樣咱們重點關注try裏的最後一行代碼restController.dispatchRequest(request,responseChannel,client,threadContext)。繼續探究:ide
public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception { if (!checkRequestParameters(request, channel)) { return; } try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { for (String key : headersToCopy) { String httpHeader = request.header(key); if (httpHeader != null) { threadContext.putHeader(key, httpHeader); } } final RestHandler handler = getHandler(request); if (handler == null) { if (request.method() == RestRequest.Method.OPTIONS) { // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added) channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)); } else { final String msg = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"; channel.sendResponse(new BytesRestResponse(BAD_REQUEST, msg)); } } else { final RestHandler wrappedHandler = Objects.requireNonNull(handlerWrapper.apply(handler)); wrappedHandler.handleRequest(request, channel, client); } } }
上述爲RestController裏的dispatchRequest方法,handler是對每一類request都有相關的處理類,咱們按正常流程走即有相關的處理方法,則咱們進入else的分支,在這個分支裏開始處理request請求,繼續往下探究:ui
/** * Handler for REST requests */ public interface RestHandler { /** * Handles a rest request. * * @param request The request to handle * @param channel The channel to write the request response to * @param client A client to use to make internal requests on behalf of the original request */ void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception; default boolean canTripCircuitBreaker() { return true; } }
上述爲RestHandler的接口定義,上述的註釋清楚的解釋了該接口是爲了處理rest請求。該接口有兩個實現類,分別爲BaseRestHandler和DeprecationRestHandler,其中this
BaseRestHandler是咱們暫時要特別關注的一個抽象類。上圖爲該類的相關實現類(數量過多,未能截全)咱們能夠發現該類爲模板類,它的實現類定義了不一樣操做請求對應的不一樣處理方式。idea
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams =
request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}
// execute the action
action.accept(channel);
}
上面是BaseRestHandler類裏的handlerRequest方法。一樣在最後一行以前的diamante是對request的一些相關信息進行驗證,核心依然是在最後一行代碼action.accept(channel)。繼續探究:spa
/**
* REST requests are handled by preparing a channel consumer that represents the execution of
* the request against a channel.
*/
@FunctionalInterface
protected interface RestChannelConsumer {
/**
* Executes a request against the given channel.
*
* @param channel the channel for sending the response
* @throws Exception if an exception occurred executing the request
*/
void accept(RestChannel channel) throws Exception;
}
一樣,RestChannelConsumer 是一個模本接口。因爲我一直對elasticsearch的index過程好奇。所以咱們在這裏選擇RestIndexAction做爲下一層的探究類。3d
@Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing if (request.hasParam("timestamp")) { deprecationLogger.deprecated("The [timestamp] parameter of index requests is deprecated"); } indexRequest.timestamp(request.param("timestamp")); if (request.hasParam("ttl")) { deprecationLogger.deprecated("The [ttl] parameter of index requests is deprecated"); indexRequest.ttl(request.param("ttl")); } indexRequest.setPipeline(request.param("pipeline")); indexRequest.source(request.content()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.setRefreshPolicy(request.param("refresh")); indexRequest.version(RestActions.parseVersion(request)); indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType())); String sOpType = request.param("op_type"); String waitForActiveShards = request.param("wait_for_active_shards"); if (waitForActiveShards != null) { indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } if (sOpType != null) { indexRequest.opType(IndexRequest.OpType.fromString(sOpType)); } return channel -> client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> { try { return r.getLocation(indexRequest.routing()); } catch (URISyntaxException ex) { logger.warn("Location string is not a valid URI.", ex); return null; } })); }
經過閱讀該方法,咱們能夠看到將RestRequest轉換爲IndexRequest,最終經過nodeClient來處理index操做。
TransportClient
一樣,咱們對TransportClient.get方法進行追蹤分析,可是此次咱們是經過正向來進行追蹤,發現是經過其代理類TransportProxyClient來進行執行的
@Override protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) { proxy.execute(action, request, listener); }
final class TransportProxyClient { private final TransportClientNodesService nodesService; private final Map<Action, TransportActionNodeProxy> proxies; TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, List<GenericAction> actions) { this.nodesService = nodesService; Map<Action, TransportActionNodeProxy> proxies = new HashMap<>(); for (GenericAction action : actions) { if (action instanceof Action) { proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService)); } } this.proxies = unmodifiableMap(proxies); } public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) { final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action); nodesService.execute((n, l) -> proxy.execute(n, request, l), listener); } }