Spring Cloud Gateway(十一):全局過濾器GlobalFilter

本文基於 spring cloud gateway 2.0.1java

一、簡介

GlobalGilter 全局過濾器接口與 GatewayFilter 網關過濾器接口具備相同的方法定義。全局過濾器是一系列特殊的過濾器,會根據條件應用到全部路由中。網關過濾器是更細粒度的過濾器,做用於指定的路由中。web

在這裏插入圖片描述

從類圖中能夠看到 GlobalFilter 有十一個實現類,包括路由轉發、負載均衡、ws 路由、netty 路由等全局過濾器。下面咱們就分別介紹一下這些全局路由過濾器的實現。spring

二、ForwardRoutingFilter 轉發路由過濾器

ForwardRoutingFilter 在交換屬性 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 中 查找 URL, 若是 URL 爲轉發模式即 forward:/// localendpoint, 它將使用Spring DispatcherHandler 來處 理請求。 未修改的原始 URL 將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。後端

public class ForwardRoutingFilter implements GlobalFilter, Ordered {

	private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class);

	private final ObjectProvider<DispatcherHandler> dispatcherHandler;

	public ForwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) {
		this.dispatcherHandler = dispatcherHandler;
	}

	@Override
	public int getOrder() {
		return Ordered.LOWEST_PRECEDENCE;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
        //獲取請求URI的請求結構
		String scheme = requestUrl.getScheme();
		//該路由已經被處理或者URI格式不是forward則繼續其它過濾器
		if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		//TODO: translate url?

		if (log.isTraceEnabled()) {
			log.trace("Forwarding to URI: "+requestUrl);
		}
        // 使用dispatcherHandler進行處理
		return this.dispatcherHandler.getIfAvailable().handle(exchange);
	}
}

轉發路由過濾器實現比較簡單,構造函數傳入請求的分發處理器DispatcherHandler。過濾器執行時,首先獲取請求地址的url前綴,而後判斷該請求是否已被路由處理或者URL的前綴不是forward,則繼續執行過濾器鏈;不然設置路由處理狀態並交由DispatcherHandler進行處理。緩存

請求路由是否被處理的判斷以下:websocket

// ServerWebExchangeUtils.javasession

public static void setAlreadyRouted(ServerWebExchange exchange) {
		exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
	}

	public static boolean isAlreadyRouted(ServerWebExchange exchange) {
		return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
	}

兩個 方法 定義 在 ServerWebExchangeUtils 中, 這 兩個 方法 用於 修改 與 查詢 ServerWebExchange 中的 Map< String, Object> getAttributes(),# getAttributes 方法 返回 當前 exchange 所請 求 屬性 的 可變 映射。負載均衡

這兩個方法定義在 ServerWebExchangeUtils 中,分別用於修改和查詢 GATEWAY_ALREADY_ROUTED_ATTR 狀態。socket

三、LoadBalancerClientFilter 負載均衡客戶端過濾器

spring:
  cloud:
    gateway:
      routes:
      - id: myRoute
        uri: lb://service
        predicates:
        - Path=/service/**

LoadBalancerClientFilter 在交換屬性 GATEWAY_ REQUEST_ URL_ ATTR 中查找URL, 若是URL有一個 lb 前綴 ,即 lb:// myservice,將使用 LoadBalancerClient 將名稱 解析爲實際的主機和端口,如示例中的 myservice。 未修改的原始 URL將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。過濾器還將查看ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR屬性以查看它是否等於lb,而後應用相同的規則。ide

@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
		
		if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
			return chain.filter(exchange);
		}
		//保留原始url
		addOriginalRequestUrl(exchange, url);

		log.trace("LoadBalancerClientFilter url before: " + url);
        //負載均衡到具體服務實例
		final ServiceInstance instance = choose(exchange);

		if (instance == null) {
			throw new NotFoundException("Unable to find instance for " + url.getHost());
		}

		URI uri = exchange.getRequest().getURI();

	    //若是沒有提供前綴的話,則會使用默認的'< scheme>',不然使用' lb:< scheme>' 機制。
		String overrideScheme = null;
		if (schemePrefix != null) {
			overrideScheme = url.getScheme();
		}
        //根據獲取的服務實例信息,從新組裝請求的 url
		URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
        // Routing 相關 的 GatewayFilter 會 經過 GATEWAY_ REQUEST_ URL_ ATTR 屬性, 發起 請求。
		log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		return chain.filter(exchange);
	}

從過濾器執行方法中能夠看出,負載均衡客戶端過濾器的實現步驟以下:

一、構造函數傳入負載均衡客戶端,依賴中添加 Spring Cloud Netflix Ribbon 便可 注入 該 Bean。

二、獲取請求的 URL 及其前綴,若是 URL 不爲空且前綴爲lb或者網關請求的前綴是 lb,則保存原始的URL,負載到具體的服務實例並根據獲取的服務實例信息,從新組裝請求的URL。

三、最後,添加請求的URL到GATEWAY_ REQUEST_ URL_ ATTR,並提交到過濾器鏈中繼續執行

在組裝請求的地址時,若是loadbalancer沒有提供前綴的話,則使用默認的,即overrideScheme 爲null,不然的話使用 lb:

四、NettyRoutingFilter 和 NettyWriteResponseFilter

若是 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 請求屬性中的URL 具備http或https前綴,NettyRoutingFilter 路由過濾器將運行,它使用 Netty HttpClient 代理對下游的請求。響應信息放在ServerWebExchangeUtils.CLIENT_ RESPONSE_ ATTR 屬性中,在過濾器鏈中進行傳遞。

該過濾器實際處理 和客戶端負載均衡的實現方式相似:

首先獲取請求的URL及前綴,判斷前綴是否是http或者https,若是該請求已經被路由或者前綴不合法,則調用過濾器鏈直接向後傳遞;不然正常對頭部進行過濾操做。

public class NettyRoutingFilter implements GlobalFilter, Ordered {

	private final HttpClient httpClient;
	private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
	private final HttpClientProperties properties;

	public NettyRoutingFilter(HttpClient httpClient,
							  ObjectProvider<List<HttpHeadersFilter>> headersFilters,
							  HttpClientProperties properties) {
		this.httpClient = httpClient;
		this.headersFilters = headersFilters;
		this.properties = properties;
	}

	@Override
	public int getOrder() {
		return Ordered.LOWEST_PRECEDENCE;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		ServerHttpRequest request = exchange.getRequest();

		final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
		final String url = requestUrl.toString();

		HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
				exchange);

		final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
		filtered.forEach(httpHeaders::set);

		String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
		boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

		boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

		Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> {
			final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
					.headers(httpHeaders)
					.chunkedTransfer(chunkedTransfer)
					.failOnServerError(false)
					.failOnClientError(false);

			if (preserveHost) {
				String host = request.getHeaders().getFirst(HttpHeaders.HOST);
				proxyRequest.header(HttpHeaders.HOST, host);
			}

			if (properties.getResponseTimeout() != null) {
				proxyRequest.context(ctx -> ctx.addHandlerFirst(
						new ReadTimeoutHandler(properties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS)));
			}

			return proxyRequest.sendHeaders() //I shouldn't need this
					.send(request.getBody().map(dataBuffer ->
							((NettyDataBuffer) dataBuffer).getNativeBuffer()));
		});

		return responseMono.doOnNext(res -> {
			ServerHttpResponse response = exchange.getResponse();
			// put headers and status so filters can modify the response
			HttpHeaders headers = new HttpHeaders();

			res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

			String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
			if (StringUtils.hasLength(contentTypeValue)) {
				exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
			}

			HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
					this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);
			
			response.getHeaders().putAll(filteredResponseHeaders);
			HttpStatus status = HttpStatus.resolve(res.status().code());
			if (status != null) {
				response.setStatusCode(status);
			} else if (response instanceof AbstractServerHttpResponse) {
				// https://jira.spring.io/browse/SPR-16748
				((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
			} else {
				throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass());
			}

			// Defer committing the response until all route filters have run
			// Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
			exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
		})
				.onErrorMap(t -> properties.getResponseTimeout() != null && t instanceof ReadTimeoutException,
						t -> new TimeoutException("Response took longer than timeout: " +
								properties.getResponseTimeout()))
				.then(chain.filter(exchange));
	}
}

NettyRoutingFilter 過濾器的構造函數有三個參數:

HttpClient httpClient : 基於 Netty 實現的 HttpClient,經過該屬性請求後端 的 Http 服務

ObjectProvider<List> headersFilters: ObjectProvider 類型 的 headersFilters,用於頭部過濾

HttpClientProperties properties: Netty HttpClient 的配置屬性

4.一、NettyRoutingFilter ## HttpHeadersFilter 頭部過濾器接口

filterRequest 用於對請求頭部的信息進行處理,是定義在接口 HttpHeadersFilter 中的默認方法,該接口有三個實現類,請求頭部將會通過這三個頭部過濾器,並最終返回修改以後的頭部。

public interface HttpHeadersFilter {

	enum Type {
		REQUEST, RESPONSE
	}

	/**
	 * Filters a set of Http Headers
	 * 
	 * @param input Http Headers
	 * @param exchange
	 * @return filtered Http Headers
	 */
	HttpHeaders filter(HttpHeaders input, ServerWebExchange exchange);

	static HttpHeaders filterRequest(List<HttpHeadersFilter> filters,
							  ServerWebExchange exchange) {
		HttpHeaders headers = exchange.getRequest().getHeaders();
		return filter(filters, headers, exchange, Type.REQUEST);
	}

	static HttpHeaders filter(List<HttpHeadersFilter> filters, HttpHeaders input,
			ServerWebExchange exchange, Type type) {
		HttpHeaders response = input;
		if (filters != null) {
			HttpHeaders reduce = filters.stream()
					.filter(headersFilter -> headersFilter.supports(type))
					.reduce(input,
							(headers, filter) -> filter.filter(headers, exchange),
							(httpHeaders, httpHeaders2) -> {
								httpHeaders.addAll(httpHeaders2);
								return httpHeaders;
							});
			return reduce;
		}

		return response;
	}

	default boolean supports(Type type) {
		return type.equals(Type.REQUEST);
	}
}

HttpHeadersFilter 接口的三個實現類:

  • ForwardedHeadersFilter:

    增長 Forwarded頭部,頭部值爲協議類型、host和目標地址

  • XForwardedHeadersFilter:

    增長 X- Forwarded- For、 X- Forwarded- Host、 X- Forwarded- Port 和 X- Forwarded- Proto 頭部。 代理轉發時,用以自定義的頭部信息向下遊傳遞。

  • RemoveHopByHopHeadersFilter:

    爲了定義緩存和非緩存代理的行爲,咱們將HTTP頭字段分爲兩類:端到端的頭部字段,發送給請求或響應的最終接收人;逐跳頭部字段,對單個傳輸級別鏈接有意義,而且不被緩存存儲或由代理轉發。

    因此該頭部過濾器會移除逐跳頭部字段,包括如下8個字段:

    Proxy- Authenticate

    Proxy- Authorization

    TE

    Trailer

    Transfer- Encoding

    Upgrade

    proxy- connection

    content- length

4.二、NettyWriteResponseFilter

NettyWriteResponseFilter 與 NettyRoutingFilter 成對使用。「 預」 過濾階段沒有任何內容,由於 CLIENT_ RESPONSE_ ATTR 在 WebHandler 運行以前不會被添加。

@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
		// until the WebHandler is run
		return chain.filter(exchange).then(Mono.defer(() -> {
			HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

			if (clientResponse == null) {
				return Mono.empty();
			}
			log.trace("NettyWriteResponseFilter start");
			ServerHttpResponse response = exchange.getResponse();

			NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
			//TODO: what if it's not netty

			final Flux<NettyDataBuffer> body = clientResponse.receive()
					.retain() //TODO: needed?
					.map(factory::wrap);

			MediaType contentType = null;
			try {
				contentType = response.getHeaders().getContentType();
			} catch (Exception e) {
				log.trace("invalid media type", e);
			}
			return (isStreamingMediaType(contentType) ?
					response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
		}));
	}

若是 CLIENT_ RESPONSE_ ATTR 請求 屬性 中 存在 Netty HttpClientResponse, 則 會應用 NettyWriteResponseFilter。 它在其餘過濾器完成後運行,並將代理響應寫回 網關客戶端響應。成對出現的 WebClientHttpRoutingFilter 和 WebClientWriteResponseFilter 過濾器,與基於Nettty 的路由和響應過濾器執行相同 的功能,但不須要使用Netty。

五、RouteToRequestUrlFilter 路由到指定url的過濾器

若是 ServerWebExchangeUtils.GATEWAY_ ROUTE_ ATTR 請求屬性中有Route對象, 則 會運行 RouteToRequestUrlFilter 過濾器。他會根據請求URI建立一個新的URI。 新的 URI 位於 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 請求屬性中。該過濾器會組裝成發送到代理服務的URL地址,向後傳遞到路由轉發的過濾器。

@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
		if (route == null) {
			return chain.filter(exchange);
		}
		log.trace("RouteToRequestUrlFilter start");
		URI uri = exchange.getRequest().getURI();
		boolean encoded = containsEncodedParts(uri);
		URI routeUri = route.getUri();

		if (hasAnotherScheme(routeUri)) {
			// this is a special url, save scheme to special attribute
			// replace routeUri with schemeSpecificPart
			exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
			routeUri = URI.create(routeUri.getSchemeSpecificPart());
		}

		URI mergedUrl = UriComponentsBuilder.fromUri(uri)
				// .uri(routeUri)
				.scheme(routeUri.getScheme())
				.host(routeUri.getHost())
				.port(routeUri.getPort())
				.build(encoded)
				.toUri();
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
		return chain.filter(exchange);
	}
  • 首先獲取請求中的 Route, 如 果爲 空 則 直接 提交 過濾器 鏈; 不然 獲取 routeUri, 並 判斷 routeUri 是否 特殊, 若是 是 則需 要 處理 URL, 保存 前綴 到 GATEWAY_SCHEME_PREFIX_ATTR, 並將 routeUri 替換

  • 首先獲取請求中的Route,若是爲空則直接提交給過濾器鏈

  • 獲取routeUri並判斷是否特殊,若是是則須要處理URL,保存前綴到GATEWAY_SCHEME_PREFIX_ATTR,並將routeUri 替換爲schemeSpecificPart

  • 而後拼接requestUrl,將請求的URI轉換爲路由定義的routeUri

  • 最後,提交到過濾器鏈繼續執行

六、WebsocketRoutingFilter

若是請求中的ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 屬性對應的URL前綴爲 ws 或 wss,則啓用Websocket 路由過濾器。它使用Spring Web Socket 做爲底層通訊組件向下遊轉發 WebSocket 請求。Websocket 能夠經過添加前綴 lb來實現負載均衡,如 lb:ws://serviceid

若是您使用SockJS做爲普通http的回調,則應配置正常的HTTP路由以及Websocket路由

spring:
  cloud:
    gateway:
      routes:
      # SockJS route
      - id: websocket_sockjs_route
        uri: http://localhost:3001
        predicates:
        - Path=/websocket/info/**
      # Normwal Websocket route
      - id: websocket_route
        uri: ws://localhost:3001
        predicates:
        - Path=/websocket/**

Websocket 路由過濾器進行處理時,首先獲取請求的URL及其前綴,判斷是否知足 Websocket 過濾器啓用的條件;對於未被路由處理且請求前綴爲ws或wss的請求,設置路由處理狀態位,構造過濾後的頭部。最後將請求經過代理轉發。

// WebsocketRoutingFilter.java

@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	    //檢查websocket 是不是 upgrade
		changeSchemeIfIsWebSocketUpgrade(exchange);

		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
		String scheme = requestUrl.getScheme();
        //判斷是否知足websocket啓用條件
		if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);


		HttpHeaders headers = exchange.getRequest().getHeaders();
		HttpHeaders filtered = filterRequest(getHeadersFilters(),
				exchange);

		List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
		if (protocols != null) {
			protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream()
					.flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header)))
					.map(String::trim)
					.collect(Collectors.toList());
		}
        //將請求代理轉發
		return this.webSocketService.handleRequest(exchange,
				new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
						filtered, protocols));
	}

ProxyWebSocketHandler 是 WebSocketHandler 的實現類,處理客戶端 WebSocket Session。 下面看一下代理 WebSocket 處理器的具體實現:

// WebsocketRoutingFilter.java

private static class ProxyWebSocketHandler implements WebSocketHandler {

		private final WebSocketClient client;
		private final URI url;
		private final HttpHeaders headers;
		private final List<String> subProtocols;

		public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
			this.client = client;
			this.url = url;
			this.headers = headers;
			if (protocols != null) {
				this.subProtocols = protocols;
			} else {
				this.subProtocols = Collections.emptyList();
			}
		}

		@Override
		public List<String> getSubProtocols() {
			return this.subProtocols;
		}

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			// pass headers along so custom headers can be sent through
			return client.execute(url, this.headers, new WebSocketHandler() {
				@Override
				public Mono<Void> handle(WebSocketSession proxySession) {
					// Use retain() for Reactor Netty
					Mono<Void> proxySessionSend = proxySession
							.send(session.receive().doOnNext(WebSocketMessage::retain));
                            // .log("proxySessionSend", Level.FINE);
					Mono<Void> serverSessionSend = session
							.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
                            // .log("sessionSend", Level.FINE);
					return Mono.zip(proxySessionSend, serverSessionSend).then();
				}

				/**
				 * Copy subProtocols so they are available downstream.
				 * @return
				 */
				@Override
				public List<String> getSubProtocols() {
					return ProxyWebSocketHandler.this.subProtocols;
				}
			});
		}
	}
  • WebSocketClient# execute 方法鏈接後端被代理的 WebSocket 服務。

  • 鏈接成功後,回調WebSocketHandler實現的內部類的handle( WebSocketSession session)方法

  • WebSocketHandler 實現的內部類實現對消息的轉發: 客戶端=> 具體業務服務=> 客戶 端; 而後合併代理服務的會話信息 proxySessionSend 和業務服務的會話信息serverSessionSend。

七、其它過濾器

AdaptCachedBodyGlobalFilter— 用於緩存請求體的過濾器,在全局過濾器中的優先級較高。

ForwardPathFilter— 請求中的 gatewayRoute 屬性對應 Route 對象,當 Route 中的 URI scheme 爲 forward 模式 時, 該過濾器用於設置請求的 URI 路徑爲 Route 對象 中的 URI 路徑。

相關文章
相關標籤/搜索