一. elasticsearch分析 - HttpServerTransport和Transport服務端請求處理分析

        elasticsearch提供了兩種方式TcpTransport和HttpServerTransport方便用戶進行各類操做(增,刪,改,查等)。node

Netty3HttpServerTransport

public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("openChannels", transport.serverOpenChannels);
    HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
        (int) transport.maxInitialLineLength.getBytes(),
        (int) transport.maxHeaderSize.getBytes(),
        (int) transport.maxChunkSize.getBytes()
    );
    if (transport.maxCumulationBufferCapacity.getBytes() >= 0) {
        if (transport.maxCumulationBufferCapacity.getBytes() > Integer.MAX_VALUE) {
            requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
        } else {
            requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.getBytes());
        }
    }
    if (transport.maxCompositeBufferComponents != -1) {
        requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("decoder", requestDecoder);
    pipeline.addLast("decoder_compress", new HttpContentDecompressor());
    HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.getBytes());
    if (transport.maxCompositeBufferComponents != -1) {
        httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("aggregator", httpChunkAggregator);
    pipeline.addLast("encoder", new ESNetty3HttpResponseEncoder());
    if (transport.compression) {
        pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
    }
    if (SETTING_CORS_ENABLED.get(transport.settings())) {
        pipeline.addLast("cors", new Netty3CorsHandler(transport.getCorsConfig()));
    }
    if (transport.pipelining) {
        pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
    }
    pipeline.addLast("handler", requestHandler);
    return pipeline;
}

         上面是Netty3HttpServerTransport服務端pipline的配置代碼,咱們重點關注倒數第二行代碼pipline.addLast("handler",requestHandler)。這行代碼是HttpServerTransport中對request的處理方式,從requestHandler咱們也能夠大體推測其意義。app

public class Netty3HttpRequestHandler extends SimpleChannelUpstreamHandler {

    private final Netty3HttpServerTransport serverTransport;
    private final boolean httpPipeliningEnabled;
    private final boolean detailedErrorsEnabled;
    private final ThreadContext threadContext;

    public Netty3HttpRequestHandler(Netty3HttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
        this.serverTransport = serverTransport;
        this.httpPipeliningEnabled = serverTransport.pipelining;
        this.detailedErrorsEnabled = detailedErrorsEnabled;
        this.threadContext = threadContext;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpRequest request;
        OrderedUpstreamMessageEvent oue = null;
        if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
            oue = (OrderedUpstreamMessageEvent) e;
            request = (HttpRequest) oue.getMessage();
        } else {
            request = (HttpRequest) e.getMessage();
        }

        // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
        // when reading, or using a cumulation buffer
        Netty3HttpRequest httpRequest = new Netty3HttpRequest(serverTransport.xContentRegistry, request, e.getChannel());
        Netty3HttpChannel channel = new Netty3HttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext);
        serverTransport.dispatchRequest(httpRequest, channel);
        super.messageReceived(ctx, e);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Netty3Utils.maybeDie(e.getCause());
        serverTransport.exceptionCaught(ctx, e);
    }
}

        上面是requestHandler的具體實現類,咱們知道messageReceived是對收到的請求的處理方法。邏輯比較簡單,將請求從新封裝爲Netty3HttpRequest,並經過相關的channel進行處理,而後將請求進行分發(dispatchRequest),最後將其繼續往下傳遞(super.messageReceived->ctx.sendUpStream())。在這裏咱們重點關注方法serverTransport.dispatchRequest(httpRequest,channel)。咱們繼續往下探究:    cors

protected void dispatchRequest(RestRequest request, RestChannel channel) {
    httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
}
public interface HttpServerAdapter {

    void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);

}

        繼續探究HttpServerAdapter的實現類,在這裏咱們經過idea的Hierarchy能夠看到只有HttpServer實現了該接口,進入HttpServer類elasticsearch

public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    if (request.rawPath().equals("/favicon.ico")) {
        handleFavicon(request, channel);
        return;
    }
    RestChannel responseChannel = channel;
    try {
        int contentLength = request.content().length();
        if (restController.canTripCircuitBreaker(request)) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        restController.dispatchRequest(request, responseChannel, client, threadContext);
    } catch (Exception e) {
        try {
            responseChannel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error((Supplier<?>) () ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}

        上述爲HttpServer裏對dispatchRequest的具體實現方法,一樣咱們重點關注try裏的最後一行代碼restController.dispatchRequest(request,responseChannel,client,threadContext)。繼續探究:ide

public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
    if (!checkRequestParameters(request, channel)) {
        return;
    }
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        for (String key : headersToCopy) {
            String httpHeader = request.header(key);
            if (httpHeader != null) {
                threadContext.putHeader(key, httpHeader);
            }
        }

        final RestHandler handler = getHandler(request);

        if (handler == null) {
            if (request.method() == RestRequest.Method.OPTIONS) {
                // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
                channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
            } else {
                final String msg = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, msg));
            }
        } else {
            final RestHandler wrappedHandler = Objects.requireNonNull(handlerWrapper.apply(handler));
            wrappedHandler.handleRequest(request, channel, client);
        }
    }
}

       上述爲RestController裏的dispatchRequest方法,handler是對每一類request都有相關的處理類,咱們按正常流程走即有相關的處理方法,則咱們進入else的分支,在這個分支裏開始處理request請求,繼續往下探究:ui

/**
 * Handler for REST requests
 */
public interface RestHandler {

    /**
     * Handles a rest request.
     *
     * @param request The request to handle
     * @param channel The channel to write the request response to
     * @param client A client to use to make internal requests on behalf of the original request
     */
    void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;

    default boolean canTripCircuitBreaker() {
        return true;
    }
}

       上述爲RestHandler的接口定義,上述的註釋清楚的解釋了該接口是爲了處理rest請求。該接口有兩個實現類,分別爲BaseRestHandler和DeprecationRestHandler,其中this

BaseRestHandler是咱們暫時要特別關注的一個抽象類。上圖爲該類的相關實現類(數量過多,未能截全)咱們能夠發現該類爲模板類,它的實現類定義了不一樣操做請求對應的不一樣處理方式。idea

@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    // execute the action
    action.accept(channel);
}

 

       上面是BaseRestHandler類裏的handlerRequest方法。一樣在最後一行以前的diamante是對request的一些相關信息進行驗證,核心依然是在最後一行代碼action.accept(channel)。繼續探究:spa

/**
 * REST requests are handled by preparing a channel consumer that represents the execution of
 * the request against a channel.
 */
@FunctionalInterface
protected interface RestChannelConsumer {
    /**
     * Executes a request against the given channel.
     *
     * @param channel the channel for sending the response
     * @throws Exception if an exception occurred executing the request
     */
    void accept(RestChannel channel) throws Exception;
}

        

        一樣,RestChannelConsumer 是一個模本接口。因爲我一直對elasticsearch的index過程好奇。所以咱們在這裏選擇RestIndexAction做爲下一層的探究類。3d

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
    indexRequest.routing(request.param("routing"));
    indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
    if (request.hasParam("timestamp")) {
        deprecationLogger.deprecated("The [timestamp] parameter of index requests is deprecated");
    }
    indexRequest.timestamp(request.param("timestamp"));
    if (request.hasParam("ttl")) {
        deprecationLogger.deprecated("The [ttl] parameter of index requests is deprecated");
        indexRequest.ttl(request.param("ttl"));
    }
    indexRequest.setPipeline(request.param("pipeline"));
    indexRequest.source(request.content());
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    indexRequest.setRefreshPolicy(request.param("refresh"));
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    String sOpType = request.param("op_type");
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
    }

    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> {
            try {
                return r.getLocation(indexRequest.routing());
            } catch (URISyntaxException ex) {
                logger.warn("Location string is not a valid URI.", ex);
                return null;
            }
        }));
}

        經過閱讀該方法,咱們能夠看到將RestRequest轉換爲IndexRequest,最終經過nodeClient來處理index操做。

TransportClient

        一樣,咱們對TransportClient.get方法進行追蹤分析,可是此次咱們是經過正向來進行追蹤,發現是經過其代理類TransportProxyClient來進行執行的

@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
    proxy.execute(action, request, listener);
}
final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                              final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }
}
相關文章
相關標籤/搜索