這裏所指的分流插件很是相似於nginx的流量轉發功能,或者叫反向代理。css
儘管nginx的流量轉發功能也很強大,但業務上的一些變化有可能出現會讓nginx的配置繁多,疲於應付,好比:某款APP隨業務發展演化出衆多業務線:酒店業務線,機票業務線,餐飲業務線,本地出行業務線。這些業務線的背後每每是不一樣的部門,不一樣的技術團隊組成,所以會提供不一樣的服務供APP對接,若是每增長一個新服務都須要去nginx作配置,nginx的配置會隨着業務的發展時間的推移變得沉重難以維護。所以咱們能夠將nginx定義爲流量性的網關,與具體的業務無關,只負責基於域名的請求轉發,然後端業務線的不一樣服務的不一樣由業務網關負責。java
另外就是對服務進行邏輯分組的需求,好比將一些重要的請求劃分到一組服務器,這組服務器是高性能的;其它邊緣業務的請求劃分的另一組服務器,這組服務器配置相對低。再好比咱們的灰度環境,均須要基本必定的規則來分配流量。nginx
捕獲WebFlux的流量,而後後臺將請求轉發到相應服務,最後將響應的結果返回給客戶端。git
實現自定義的WebHandler,github
@Override public Mono<Void> handle(final ServerWebExchange exchange) { return new DefaultDiabloPluginChain(plugins).execute(exchange); }
實現一個插件職責鏈,調用不一樣的插件最終返回響應結果。web
private static class DefaultDiabloPluginChain implements DiabloPluginChain { private int index; private final List<DiabloPlugin> plugins; DefaultDiabloPluginChain(final List<DiabloPlugin> plugins) { this.plugins = plugins; } @Override public Mono<Void> execute(final ServerWebExchange exchange) { if (this.index < plugins.size()) { DiabloPlugin plugin = plugins.get(this.index++); try { return plugin.execute(exchange, this); } catch (Exception ex) { log.error("DefaultDiabloPluginChain.execute, traceId: {}, uri: {}, error:{}", exchange.getAttribute(Constants.CLIENT_RESPONSE_TRACE_ID), exchange.getRequest().getURI().getPath(), Throwables.getStackTraceAsString(ex)); throw ex; } } else { return Mono.empty(); // complete } } }
public class DividePlugin extends AbstractDiabloPlugin { private final UpstreamCacheManager upstreamCacheManager; private final WebClient webClient; public DividePlugin(final LocalCacheManager localCacheManager, final UpstreamCacheManager upstreamCacheManager, final WebClient webClient) { super(localCacheManager); this.upstreamCacheManager = upstreamCacheManager; this.webClient = webClient; } @Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final DiabloPluginChain chain, final SelectorData selector, final RuleData rule) { final RequestDTO requestDTO = exchange.getAttribute(Constants.REQUESTDTO); final String traceId = exchange.getAttribute(Constants.CLIENT_RESPONSE_TRACE_ID); final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); String ruleId = rule.getId(); final List<DivideUpstream> upstreamList = upstreamCacheManager.findUpstreamListByRuleId(ruleId); if (CollectionUtils.isEmpty(upstreamList)) { log.warn("DividePlugin.doExecute upstreamList is empty, traceId: {}, uri: {}, ruleName:{}", traceId, exchange.getRequest().getURI().getPath(), rule.getName()); exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE); return chain.execute(exchange); } final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); if (Objects.isNull(divideUpstream)) { log.warn("DividePlugin.doExecute divideUpstream is empty, traceId: {}, uri: {}, loadBalance:{}, ruleName:{}, upstreamSize: {}", traceId, exchange.getRequest().getURI().getPath(), ruleHandle.getLoadBalance(), rule.getName(), upstreamList.size()); exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE); return chain.execute(exchange); } if (exchange.getAttributeOrDefault(Constants.GATEWAY_ALREADY_ROUTED_ATTR, false)) { log.warn("DividePlugin.doExecute alread routed, traceId: {}, uri: {}, ruleName:{}", traceId, exchange.getRequest().getURI().getPath(), rule.getName()); exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN); return chain.execute(exchange); } exchange.getAttributes().put(Constants.GATEWAY_ALREADY_ROUTED_ATTR, true); exchange.getAttributes().put(Constants.GATEWAY_CONTEXT_UPSTREAM_HOST, divideUpstream.getUpstreamHost()); exchange.getAttributes().put(Constants.GATEWAY_CONTEXT_RULE_ID, ruleId); HttpCommand command = new HttpCommand(exchange, chain, requestDTO, divideUpstream, webClient, ruleHandle.getTimeout()); return command.doHttpInvoke(); } public SelectorData filterSelector(final List<SelectorData> selectors, final ServerWebExchange exchange) { return selectors.stream() .filter(selector -> selector.getEnabled() && filterCustomSelector(selector, exchange)) .findFirst().orElse(null); } private Boolean filterCustomSelector(final SelectorData selector, final ServerWebExchange exchange) { if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) { List<ConditionData> conditionList = selector.getConditionList(); if (CollectionUtils.isEmpty(conditionList)) { return false; } // 後臺初始定義爲host且表達式爲 = if (MatchModeEnum.AND.getCode() == selector.getMatchMode()) { ConditionData conditionData = conditionList.get(0); return Objects.equals(exchange.getRequest().getHeaders().getFirst("Host"), conditionData.getParamValue().trim()); } else { return conditionList.stream().anyMatch(c -> Objects.equals(exchange.getRequest().getHeaders().getFirst("Host"), c.getParamValue().trim())); } } return true; } @Override public String named() { return PluginEnum.DIVIDE.getName(); } @Override public Boolean skip(final ServerWebExchange exchange) { final RequestDTO body = exchange.getAttribute(Constants.REQUESTDTO); return !Objects.equals(Objects.requireNonNull(body).getRpcType(), RpcTypeEnum.HTTP.getName()); } @Override public PluginTypeEnum pluginType() { return PluginTypeEnum.FUNCTION; } @Override public int getOrder() { return PluginEnum.DIVIDE.getCode(); } }
將http請求轉發出去,主要依賴WebClient,它提供了響應式接口。具體的操做封裝在HttpCommand工具類中,核心代碼以下:後端
public Mono<Void> doHttpInvoke() { URI uri = buildRealURL(divideUpstream, exchange); traceId = exchange.getAttribute(Constants.CLIENT_RESPONSE_TRACE_ID); if (uri == null) { log.warn("HttpCommand.doNext real url is null, traceId: {}, uri: {}", traceId, exchange.getRequest().getURI().getPath()); exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE); return chain.execute(exchange).then(Mono.defer(() -> Mono.empty())); } // 後續有時間再加 todo 沒有清除掉 // IssRpcContext.commitParams(IssRpcContextParamKey.TRACE_ID, traceId); if (requestDTO.getHttpMethod().equals(HttpMethodEnum.GET.getName())) { return webClient.get().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .exchange() // 默認doOnError異常會傳遞 .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient get execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.POST.getName())) { return webClient.post().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .contentType(buildMediaType()) .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody())) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient post execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.OPTIONS.getName())) { return webClient.options().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient options execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.HEAD.getName())) { return webClient.head().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient head execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.PUT.getName())) { return webClient.put().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .contentType(buildMediaType()) .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody())) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient put execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.DELETE.getName())) { return webClient.delete().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient delete execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } else if (requestDTO.getHttpMethod().equals(HttpMethodEnum.PATCH.getName())) { return webClient.patch().uri(f -> uri) .headers(httpHeaders -> { httpHeaders.add(Constants.TRACE_ID, traceId); httpHeaders.addAll(exchange.getRequest().getHeaders()); }) .contentType(buildMediaType()) .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody())) .exchange() .doOnError(e -> log.error("HttpCommand.doHttpInvoke Failed to webClient patch execute, traceId: {}, uri: {}, cause:{}", traceId, uri, Throwables.getStackTraceAsString(e))) .timeout(Duration.ofMillis(timeout)) .flatMap(this::doNext); } log.warn("HttpCommand doHttpInvoke Waring no match doHttpInvoke end, traceId: {}, httpMethod: {}, uri: {}", traceId, requestDTO.getHttpMethod(), uri.getPath()); return Mono.empty(); }
以上內容基於業務網關的一個小模塊,詳細請看這裏:diablo在這裏服務器