Spring-Cloud-Gateway 從升級到放棄

 

1 爲何要升級爲spring-cloud-gateway?

Spring Cloud Gateway features:java

  • Built on Spring Framework 5, Project Reactor and Spring Boot 2.0react

  • Able to match routes on any request attribute.web

  • Predicates and filters are specific to routes.spring

  • Hystrix Circuit Breaker integration.編程

  • Spring Cloud DiscoveryClient integrationjson

  • Easy to write Predicates and Filtersbootstrap

  • Request Rate Limiting緩存

  • Path Rewriting併發

這是官方說的,spring gateway相對spring zuul要新不少,應用也更加自由,開發體驗更好。可是我在測試中發現,spring-cloud-gateway相對於zuul來講,更加出衆的仍是其性能,固然最後讓我放棄的也是由於這一點。app

網上的朋友也有作一些gateway和zuul的性能比較,大多的結論也是gateway要優於zuul,同時也更加穩定。

可是咱們不能輕信,因此我也作了測試,這部分測試內容若不感興趣能夠跳過,zuul就不測試了。

2.spring-cloud-gateway的初步測試

  step.1:測試準備:

    1.gateway版本:2.0.1

    2.服務主機:10.1.4.32,16G內存,4核虛擬機

    3.測試客戶端:10.1.4.34,16G內存,4核虛擬機

    4.測試工具wrk

  step.2:創建gateway工程並寫兩個測試http接口,

    1.http://10.1.4.32:14077/hello [POST]

    2.http://10.1.4.32:14077/test [GET]

  step.3:開始測試

  step.4:測試結果   

[wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency  http://10.1.4.32:14077/test
Running 10s test @ http://10.1.4.32:14077/test
  15 threads and 500 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.38ms    3.26ms 113.45ms   95.76%
    Req/Sec    10.84k     1.44k   26.48k    89.50%
  Latency Distribution
     50%    2.79ms
     75%    3.51ms
     90%    4.21ms
     99%   17.23ms
  1625714 requests in 10.10s, 131.79MB read
Requests/sec: 160961.07
Transfer/sec:     13.05MB

以及:

[wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency -s scripts/gateway.lua  http://10.1.4.32:14077/hello
Running 10s test @ http://10.1.4.32:14077/hello
  15 threads and 500 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     5.21ms    3.96ms 255.59ms   96.75%
    Req/Sec     6.62k   604.79    13.72k    88.48%
  Latency Distribution
     50%    4.78ms
     75%    5.55ms
     90%    6.32ms
     99%   14.87ms
  994374 requests in 10.10s, 539.59MB read
Requests/sec:  98471.14
Transfer/sec:     53.43MB

說明,若是測試結果差異較大多是由於測試工具的問題。

結果顯示,POST方法的性能TPS達到了10W/s,而GET方法的性能TPS達到了16W/s。

這看起來很難以想象,由於正常的微服務,能達到2W/s的性能已是良好,達到10W實在是難以想象。可是前面說了spring-cloud-gateway引入了Spring Reactor反應式編程,應對的即是這種高併發需求。

固然,即使spring-cloud-gateway給了咱們很大驚喜,可是若是所以就引入了spring-cloud-gateway,那仍是會有些草率,畢竟gateway是用來幹什麼的?是路由和過濾。繼續測試。

  step.5:加上路由和過濾器,在配置文件中加入下面內容

spring:
  cloud:
    gateway:
      routes:
      - id: test
        uri: http://10.1.4.32:14077/test
        predicates:
        - Path=/tt
        filters:
        - AddRequestParameter=foo, bar

表示,給test方法加入了路由,而且加入了官方提供的過濾器:AddRequestParameter=foo, bar

  step.6:測試,並附測試結果:

[wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency  http://10.1.4.32:14077/tt
Running 10s test @ http://10.1.4.32:14077/tt
  15 threads and 500 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    18.99ms   12.15ms 122.69ms   70.84%
    Req/Sec     1.82k   155.77     2.36k    73.94%
  Latency Distribution
     50%   17.03ms
     75%   25.49ms
     90%   35.02ms
     99%   57.13ms
  274529 requests in 10.10s, 22.25MB read
Requests/sec:  27182.88
Transfer/sec:      2.20MB

性能只剩27000/s,貌似下降了不少,可是比起zuul仍然快了很多。由於在這臺機器上,測試zuul或許都不能到達2W。

那麼,是否是就應該使用spring-cloud-gateway了?

3.開始使用spring-cloud-gateway

在使用上spring-cluod-gateway以後,我開始編輯本身的過濾器,需求要求寫兩個過濾器,修改請求體和響應體。

由於須要對特定的請求使用過濾器,因此這裏使用gateway-filter,有些代碼官方有,有些網友提供,兩個過濾器代碼大體以下:

解密過濾器,pre:

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import io.netty.buffer.ByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Created by garfield on 2019/2/26.
 */
@Component
public class DecryptGatewayFilterFactory extends AbstractGatewayFilterFactory<DecryptGatewayFilterFactory.Config>{

    private static Logger log = LoggerFactory.getLogger(DecryptGatewayFilterFactory.class);

    public static final String DECRYPT_HEADER = "decrypt_header";


    public DecryptGatewayFilterFactory() {
        super(Config.class);
    }


    private Gson gson = new GsonBuilder().serializeNulls().create();


    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new DecryptGatewayFilter(config);
    }

    public class DecryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        DecryptGatewayFilter(Config config) {
            this.config = config;
        }
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                log.debug(config.toString());
                ServerHttpRequest request = exchange.getRequest();
                MediaType contentType = request.getHeaders().getContentType();

                boolean postRequest = "POST".equalsIgnoreCase(request.getMethodValue()) && !contentType.toString().contains("multipart/form-data");
                //判斷是否爲POST請求
                if (postRequest) {

                    Flux<DataBuffer> body = request.getBody();
                    AtomicReference<String> bodyRef = new AtomicReference<>();//緩存讀取的request body信息
                    body.subscribe(dataBuffer -> {
                        CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                        DataBufferUtils.release(dataBuffer);
                        bodyRef.set(charBuffer.toString());
                    });//讀取request body到緩存
                    String bodyStr = bodyRef.get();//獲取request body
                    log.debug(bodyStr);//這裏是咱們須要作的操做
                    RequestDto requestDto = gson.fromJson(bodyStr, RequestDto.class);
                    log.debug("decrypt filter");
                    //save header to response header
                    RequestHeaderVo headerVo = requestDto.getHeader();
                    headerVo.setAppVersion("1000");
                    //此處能夠傳遞一些變量
                    exchange.getResponse().getHeaders().add(DECRYPT_HEADER, gson.toJson(headerVo));

                    DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
                    Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

                    request = new ServerHttpRequestDecorator(request){
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return bodyFlux;
                        }
                    };//封裝咱們的request
                }
                return chain.filter(exchange.mutate().request(request.mutate().header("a","200").build()).build());
            };

        @Override
        public int getOrder() {
            return -10;
        }
    }

    public static DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }


    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {
        private boolean decrypt;


        public boolean isDecrypt() {
            return decrypt;
        }

        public void setDecrypt(boolean decrypt) {
            this.decrypt = decrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("decrypt", decrypt)
                    .toString();
        }
    }


    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("decrypt");
    }
}
View Code

加密過濾器,使用源碼的提供的修改方法,post:

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.entity.dto.RequestDto;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.style.ToStringCreator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;

/**
 * @Auther: garfield
 * @Date: 2019/3/5 15:33
 * @Description:
 */
@Component
public class AnGatewayFilterFactory extends AbstractGatewayFilterFactory<AnGatewayFilterFactory.Config> {


    private Gson gson = new Gson();

    public AnGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {

        ModifyResponseBodyGatewayFilterFactory m1 = new ModifyResponseBodyGatewayFilterFactory(null);
        ModifyResponseBodyGatewayFilterFactory.Config c1 = new ModifyResponseBodyGatewayFilterFactory.Config();
        c1.setInClass(String.class);
        c1.setOutClass(String.class);
        c1.setNewContentType("application/json");

        c1.setRewriteFunction((exchange, body) -> {
            ServerWebExchange ex = (ServerWebExchange) exchange;
            //此處更改響應體
            RequestHeaderVo requestHeaderVo = new RequestHeaderVo();
            RequestDto requestDto = gson.fromJson(body.toString(), RequestDto.class);
            requestDto.setHeader(requestHeaderVo);
            body = gson.toJson(requestDto);
            return Mono.just(body);
        });
        return m1.apply(c1);
    }

    public static class Config {
        private boolean decrypt;


        public boolean isDecrypt() {
            return decrypt;
        }

        public void setDecrypt(boolean decrypt) {
            this.decrypt = decrypt;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", decrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }
}
View Code

這裏須要轉移一下話題,這個過濾器修改其實有幾種方法,能夠本身寫,也能夠應用源碼提供的例子。上面的兩種寫法已經測試都能使用,其實我還有兩種方式,大同小異就是了,但也準備貼出來,也記錄一下問題:

下面這個其實就是源碼中的例子,只不過不引用,本身寫:

    @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

            ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    ServerHttpRequest request = exchange.getRequest();

                    MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    HttpHeaders httpHeaders = new HttpHeaders();
                    httpHeaders.setContentType(originalResponseContentType);
                    ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);
                    DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());

                    Mono<DataBuffer> modifiedBody = clientResponse.bodyToMono(DataBuffer.class).map(encrypt(config, new RequestHeaderVo()));

                    BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, DataBuffer.class);
                    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
                    return bodyInserter.insert(outputMessage, new BodyInserterContext())
                            .then(Mono.defer(() -> {
                                Flux<DataBuffer> messageBody = outputMessage.getBody();
                                HttpHeaders headers = getDelegate().getHeaders();
                                if (headers.getContentLength() < 0 && !headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                }
                                return this.getDelegate().writeWith(messageBody);
                            }));
                }

                @Override
                public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                    return writeWith(Flux.from(body)
                            .flatMapSequential(p -> p));
                }
            };
            return chain.filter(exchange.mutate().response(responseDecorator).build());

        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }

    }

    private Function<DataBuffer, DataBuffer> encrypt(Config config, RequestHeaderVo headerVo) {
        if (config.encrypt) {
            return (i) -> {
                InputStream inputStream = i.asInputStream();

                byte[] bytes = new byte[0];
                try {
                    bytes = new byte[inputStream.available()];


                    inputStream.read(bytes);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                //進行咱們的操做
                String body = new String(bytes);
                log.debug("this is response encrypt");
                log.debug(body);
                log.debug(headerVo.toString());
//                body = encryptService.responseEncrypt(body, headerVo);

                //進行咱們的操做
                return i.write(TokenGatewayFilterFactory.stringBuffer(body));
//                return i.write(new String(body).getBytes());

            };


        } else {
            return i -> i;
        }

    }
View Code

這種例子中,發現修改response body的時候,會引發代碼進入NioEventLoop類中的run方法,死循環沒法退出,我也不清楚爲何,修改需謹慎。

另外一種,跟這位網友寫得差很少,只不過我沒測試就是了:https://www.jianshu.com/p/9f00e0e1681c

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return chain.filter(exchange.mutate().response(new ServerHttpResponseDecorator(exchange.getResponse()) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
                    if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
                        Flux<? extends DataBuffer> fluxBody = Flux.first(body);
                        return super.writeWith(fluxBody.map(dataBuffer -> {
                            System.out.println(dataBuffer.readableByteCount());

                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            //釋放掉內存
                            DataBufferUtils.release(dataBuffer);
                            //responseData就是下游系統返回的內容,能夠查看修改
                            String responseData = new String(content, Charset.forName("UTF-8"));

                            log.debug("響應內容:{}", responseData);
                            log.debug("this is response encrypt");
                            System.out.println(responseData);

                            byte[] newContent = responseData.getBytes();
//                body = encryptService.responseEncrypt(body, headerVo);
                            byte[] uppedContent = new String(newContent, Charset.forName("UTF-8")).getBytes();
                            return bufferFactory.wrap(uppedContent);
                        }));
                    } else {
                        log.error("響應code異常:{}", getStatusCode());
                    }
                    return super.writeWith(body);
                }
            }).build());
        }


        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }
View Code

這個方法會出現問題,body的截取長度常常沒有徹底。

我原本是到這個網址下面尋找答案,做者是這樣回覆的:

  上面只是簡單的樣例,FIux是發送多個數據的,當報文長時會拆分,處理一次只能拿到一部分報文,可使用Flux.toArray方法將數據聚合後處理,也能夠參照https://www.jianshu.com/p/9b781fb1aaa0裏面的響應處理。

確實是這個問題,因此咱們也能夠仿照他的另一個例子寫,你們能夠到他的簡書博客中去看,值得提醒的是,他的例子中,版本也是2.0.1,如果版本改成2.1以上,就不能用哦!

這裏蠻貼一下:

package com.newland.dc.ctid.fileter;

import com.google.gson.Gson;
import com.newland.dc.common.vo.RequestHeaderVo;
import com.newland.dc.ctid.service.SecurityService;
import com.newland.dc.log.kafka.KafkaLog;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.*;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.style.ToStringCreator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;


/**
 * @Auther: garfield
 * @Date: 2019/2/28 上午10:45
 * @Description:
 */
@Component
public class EncryptGatewayFilterFactory extends AbstractGatewayFilterFactory<EncryptGatewayFilterFactory.Config> {

    private static Logger log = LoggerFactory.getLogger(EncryptGatewayFilterFactory.class);


    @Autowired
    private SecurityService encryptService;

    public EncryptGatewayFilterFactory() {
        super(Config.class);
    }


    //    private Gson gson = new GsonBuilder().serializeNulls().create();
    private Gson gson = new Gson();

    @Value("${server.host:10.10.10.10}")
    private String serverHost;

    @Value("${server.port}")
    private String serverPort;


    @Override
    @SuppressWarnings("unchecked")
    public GatewayFilter apply(Config config) {
        return new EncryptGatewayFilter(config);
    }


    @Override
    public ServerHttpRequest.Builder mutate(ServerHttpRequest request) {
        return null;
    }

    public static class Config {

        private boolean encrypt;

        public boolean isEncrypt() {
            return encrypt;
        }

        public Config setEncrypt(boolean encrypt) {
            this.encrypt = encrypt;
            return this;
        }

        @Override
        public String toString() {
            return new ToStringCreator(this)
                    .append("encrypt", encrypt)
                    .toString();
        }
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("encrypt");
    }

    public class EncryptGatewayFilter implements GatewayFilter, Ordered {
        Config config;

        EncryptGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            String trace = exchange.getRequest().getHeaders().getFirst("trace");
            ServerRequest serverRequest = new DefaultServerRequest(exchange);
            return serverRequest.bodyToMono(String.class).flatMap(reqBody -> {
                //重寫原始請求
                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public HttpHeaders getHeaders() {
                        HttpHeaders httpHeaders = new HttpHeaders();
                        httpHeaders.putAll(super.getHeaders());
                        return httpHeaders;
                    }

                    @Override
                    public Flux<DataBuffer> getBody() {
                        //打印原始請求日誌
                        log.info("[Trace:{}]-gateway request:headers=[{}],body=[{}]", trace, getHeaders(), reqBody);
                        return Flux.just(reqBody).map(bx -> exchange.getResponse().bufferFactory().wrap(bx.getBytes()));
                    }
                };
                //重寫原始響應
                BodyHandlerServerHttpResponseDecorator responseDecorator = new BodyHandlerServerHttpResponseDecorator(
                        initBodyHandler(exchange), exchange.getResponse());

                return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build());
            });
        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }

    }

    public interface BodyHandlerFunction
            extends BiFunction<ServerHttpResponse, Publisher<? extends DataBuffer>, Mono<Void>> {
    }

    protected BodyHandlerFunction initBodyHandler(ServerWebExchange exchange) {
        return (resp, body) -> {
            //攔截
            MediaType originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.setContentType(originalResponseContentType);
            DefaultClientResponseAdapter clientResponseAdapter = new DefaultClientResponseAdapter(body, httpHeaders);
            Mono<String> bodyMono = clientResponseAdapter.bodyToMono(String.class);
            //此處能夠得到前面放置的參數
            return bodyMono.flatMap((respBody) -> {
//                打印返回響應日誌
                System.out.println(respBody);
                return resp.writeWith(Flux.just(respBody).map(bx -> resp.bufferFactory().wrap(bx.getBytes())));
            }).then();
        };
    }

 
    public static class DefaultClientResponseAdapter extends DefaultClientResponse {

        /**
         * @param body
         * @param httpHeaders
         */
        public DefaultClientResponseAdapter(Publisher<? extends DataBuffer> body,
                                            HttpHeaders httpHeaders) {
            this(new ResponseAdapter(body, httpHeaders),
                    ExchangeStrategies.withDefaults());
        }

        /**
         * @param response
         * @param strategies
         */
        public DefaultClientResponseAdapter(ClientHttpResponse response,
                                            ExchangeStrategies strategies) {
            super(response, strategies);
        }

        /**
         * ClientHttpResponse 適配器
         */
        static class ResponseAdapter implements ClientHttpResponse {
            /**
             * 響應數據
             */
            private final Flux<DataBuffer> flux;
            /**
             *
             */
            private final HttpHeaders headers;

            public ResponseAdapter(Publisher<? extends DataBuffer> body,
                                   HttpHeaders headers) {
                this.headers = headers;
                if (body instanceof Flux) {
                    flux = (Flux) body;
                } else {
                    flux = ((Mono) body).flux();
                }
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return flux;
            }

            @Override
            public HttpHeaders getHeaders() {
                return headers;
            }

            @Override
            public HttpStatus getStatusCode() {
                return null;
            }

            @Override
            public int getRawStatusCode() {
                return 0;
            }

            @Override
            public MultiValueMap<String, ResponseCookie> getCookies() {
                return null;
            }
        }
    }

    class BodyHandlerServerHttpResponseDecorator extends ServerHttpResponseDecorator {

        /**
         * body 處理攔截器
         */
        private BodyHandlerFunction bodyHandler = initDefaultBodyHandler();

        /**
         * 構造函數
         *
         * @param bodyHandler
         * @param delegate
         */
        public BodyHandlerServerHttpResponseDecorator(BodyHandlerFunction bodyHandler, ServerHttpResponse delegate) {
            super(delegate);
            if (bodyHandler != null) {
                this.bodyHandler = bodyHandler;
            }
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            //body 攔截處理器處理響應
            return bodyHandler.apply(getDelegate(), body);
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMapSequential(p -> p));
        }

        /**
         * 默認body攔截處理器
         *
         * @return
         */
        private BodyHandlerFunction initDefaultBodyHandler() {
            return (resp, body) -> resp.writeWith(body);
        }
    }
}
View Code

那麼萬事具有,代碼都寫好了,咱們又須要進行性能測試。這邊要記住,我用的是官方的那個例子,其餘的寫法也用過了,可是結果差很少。

4.再一次測試spring-cloud-gateway 網關路由性能

  step.1:性能測試,改一下配置,表示加入了過濾器。這裏爲何只有一個過濾器,由於這個過濾器問題比較大,過程就略過了。

      - id: an
        uri: http://10.1.4.32:14077/hello
        predicates:
        - Path=/an
        filters:
        - An

  通過屢次測試,其餘的過濾器都還好,只有修改response body的過濾器,嚴重影響性能,且有讀寫錯誤。

  step.2:測試,以及測試結果

[wrk@localhost wrk]$ ./wrk  -t 15 -c500 -d 10 --latency -s scripts/gateway.lua  http://10.1.4.32:14077/an
Running 10s test @ http://10.1.4.32:14077/an
  15 threads and 500 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.03s   488.75ms   2.00s    60.62%
    Req/Sec    26.59     13.84    80.00     67.60%
  Latency Distribution
     50%  931.54ms
     75%    1.45s
     90%    1.76s
     99%    1.97s
  3848 requests in 10.10s, 1.64MB read
  Socket errors: connect 0, read 0, write 0, timeout 458
Requests/sec:    381.05
Transfer/sec:    166.71KB

結果多出一行,socket錯誤,並且仍是超時,並且,日誌中也存在錯誤:

2019-03-06T16:09:33,396|INFO ||AsyncResolver-bootstrap-executor-0||||Resolving eureka endpoints via configuration
2019-03-06T16:10:38,268|ERROR||reactor-http-server-epoll-18||||Unhandled failure: Connection has been closed, response already set (status=200)
2019-03-06T16:10:38,268|WARN ||reactor-http-server-epoll-18||||Handling completed with error: Connection has been closed
2019-03-06T16:10:38,269|ERROR||reactor-http-server-epoll-18||||Unhandled failure: null, response already set (status=200)
2019-03-06T16:10:38,269|WARN ||reactor-http-server-epoll-18||||Handling completed with error: null
2019-03-06T16:10:38,294|ERROR||reactor-http-server-epoll-18||||Unhandled failure: syscall:write(..) failed: 斷開的管道, response already set (status=null)
2019-03-06T16:10:38,294|WARN ||reactor-http-server-epoll-18||||Handling completed with error: syscall:write(..) failed: 斷開的管道
2019-03-06T16:10:38,306|ERROR||reactor-http-server-epoll-23||||Unhandled failure: syscall:write(..) failed: 斷開的管道, response already set (status=null)
2019-03-06T16:10:38,306|WARN ||reactor-http-server-epoll-23||||Handling completed with error: syscall:write(..) failed: 斷開的管道
2019-03-06T16:14:33,397|INFO ||AsyncResolver-bootstrap-executor-0||||Resolving eureka endpoints via configuration

這個問題很嚴重了,由於單個請求的時候,並不會報錯,這個錯誤只發生在高併發壓測下,沒法追蹤。最重要的是,咱們看到性能只剩下300/s,這是萬萬不能接受的,生產更不能接收。

這個問題很難解釋,由於咱們採用的是官方提供的寫法,咱們回頭看官方的修改response 類,好吧,不用看了,由於:

package org.springframework.cloud.gateway.filter.factory.rewrite;
/**
 * This filter is BETA and may be subject to change in a future release.
 */
public class ModifyResponseBodyGatewayFilterFactory
        extends AbstractGatewayFilterFactory<ModifyResponseBodyGatewayFilterFactory.Config> {

官方已經說了,這是測試版本,不頂用。

不死心,又想起了gateway提供的GlobalFilter,將剛纔的代碼寫到全局過濾器中再試試,可是結果相同!

涼涼...

跪求結論跟我不一樣的啓發文檔,或者只能等下一版本了。

相關文章
相關標籤/搜索