本文基於 spring cloud gateway 2.0.1java
GlobalGilter 全局過濾器接口與 GatewayFilter 網關過濾器接口具備相同的方法定義。全局過濾器是一系列特殊的過濾器,會根據條件應用到全部路由中。網關過濾器是更細粒度的過濾器,做用於指定的路由中。web
從類圖中能夠看到 GlobalFilter 有十一個實現類,包括路由轉發、負載均衡、ws 路由、netty 路由等全局過濾器。下面咱們就分別介紹一下這些全局路由過濾器的實現。spring
ForwardRoutingFilter 在交換屬性 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 中 查找 URL, 若是 URL 爲轉發模式即 forward:/// localendpoint, 它將使用Spring DispatcherHandler 來處 理請求。 未修改的原始 URL 將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。後端
public class ForwardRoutingFilter implements GlobalFilter, Ordered { private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class); private final ObjectProvider<DispatcherHandler> dispatcherHandler; public ForwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) { this.dispatcherHandler = dispatcherHandler; } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); //獲取請求URI的請求結構 String scheme = requestUrl.getScheme(); //該路由已經被處理或者URI格式不是forward則繼續其它過濾器 if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) { return chain.filter(exchange); } setAlreadyRouted(exchange); //TODO: translate url? if (log.isTraceEnabled()) { log.trace("Forwarding to URI: "+requestUrl); } // 使用dispatcherHandler進行處理 return this.dispatcherHandler.getIfAvailable().handle(exchange); } }
轉發路由過濾器實現比較簡單,構造函數傳入請求的分發處理器DispatcherHandler。過濾器執行時,首先獲取請求地址的url前綴,而後判斷該請求是否已被路由處理或者URL的前綴不是forward,則繼續執行過濾器鏈;不然設置路由處理狀態並交由DispatcherHandler進行處理。緩存
請求路由是否被處理的判斷以下:websocket
// ServerWebExchangeUtils.javasession
public static void setAlreadyRouted(ServerWebExchange exchange) { exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true); } public static boolean isAlreadyRouted(ServerWebExchange exchange) { return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false); }
兩個 方法 定義 在 ServerWebExchangeUtils 中, 這 兩個 方法 用於 修改 與 查詢 ServerWebExchange 中的 Map< String, Object> getAttributes(),# getAttributes 方法 返回 當前 exchange 所請 求 屬性 的 可變 映射。負載均衡
這兩個方法定義在 ServerWebExchangeUtils 中,分別用於修改和查詢 GATEWAY_ALREADY_ROUTED_ATTR 狀態。socket
spring: cloud: gateway: routes: - id: myRoute uri: lb://service predicates: - Path=/service/**
LoadBalancerClientFilter 在交換屬性 GATEWAY_ REQUEST_ URL_ ATTR 中查找URL, 若是URL有一個 lb 前綴 ,即 lb:// myservice,將使用 LoadBalancerClient 將名稱 解析爲實際的主機和端口,如示例中的 myservice。 未修改的原始 URL將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。過濾器還將查看ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR屬性以查看它是否等於lb,而後應用相同的規則。ide
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } //保留原始url addOriginalRequestUrl(exchange, url); log.trace("LoadBalancerClientFilter url before: " + url); //負載均衡到具體服務實例 final ServiceInstance instance = choose(exchange); if (instance == null) { throw new NotFoundException("Unable to find instance for " + url.getHost()); } URI uri = exchange.getRequest().getURI(); //若是沒有提供前綴的話,則會使用默認的'< scheme>',不然使用' lb:< scheme>' 機制。 String overrideScheme = null; if (schemePrefix != null) { overrideScheme = url.getScheme(); } //根據獲取的服務實例信息,從新組裝請求的 url URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri); // Routing 相關 的 GatewayFilter 會 經過 GATEWAY_ REQUEST_ URL_ ATTR 屬性, 發起 請求。 log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); return chain.filter(exchange); }
從過濾器執行方法中能夠看出,負載均衡客戶端過濾器的實現步驟以下:
一、構造函數傳入負載均衡客戶端,依賴中添加 Spring Cloud Netflix Ribbon 便可 注入 該 Bean。
二、獲取請求的 URL 及其前綴,若是 URL 不爲空且前綴爲lb或者網關請求的前綴是 lb,則保存原始的URL,負載到具體的服務實例並根據獲取的服務實例信息,從新組裝請求的URL。
三、最後,添加請求的URL到GATEWAY_ REQUEST_ URL_ ATTR,並提交到過濾器鏈中繼續執行
在組裝請求的地址時,若是loadbalancer沒有提供前綴的話,則使用默認的,即overrideScheme 爲null,不然的話使用 lb:
若是 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 請求屬性中的URL 具備http或https前綴,NettyRoutingFilter 路由過濾器將運行,它使用 Netty HttpClient 代理對下游的請求。響應信息放在ServerWebExchangeUtils.CLIENT_ RESPONSE_ ATTR 屬性中,在過濾器鏈中進行傳遞。
該過濾器實際處理 和客戶端負載均衡的實現方式相似:
首先獲取請求的URL及前綴,判斷前綴是否是http或者https,若是該請求已經被路由或者前綴不合法,則調用過濾器鏈直接向後傳遞;不然正常對頭部進行過濾操做。
public class NettyRoutingFilter implements GlobalFilter, Ordered { private final HttpClient httpClient; private final ObjectProvider<List<HttpHeadersFilter>> headersFilters; private final HttpClientProperties properties; public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFilters, HttpClientProperties properties) { this.httpClient = httpClient; this.headersFilters = headersFilters; this.properties = properties; } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); final String url = requestUrl.toString(); HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> { final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) .headers(httpHeaders) .chunkedTransfer(chunkedTransfer) .failOnServerError(false) .failOnClientError(false); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); proxyRequest.header(HttpHeaders.HOST, host); } if (properties.getResponseTimeout() != null) { proxyRequest.context(ctx -> ctx.addHandlerFirst( new ReadTimeoutHandler(properties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS))); } return proxyRequest.sendHeaders() //I shouldn't need this .send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer())); }); return responseMono.doOnNext(res -> { ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE); response.getHeaders().putAll(filteredResponseHeaders); HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { // https://jira.spring.io/browse/SPR-16748 ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass()); } // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); }) .onErrorMap(t -> properties.getResponseTimeout() != null && t instanceof ReadTimeoutException, t -> new TimeoutException("Response took longer than timeout: " + properties.getResponseTimeout())) .then(chain.filter(exchange)); } }
NettyRoutingFilter 過濾器的構造函數有三個參數:
HttpClient httpClient : 基於 Netty 實現的 HttpClient,經過該屬性請求後端 的 Http 服務
ObjectProvider<List> headersFilters: ObjectProvider 類型 的 headersFilters,用於頭部過濾
HttpClientProperties properties: Netty HttpClient 的配置屬性
filterRequest 用於對請求頭部的信息進行處理,是定義在接口 HttpHeadersFilter 中的默認方法,該接口有三個實現類,請求頭部將會通過這三個頭部過濾器,並最終返回修改以後的頭部。
public interface HttpHeadersFilter { enum Type { REQUEST, RESPONSE } /** * Filters a set of Http Headers * * @param input Http Headers * @param exchange * @return filtered Http Headers */ HttpHeaders filter(HttpHeaders input, ServerWebExchange exchange); static HttpHeaders filterRequest(List<HttpHeadersFilter> filters, ServerWebExchange exchange) { HttpHeaders headers = exchange.getRequest().getHeaders(); return filter(filters, headers, exchange, Type.REQUEST); } static HttpHeaders filter(List<HttpHeadersFilter> filters, HttpHeaders input, ServerWebExchange exchange, Type type) { HttpHeaders response = input; if (filters != null) { HttpHeaders reduce = filters.stream() .filter(headersFilter -> headersFilter.supports(type)) .reduce(input, (headers, filter) -> filter.filter(headers, exchange), (httpHeaders, httpHeaders2) -> { httpHeaders.addAll(httpHeaders2); return httpHeaders; }); return reduce; } return response; } default boolean supports(Type type) { return type.equals(Type.REQUEST); } }
HttpHeadersFilter 接口的三個實現類:
ForwardedHeadersFilter:
增長 Forwarded頭部,頭部值爲協議類型、host和目標地址
XForwardedHeadersFilter:
增長 X- Forwarded- For、 X- Forwarded- Host、 X- Forwarded- Port 和 X- Forwarded- Proto 頭部。 代理轉發時,用以自定義的頭部信息向下遊傳遞。
RemoveHopByHopHeadersFilter:
爲了定義緩存和非緩存代理的行爲,咱們將HTTP頭字段分爲兩類:端到端的頭部字段,發送給請求或響應的最終接收人;逐跳頭部字段,對單個傳輸級別鏈接有意義,而且不被緩存存儲或由代理轉發。
因此該頭部過濾器會移除逐跳頭部字段,包括如下8個字段:
Proxy- Authenticate
Proxy- Authorization
TE
Trailer
Transfer- Encoding
Upgrade
proxy- connection
content- length
NettyWriteResponseFilter 與 NettyRoutingFilter 成對使用。「 預」 過濾階段沒有任何內容,由於 CLIENT_ RESPONSE_ ATTR 在 WebHandler 運行以前不會被添加。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added // until the WebHandler is run return chain.filter(exchange).then(Mono.defer(() -> { HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); if (clientResponse == null) { return Mono.empty(); } log.trace("NettyWriteResponseFilter start"); ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); //TODO: what if it's not netty final Flux<NettyDataBuffer> body = clientResponse.receive() .retain() //TODO: needed? .map(factory::wrap); MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { log.trace("invalid media type", e); } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); }
若是 CLIENT_ RESPONSE_ ATTR 請求 屬性 中 存在 Netty HttpClientResponse, 則 會應用 NettyWriteResponseFilter。 它在其餘過濾器完成後運行,並將代理響應寫回 網關客戶端響應。成對出現的 WebClientHttpRoutingFilter 和 WebClientWriteResponseFilter 過濾器,與基於Nettty 的路由和響應過濾器執行相同 的功能,但不須要使用Netty。
若是 ServerWebExchangeUtils.GATEWAY_ ROUTE_ ATTR 請求屬性中有Route對象, 則 會運行 RouteToRequestUrlFilter 過濾器。他會根據請求URI建立一個新的URI。 新的 URI 位於 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 請求屬性中。該過濾器會組裝成發送到代理服務的URL地址,向後傳遞到路由轉發的過濾器。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); if (route == null) { return chain.filter(exchange); } log.trace("RouteToRequestUrlFilter start"); URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); if (hasAnotherScheme(routeUri)) { // this is a special url, save scheme to special attribute // replace routeUri with schemeSpecificPart exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme()); routeUri = URI.create(routeUri.getSchemeSpecificPart()); } URI mergedUrl = UriComponentsBuilder.fromUri(uri) // .uri(routeUri) .scheme(routeUri.getScheme()) .host(routeUri.getHost()) .port(routeUri.getPort()) .build(encoded) .toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
首先獲取請求中的 Route, 如 果爲 空 則 直接 提交 過濾器 鏈; 不然 獲取 routeUri, 並 判斷 routeUri 是否 特殊, 若是 是 則需 要 處理 URL, 保存 前綴 到 GATEWAY_SCHEME_PREFIX_ATTR, 並將 routeUri 替換
首先獲取請求中的Route,若是爲空則直接提交給過濾器鏈
獲取routeUri並判斷是否特殊,若是是則須要處理URL,保存前綴到GATEWAY_SCHEME_PREFIX_ATTR,並將routeUri 替換爲schemeSpecificPart
而後拼接requestUrl,將請求的URI轉換爲路由定義的routeUri
最後,提交到過濾器鏈繼續執行
若是請求中的ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 屬性對應的URL前綴爲 ws 或 wss,則啓用Websocket 路由過濾器。它使用Spring Web Socket 做爲底層通訊組件向下遊轉發 WebSocket 請求。Websocket 能夠經過添加前綴 lb來實現負載均衡,如 lb:ws://serviceid
若是您使用SockJS做爲普通http的回調,則應配置正常的HTTP路由以及Websocket路由
spring: cloud: gateway: routes: # SockJS route - id: websocket_sockjs_route uri: http://localhost:3001 predicates: - Path=/websocket/info/** # Normwal Websocket route - id: websocket_route uri: ws://localhost:3001 predicates: - Path=/websocket/**
Websocket 路由過濾器進行處理時,首先獲取請求的URL及其前綴,判斷是否知足 Websocket 過濾器啓用的條件;對於未被路由處理且請求前綴爲ws或wss的請求,設置路由處理狀態位,構造過濾後的頭部。最後將請求經過代理轉發。
// WebsocketRoutingFilter.java
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //檢查websocket 是不是 upgrade changeSchemeIfIsWebSocketUpgrade(exchange); URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); //判斷是否知足websocket啓用條件 if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); HttpHeaders headers = exchange.getRequest().getHeaders(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL); if (protocols != null) { protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream() .flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header))) .map(String::trim) .collect(Collectors.toList()); } //將請求代理轉發 return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols)); }
ProxyWebSocketHandler 是 WebSocketHandler 的實現類,處理客戶端 WebSocket Session。 下面看一下代理 WebSocket 處理器的具體實現:
// WebsocketRoutingFilter.java
private static class ProxyWebSocketHandler implements WebSocketHandler { private final WebSocketClient client; private final URI url; private final HttpHeaders headers; private final List<String> subProtocols; public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) { this.client = client; this.url = url; this.headers = headers; if (protocols != null) { this.subProtocols = protocols; } else { this.subProtocols = Collections.emptyList(); } } @Override public List<String> getSubProtocols() { return this.subProtocols; } @Override public Mono<Void> handle(WebSocketSession session) { // pass headers along so custom headers can be sent through return client.execute(url, this.headers, new WebSocketHandler() { @Override public Mono<Void> handle(WebSocketSession proxySession) { // Use retain() for Reactor Netty Mono<Void> proxySessionSend = proxySession .send(session.receive().doOnNext(WebSocketMessage::retain)); // .log("proxySessionSend", Level.FINE); Mono<Void> serverSessionSend = session .send(proxySession.receive().doOnNext(WebSocketMessage::retain)); // .log("sessionSend", Level.FINE); return Mono.zip(proxySessionSend, serverSessionSend).then(); } /** * Copy subProtocols so they are available downstream. * @return */ @Override public List<String> getSubProtocols() { return ProxyWebSocketHandler.this.subProtocols; } }); } }
WebSocketClient# execute 方法鏈接後端被代理的 WebSocket 服務。
鏈接成功後,回調WebSocketHandler實現的內部類的handle( WebSocketSession session)方法
WebSocketHandler 實現的內部類實現對消息的轉發: 客戶端=> 具體業務服務=> 客戶 端; 而後合併代理服務的會話信息 proxySessionSend 和業務服務的會話信息serverSessionSend。
AdaptCachedBodyGlobalFilter— 用於緩存請求體的過濾器,在全局過濾器中的優先級較高。
ForwardPathFilter— 請求中的 gatewayRoute 屬性對應 Route 對象,當 Route 中的 URI scheme 爲 forward 模式 時, 該過濾器用於設置請求的 URI 路徑爲 Route 對象 中的 URI 路徑。