這裏是 SpringCloud Gateway
實踐的第一篇,主要講過濾器的相關實現。Spring-Cloud-Gateway 是以 WebFlux
爲基礎的響應式架構設計, 是異步非阻塞式的,它可以充分利用多核 CPU 的硬件資源去處理大量的併發請求。html
本篇將基於 spring-cloud-gateway 簡介 基礎環境進行改造。前端
Spring-Cloud-Gateway 基於過濾器實現,同 zuul 相似,有pre和post兩種方式的 filter,分別處理前置邏輯和後置邏輯。客戶端的請求先通過pre類型的 filter,而後將請求轉發到具體的業務服務,收到業務服務的響應以後,再通過post類型的 filter 處理,最後返回響應到客戶端。java
過濾器執行流程以下,order 越大,優先級越低redis
接下來咱們來驗證下 filter
執行順序。spring
這裏建立 3 個過濾器,分別配置不一樣的優先級json
@Slf4j public class AFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("AFilter前置邏輯"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("AFilter後置邏輯"); })); } } @Slf4j public class BFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("BFilter前置邏輯"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("BFilter後置邏輯"); })); } } @Slf4j public class CFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("CFilter前置邏輯"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("CFilter後置邏輯"); })); } } @Configuration public class FilterConfig { @Bean @Order(-1) public GlobalFilter a() { return new AFilter(); } @Bean @Order(0) public GlobalFilter b() { return new BFilter(); } @Bean @Order(1) public GlobalFilter c() { return new CFilter(); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
查看網關輸出日誌後端
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter前置邏輯 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置邏輯 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置邏輯 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter後置邏輯 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter後置邏輯 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter後置邏輯
如今假設咱們要統計某個服務的響應時間,咱們能夠在代碼中bash
long beginTime = System.currentTimeMillis(); // do something... long elapsed = System.currentTimeMillis() - beginTime; log.info("elapsed: {}ms", elapsed);
每次都要這麼寫是否是很煩?Spring 告訴咱們有個東西叫 AOP。可是咱們是微服務啊,在每一個服務裏都寫也很煩。這時候就該網關的過濾器登臺表演了。架構
自定義過濾器須要實現 GatewayFilter
和 Ordered
。其中 GatewayFilter
中的這個方法就是用來實現你的自定義的邏輯的併發
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
而 Ordered
中的 int getOrder()
方法是來給過濾器設定優先級別的,值越大則優先級越低。
好了,讓咱們來擼代碼吧.
/** * 此過濾器功能爲計算請求完成時間 */ public class ElapsedFilter implements GatewayFilter, Ordered { private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms"); } }) ); } /* *過濾器存在優先級,order越大,優先級越低 */ @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }
咱們在請求剛剛到達時,往 ServerWebExchange
中放入了一個屬性 elapsedTimeBegin
,屬性值爲當時的毫秒級時間戳。而後在請求執行結束後,又從中取出咱們以前放進去的那個時間戳,與當前時間的差值即爲該請求的耗時。由於這是與業務無關的日誌因此將 Ordered
設爲 Integer.MAX_VALUE
以下降優先級。
如今再來看咱們以前的問題:怎麼來區分是 「pre」 仍是 「post」 呢?其實就是 chain.filter(exchange)
以前的就是 「pre」 部分,以後的也就是 then
裏邊的是 「post」 部分。
建立好 Filter 以後咱們將它添加到咱們的 Filter Chain 裏邊
@Configuration public class FilterConfig { /** * http://localhost:8100/filter/provider * @param builder * @return */ @Bean public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) { // @formatter:off // 能夠對比application.yml中關於路由轉發的配置 return builder.routes() .route(r -> r.path("/filter/**") .filters(f -> f.stripPrefix(1) .filter(new ElapsedFilter())) .uri("lb://idc-cloud-provider") .order(0) .id("filter") ) .build(); // @formatter:on } }
// AdaptCachedBodyGlobalFilter @Component public class LogFilter implements GlobalFilter, Ordered { private Logger log = LoggerFactory.getLogger(LogFilter.class); private final ObjectMapper objectMapper = new ObjectMapper(); private static final String START_TIME = "startTime"; private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders(); @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 請求路徑 String path = request.getPath().pathWithinApplication().value(); // 請求schema: http/https String scheme = request.getURI().getScheme(); // 請求方法 HttpMethod method = request.getMethod(); // 路由服務地址 URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); // 請求頭 HttpHeaders headers = request.getHeaders(); // 設置startTime exchange.getAttributes().put(START_TIME, System.currentTimeMillis()); // 獲取請求地址 InetSocketAddress remoteAddress = request.getRemoteAddress(); MultiValueMap<String, String> formData = null; AccessRecord accessRecord = new AccessRecord(); accessRecord.setPath(path); accessRecord.setSchema(scheme); accessRecord.setMethod(method.name()); accessRecord.setTargetUri(targetUri.toString()); accessRecord.setRemoteAddress(remoteAddress.toString()); accessRecord.setHeaders(headers); if (method == HttpMethod.GET) { formData = request.getQueryParams(); accessRecord.setFormData(formData); writeAccessRecord(accessRecord); } if (method == HttpMethod.POST) { Mono<Void> voidMono = null; if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) { // JSON voidMono = readBody(exchange, chain, accessRecord); } if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) { // x-www-form-urlencoded voidMono = readFormData(exchange, chain, accessRecord); } if (voidMono != null) { return voidMono; } } return chain.filter(exchange); } private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return null; } private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); // 重寫請求體,由於請求體數據只能被消費一次 ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } }; ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build(); return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> { accessRecord.setBody(objectValue); writeAccessRecord(accessRecord); }).then(chain.filter(mutatedExchange)); }); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } /** * TODO 異步日誌 * @param accessRecord */ private void writeAccessRecord(AccessRecord accessRecord) { log.info("\n\n start------------------------------------------------- \n " + "請求路徑:{}\n " + "scheme:{}\n " + "請求方法:{}\n " + "目標服務:{}\n " + "請求頭:{}\n " + "遠程IP地址:{}\n " + "表單參數:{}\n " + "請求體:{}\n " + "end------------------------------------------------- \n ", accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody()); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
輸出結果
start------------------------------------------------- 請求路徑:/provider1 scheme:http 請求方法:POST 目標服務:http://192.168.124.5:2001/provider1 請求頭:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"] 遠程IP地址:/192.168.124.5:49969 表單參數:null 請求體:{"name":"admin"} end-------------------------------------------------
接下來,咱們來配置日誌,方便日誌系統提取日誌。SpringBoot 默認的日誌爲 logback。
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" /> <appender name="Console" class="ch.qos.logback.core.ConsoleAppender"> <layout class="ch.qos.logback.classic.PatternLayout"> <Pattern> %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable </Pattern> </layout> </appender> <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOGS}/spring-boot-logger.log</file> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <Pattern>%d %p %C{1.} [%t] %m%n</Pattern> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- rollover daily and when the file reaches 10 MegaBytes --> <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log </fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>10MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> </rollingPolicy> </appender> <!-- LOG everything at INFO level --> <root level="info"> <!--<appender-ref ref="RollingFile" />--> <appender-ref ref="Console" /> </root> <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上級loger傳遞打印信息。默認是true--> <logger name="cn.idea360.gateway" level="info" additivity="false"> <appender-ref ref="RollingFile" /> <appender-ref ref="Console" /> </logger> </configuration>
這樣 console 和日誌目錄下就都有日誌了。
若是你看過靜態路由的配置,你應該對以下配置有印象。
filters: - StripPrefix=1 - AddResponseHeader=X-Response-Default-Foo, Default-Bar
StripPrefix
、AddResponseHeader
這兩個其實是兩個過濾器工廠(GatewayFilterFactory),用這種配置的方式更靈活方便。
咱們就將以前的那個 ElapsedFilter
改造一下,讓它能接收一個 boolean
類型的參數,來決定是否將請求參數也打印出來。
public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> { private static final Log log = LogFactory.getLog(GatewayFilter.class); private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; private static final String KEY = "withParams"; public List<String> shortcutFieldOrder() { return Arrays.asList(KEY); } public ElapsedGatewayFilterFactory() { super(Config.class); } public GatewayFilter apply(Config config) { return (exchange, chain) -> { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath()) .append(": ") .append(System.currentTimeMillis() - startTime) .append("ms"); if (config.isWithParams()) { sb.append(" params:").append(exchange.getRequest().getQueryParams()); } log.info(sb.toString()); } }) ); }; } public static class Config { private boolean withParams; public boolean isWithParams() { return withParams; } public void setWithParams(boolean withParams) { this.withParams = withParams; } } }
過濾器工廠的頂級接口是 GatewayFilterFactory
,咱們能夠直接繼承它的兩個抽象類來簡化開發 AbstractGatewayFilterFactory
和 AbstractNameValueGatewayFilterFactory
,這兩個抽象類的區別就是前者接收一個參數(像 StripPrefix
和咱們建立的這種),後者接收兩個參數(像 AddResponseHeader
)。
GatewayFilter apply(Config config)
方法內部其實是建立了一個 GatewayFilter
的匿名類,具體實現和以前的幾乎同樣,就不解釋了。
靜態內部類 Config
就是爲了接收那個 boolean
類型的參數服務的,裏邊的變量名能夠隨意寫,可是要重寫 List shortcutFieldOrder()
這個方法。
這裏注意一下,必定要調用一下父類的構造器把 Config
類型傳過去,不然會報 ClassCastException
public ElapsedGatewayFilterFactory() { super(Config.class); }
工廠類咱們有了,再把它註冊到 Spring 當中
@Bean public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() { return new ElapsedGatewayFilterFactory(); }
而後添加配置(主要改動在 default-filters
配置)
server: port: 2000 spring: application: name: idc-gateway redis: host: localhost port: 6379 timeout: 6000ms # 鏈接超時時長(毫秒) jedis: pool: max-active: 1000 # 鏈接池最大鏈接數(使用負值表示沒有限制) max-wait: -1ms # 鏈接池最大阻塞等待時間(使用負值表示沒有限制) max-idle: 10 # 鏈接池中的最大空閒鏈接 min-idle: 5 # 鏈接池中的最小空閒鏈接 cloud: consul: host: localhost port: 8500 gateway: discovery: locator: enabled: true # 修改在這裏。gateway能夠經過開啓如下配置來打開根據服務的serviceId來匹配路由,默認是大寫 default-filters: - Elapsed=true routes: - id: provider # 路由 ID,保持惟一 uri: lb://idc-provider1 # uri指目標服務地址,lb表明從註冊中心獲取服務 predicates: # 路由條件。Predicate 接受一個輸入參數,返回一個布爾值結果。該接口包含多種默認方法來將 Predicate 組合成其餘複雜的邏輯(好比:與,或,非) - Path=/p/** filters: - StripPrefix=1 # 過濾器StripPrefix,做用是去掉請求路徑的最前面n個部分截取掉。StripPrefix=1就表明截取路徑的個數爲1,好比前端過來請求/test/good/1/view,匹配成功後,路由到後端的請求路徑就會變成http://localhost:8888/good/1/view
本文到此結束。關於 Webflux
的學習剛入門,以爲能夠像 Rxjava
那樣在 onNext
中拿到異步數據,然而在 post
獲取 body 中沒生效。經測試可知 getBody
得到的數據輸出爲 null,而本身經過 Flux.create
建立的數據能夠在訂閱者中獲取到。此處還有待研究,但願拋磚引玉,你們有研究出來的不吝賜教。同時,但願你們關注公衆號【當我趕上你】。