WebFlux那點事兒:函數式編程

Mono<GatewayDslContext> dslContext = new Builder(context).build();
        return dslContext
                .filter(
                        dslCtx -> apiParamVerify(dslCtx, context.getApiConfig().getApiParamsConfigList())
                )
                .switchIfEmpty(Mono.defer(() -> Mono.error(PARAM_CONFIG_ERROR.ofEx())))
                .flatMap(
                        dslCtx -> Mono.fromCallable(
                                () -> doInvoke(context, dslCtx)
                        )
                )
                .switchIfEmpty(Mono.defer(() -> Mono.error(PIRATE_ERROR.ofEx())))
                .timeout(Duration.ofMillis(context.getApiConfig().getTimeoutInMilliSeconds()))
                .flatMap(GatewayInvokeHandler::createResponse)
                .publishOn(ExecutorsConfig.SYNC_SCHEDULER);
    }
@Override
    public Mono<ServerResponse> handle(ServerRequest request) {
        ApiConfig apiConfig = (ApiConfig) request.attributes().get(ApiRouter.BEST_MATCH_API_ROUTER_ATTRIBUTE_KEY);

        ApiGatewayReqContext apiGatewayReqContext = new ApiGatewayReqContext();
        apiGatewayReqContext.setServerRequest(request);
        apiGatewayReqContext.setApiConfig(apiConfig);

        GatewayFilterChain filterChain = getFilterChain();
        return filterChain.handleRequest(apiGatewayReqContext)
                .flatMap(r -> {
                    if (r.getResponse() != null && r.getResponse().isSuccess()) {
                        return ServerResponse.ok()
                                .contentType(
                                        apiConfig.getResponseConfig().getMediaType() == null
                                                ? MediaType.APPLICATION_JSON_UTF8
                                                : apiConfig.getResponseConfig().getMediaType())
                                .syncBody(r.getResponse().getData());
                    } else {
                        return ServerResponse.ok()
                                .contentType(MediaType.APPLICATION_JSON_UTF8)
                                .syncBody(
                                        new ApiGatewayResponse(ApiResponseCodeEnum.ROUTE_NOT_FOUND.getCode(), r.getResponse().getErrors().get(0).getMessage()));
                    }
                }).onErrorResume(this::processApiException);
    }
相關文章
相關標籤/搜索