spring-cloud-gateway過濾器實踐

概述

這裏是 SpringCloud Gateway 實踐的第一篇,主要講過濾器的相關實現。Spring-Cloud-Gateway 是以 WebFlux 爲基礎的響應式架構設計, 是異步非阻塞式的,它可以充分利用多核 CPU 的硬件資源去處理大量的併發請求。html

本篇將基於 spring-cloud-gateway 簡介 基礎環境進行改造。前端

工做原理

Spring-Cloud-Gateway 基於過濾器實現,同 zuul 相似,有prepost兩種方式的 filter,分別處理前置邏輯後置邏輯。客戶端的請求先通過pre類型的 filter,而後將請求轉發到具體的業務服務,收到業務服務的響應以後,再通過post類型的 filter 處理,最後返回響應到客戶端。java

過濾器執行流程以下,order 越大,優先級越低redis

接下來咱們來驗證下 filter 執行順序。spring

這裏建立 3 個過濾器,分別配置不一樣的優先級json

@Slf4j
public class AFilter implements GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("AFilter前置邏輯");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("AFilter後置邏輯");
        }));
    }
}

@Slf4j
public class BFilter implements GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("BFilter前置邏輯");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("BFilter後置邏輯");
        }));
    }
}

@Slf4j
public class CFilter implements GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("CFilter前置邏輯");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("CFilter後置邏輯");
        }));
    }
}

@Configuration
public class FilterConfig {

    @Bean
    @Order(-1)
    public GlobalFilter a() {
        return new AFilter();
    }

    @Bean
    @Order(0)
    public GlobalFilter b() {
        return new BFilter();
    }

    @Bean
    @Order(1)
    public GlobalFilter c() {
        return new CFilter();
    }
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

查看網關輸出日誌後端

2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置邏輯
2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter前置邏輯
2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter前置邏輯

2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter後置邏輯
2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter後置邏輯
2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter後置邏輯

自定義過濾器

如今假設咱們要統計某個服務的響應時間,咱們能夠在代碼中bash

long beginTime = System.currentTimeMillis();
// do something...
long elapsed = System.currentTimeMillis() - beginTime;
log.info("elapsed: {}ms", elapsed);

每次都要這麼寫是否是很煩?Spring 告訴咱們有個東西叫 AOP。可是咱們是微服務啊,在每一個服務裏都寫也很煩。這時候就該網關的過濾器登臺表演了。架構

自定義過濾器須要實現 GatewayFilterOrdered 。其中 GatewayFilter 中的這個方法就是用來實現你的自定義的邏輯的併發

Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);

Ordered 中的 int getOrder() 方法是來給過濾器設定優先級別的,值越大則優先級越低。

好了,讓咱們來擼代碼吧.

/**
 * 此過濾器功能爲計算請求完成時間
 */
public class ElapsedFilter implements GatewayFilter, Ordered {

    private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
        return chain.filter(exchange).then(
                Mono.fromRunnable(() -> {
                    Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                    if (startTime != null) {
                        System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
                    }
                })
        );
    }

    /*
     *過濾器存在優先級,order越大,優先級越低
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

咱們在請求剛剛到達時,往 ServerWebExchange 中放入了一個屬性 elapsedTimeBegin,屬性值爲當時的毫秒級時間戳。而後在請求執行結束後,又從中取出咱們以前放進去的那個時間戳,與當前時間的差值即爲該請求的耗時。由於這是與業務無關的日誌因此將 Ordered 設爲 Integer.MAX_VALUE 以下降優先級。

如今再來看咱們以前的問題:怎麼來區分是 「pre」 仍是 「post」 呢?其實就是 chain.filter(exchange) 以前的就是 「pre」 部分,以後的也就是 then 裏邊的是 「post」 部分。

建立好 Filter 以後咱們將它添加到咱們的 Filter Chain 裏邊

@Configuration
public class FilterConfig {


    /**
     * http://localhost:8100/filter/provider
     * @param builder
     * @return
     */
    @Bean
    public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
        // @formatter:off
        // 能夠對比application.yml中關於路由轉發的配置
        return builder.routes()
                .route(r -> r.path("/filter/**")
                        .filters(f -> f.stripPrefix(1)
                                .filter(new ElapsedFilter()))
                        .uri("lb://idc-cloud-provider")
                        .order(0)
                        .id("filter")
                )
                .build();
        // @formatter:on
    }

}

基於全局過濾器實現審計功能

// AdaptCachedBodyGlobalFilter

@Component
public class LogFilter implements GlobalFilter, Ordered {

    private Logger log = LoggerFactory.getLogger(LogFilter.class);

    private final ObjectMapper objectMapper = new ObjectMapper();
    private static final String START_TIME = "startTime";
    private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        // 請求路徑
        String path = request.getPath().pathWithinApplication().value();
        // 請求schema: http/https
        String scheme = request.getURI().getScheme();
        // 請求方法
        HttpMethod method = request.getMethod();
        // 路由服務地址
        URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // 請求頭
        HttpHeaders headers = request.getHeaders();
        // 設置startTime
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
        // 獲取請求地址
        InetSocketAddress remoteAddress = request.getRemoteAddress();


        MultiValueMap<String, String> formData = null;



        AccessRecord accessRecord = new AccessRecord();
        accessRecord.setPath(path);
        accessRecord.setSchema(scheme);
        accessRecord.setMethod(method.name());
        accessRecord.setTargetUri(targetUri.toString());
        accessRecord.setRemoteAddress(remoteAddress.toString());
        accessRecord.setHeaders(headers);

        if (method == HttpMethod.GET) {
            formData = request.getQueryParams();
            accessRecord.setFormData(formData);
            writeAccessRecord(accessRecord);
        }

        if (method == HttpMethod.POST) {
            Mono<Void> voidMono = null;
            if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
                // JSON
                voidMono = readBody(exchange, chain, accessRecord);
            }

            if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {
                // x-www-form-urlencoded
                voidMono = readFormData(exchange, chain, accessRecord);
            }

            if (voidMono != null) {
                return voidMono;
            }

        }

        return chain.filter(exchange);
    }

    private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
        return null;
    }

    private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {

        return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer);
            Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                DataBufferUtils.retain(buffer);
                return Mono.just(buffer);
            });


            // 重寫請求體,由於請求體數據只能被消費一次
            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                @Override
                public Flux<DataBuffer> getBody() {
                    return cachedFlux;
                }
            };

            ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

            return ServerRequest.create(mutatedExchange, messageReaders)
                    .bodyToMono(String.class)
                    .doOnNext(objectValue -> {
                        accessRecord.setBody(objectValue);
                        writeAccessRecord(accessRecord);
                    }).then(chain.filter(mutatedExchange));
        });
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * TODO 異步日誌
     * @param accessRecord
     */
    private void writeAccessRecord(AccessRecord accessRecord) {

        log.info("\n\n start------------------------------------------------- \n " +
                        "請求路徑:{}\n " +
                        "scheme:{}\n " +
                        "請求方法:{}\n " +
                        "目標服務:{}\n " +
                        "請求頭:{}\n " +
                        "遠程IP地址:{}\n " +
                        "表單參數:{}\n " +
                        "請求體:{}\n " +
                        "end------------------------------------------------- \n ",
                accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());
    }
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

輸出結果

start-------------------------------------------------
 請求路徑:/provider1
 scheme:http
 請求方法:POST
 目標服務:http://192.168.124.5:2001/provider1
 請求頭:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]
 遠程IP地址:/192.168.124.5:49969
 表單參數:null
 請求體:{"name":"admin"}
 end-------------------------------------------------

接下來,咱們來配置日誌,方便日誌系統提取日誌。SpringBoot 默認的日誌爲 logback。

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />

    <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable
            </Pattern>
        </layout>
    </appender>

    <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOGS}/spring-boot-logger.log</file>
        <encoder
                class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <Pattern>%d %p %C{1.} [%t] %m%n</Pattern>
        </encoder>

        <rollingPolicy
                class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- rollover daily and when the file reaches 10 MegaBytes -->
            <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log
            </fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy
                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>10MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
    </appender>

    <!-- LOG everything at INFO level -->
    <root level="info">
        <!--<appender-ref ref="RollingFile" />-->
        <appender-ref ref="Console" />
    </root>

    <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上級loger傳遞打印信息。默認是true-->
    <logger name="cn.idea360.gateway" level="info" additivity="false">
        <appender-ref ref="RollingFile" />
        <appender-ref ref="Console" />
    </logger>

</configuration>

這樣 console 和日誌目錄下就都有日誌了。

自定義過濾器工廠

若是你看過靜態路由的配置,你應該對以下配置有印象。

filters:
  - StripPrefix=1
  - AddResponseHeader=X-Response-Default-Foo, Default-Bar

StripPrefixAddResponseHeader 這兩個其實是兩個過濾器工廠(GatewayFilterFactory),用這種配置的方式更靈活方便。

咱們就將以前的那個 ElapsedFilter 改造一下,讓它能接收一個 boolean 類型的參數,來決定是否將請求參數也打印出來。

public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {

    private static final Log log = LogFactory.getLog(GatewayFilter.class);
    private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
    private static final String KEY = "withParams";


    public List<String> shortcutFieldOrder() {
        return Arrays.asList(KEY);
    }

    public ElapsedGatewayFilterFactory() {
        super(Config.class);
    }


    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
            return chain.filter(exchange).then(
                    Mono.fromRunnable(() -> {
                        Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                        if (startTime != null) {
                            StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
                                    .append(": ")
                                    .append(System.currentTimeMillis() - startTime)
                                    .append("ms");
                            if (config.isWithParams()) {
                                sb.append(" params:").append(exchange.getRequest().getQueryParams());
                            }
                            log.info(sb.toString());
                        }
                    })
            );
        };
    }


    public static class Config {

        private boolean withParams;

        public boolean isWithParams() {
            return withParams;
        }

        public void setWithParams(boolean withParams) {
            this.withParams = withParams;
        }

    }
}

過濾器工廠的頂級接口是 GatewayFilterFactory,咱們能夠直接繼承它的兩個抽象類來簡化開發 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory,這兩個抽象類的區別就是前者接收一個參數(像 StripPrefix 和咱們建立的這種),後者接收兩個參數(像 AddResponseHeader)。

GatewayFilter apply(Config config) 方法內部其實是建立了一個 GatewayFilter 的匿名類,具體實現和以前的幾乎同樣,就不解釋了。

靜態內部類 Config 就是爲了接收那個 boolean 類型的參數服務的,裏邊的變量名能夠隨意寫,可是要重寫 List shortcutFieldOrder() 這個方法。

這裏注意一下,必定要調用一下父類的構造器把 Config 類型傳過去,不然會報 ClassCastException

public ElapsedGatewayFilterFactory() {
    super(Config.class);
}

工廠類咱們有了,再把它註冊到 Spring 當中

@Bean
public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {
    return new ElapsedGatewayFilterFactory();
}

而後添加配置(主要改動在 default-filters 配置)

server:
  port: 2000
spring:
  application:
    name: idc-gateway
  redis:
    host: localhost
    port: 6379
    timeout: 6000ms  # 鏈接超時時長(毫秒)
    jedis:
      pool:
        max-active: 1000  # 鏈接池最大鏈接數(使用負值表示沒有限制)
        max-wait: -1ms      # 鏈接池最大阻塞等待時間(使用負值表示沒有限制)
        max-idle: 10      # 鏈接池中的最大空閒鏈接
        min-idle: 5       # 鏈接池中的最小空閒鏈接
  cloud:
    consul:
      host: localhost
      port: 8500
    gateway:
      discovery:
        locator:
          enabled: true
          # 修改在這裏。gateway能夠經過開啓如下配置來打開根據服務的serviceId來匹配路由,默認是大寫
      default-filters:
        - Elapsed=true
      routes:
        - id: provider  # 路由 ID,保持惟一
          uri: lb://idc-provider1 # uri指目標服務地址,lb表明從註冊中心獲取服務
          predicates: # 路由條件。Predicate 接受一個輸入參數,返回一個布爾值結果。該接口包含多種默認方法來將 Predicate 組合成其餘複雜的邏輯(好比:與,或,非)
            - Path=/p/**
          filters:
            - StripPrefix=1 # 過濾器StripPrefix,做用是去掉請求路徑的最前面n個部分截取掉。StripPrefix=1就表明截取路徑的個數爲1,好比前端過來請求/test/good/1/view,匹配成功後,路由到後端的請求路徑就會變成http://localhost:8888/good/1/view

結語

本文到此結束。關於 Webflux 的學習剛入門,以爲能夠像 Rxjava 那樣在 onNext 中拿到異步數據,然而在 post 獲取 body 中沒生效。經測試可知 getBody 得到的數據輸出爲 null,而本身經過 Flux.create 建立的數據能夠在訂閱者中獲取到。此處還有待研究,但願拋磚引玉,你們有研究出來的不吝賜教。同時,但願你們關注公衆號【當我趕上你】。

參考

相關文章
相關標籤/搜索