This article takes a look at the streaming-media-types property of Spring Cloud Gateway.
    {
      "sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",
      "name": "spring.cloud.gateway.streaming-media-types",
      "type": "java.util.List<org.springframework.http.MediaType>"
    }
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayProperties.java
    @ConfigurationProperties("spring.cloud.gateway")
    @Validated
    public class GatewayProperties {

        /**
         * List of Routes
         */
        @NotNull
        @Valid
        private List<RouteDefinition> routes = new ArrayList<>();

        /**
         * List of filter definitions that are applied to every route.
         */
        private List<FilterDefinition> defaultFilters = new ArrayList<>();

        private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
                MediaType.APPLICATION_STREAM_JSON);

        public List<RouteDefinition> getRoutes() {
            return routes;
        }

        public void setRoutes(List<RouteDefinition> routes) {
            this.routes = routes;
        }

        public List<FilterDefinition> getDefaultFilters() {
            return defaultFilters;
        }

        public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
            this.defaultFilters = defaultFilters;
        }

        public List<MediaType> getStreamingMediaTypes() {
            return streamingMediaTypes;
        }

        public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }

        @Override
        public String toString() {
            return "GatewayProperties{" +
                    "routes=" + routes +
                    ", defaultFilters=" + defaultFilters +
                    ", streamingMediaTypes=" + streamingMediaTypes +
                    '}';
        }
    }
As the source shows, the default value is MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java
    @Configuration
    @ConditionalOnClass(HttpClient.class)
    protected static class NettyConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return HttpClient.create(options);
        }

        //......

        @Bean
        public HttpClientProperties httpClientProperties() {
            return new HttpClientProperties();
        }

        @Bean
        public NettyRoutingFilter routingFilter(HttpClient httpClient,
                ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
            return new NettyRoutingFilter(httpClient, headersFilters);
        }

        @Bean
        public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
            return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
        }

        @Bean
        public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return new ReactorNettyWebSocketClient(options);
        }
    }
Here the NettyWriteResponseFilter bean is constructed with properties.getStreamingMediaTypes().
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java
    public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

        private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);

        public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

        private final List<MediaType> streamingMediaTypes;

        public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }

        @Override
        public int getOrder() {
            return WRITE_RESPONSE_FILTER_ORDER;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
            // until the WebHandler is run
            return chain.filter(exchange).then(Mono.defer(() -> {
                HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

                if (clientResponse == null) {
                    return Mono.empty();
                }
                log.trace("NettyWriteResponseFilter start");
                ServerHttpResponse response = exchange.getResponse();

                NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
                //TODO: what if it's not netty

                final Flux<NettyDataBuffer> body = clientResponse.receive()
                        .retain() //TODO: needed?
                        .map(factory::wrap);

                MediaType contentType = response.getHeaders().getContentType();
                return (isStreamingMediaType(contentType) ?
                        response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
            }));
        }

        //TODO: use framework if possible
        //TODO: port to WebClientWriteResponseFilter
        private boolean isStreamingMediaType(@Nullable MediaType contentType) {
            return (contentType != null && this.streamingMediaTypes.stream()
                    .anyMatch(contentType::isCompatibleWith));
        }
    }
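The filter calls isStreamingMediaType to decide whether the response content type is a streaming type. If it is, the body is written with writeAndFlushWith; otherwise it is written with writeWith.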
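To make the matching rule concrete, here is a minimal plain-Java sketch of the same decision. It does not use Spring: the class StreamingTypeCheck and its helpers are hypothetical names, and isCompatible is only a simplified approximation of MediaType.isCompatibleWith (it ignores parameters and handles plain `*` wildcards, but not suffix wildcards such as `*+json`).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-in for the filter's isStreamingMediaType check.
class StreamingTypeCheck {

    // Default values taken from GatewayProperties in the source above.
    static final List<String> DEFAULT_STREAMING_TYPES =
            Arrays.asList("text/event-stream", "application/stream+json");

    // Reduces "type/subtype;param=value" to a lower-cased {type, subtype} pair.
    static String[] typeAndSubtype(String mediaType) {
        String essence = mediaType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        String[] parts = essence.split("/", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Not a media type: " + mediaType);
        }
        return parts;
    }

    // Assumption: approximates isCompatibleWith by ignoring parameters and
    // letting a "*" type or subtype match anything.
    static boolean isCompatible(String a, String b) {
        String[] x = typeAndSubtype(a);
        String[] y = typeAndSubtype(b);
        if (x[0].equals("*") || y[0].equals("*")) {
            return true;
        }
        if (!x[0].equals(y[0])) {
            return false;
        }
        return x[1].equals(y[1]) || x[1].equals("*") || y[1].equals("*");
    }

    // Same shape as the filter's check: null-safe, true if any configured type matches.
    static boolean isStreaming(String contentType, List<String> streamingTypes) {
        return contentType != null
                && streamingTypes.stream().anyMatch(t -> isCompatible(contentType, t));
    }

    public static void main(String[] args) {
        System.out.println(isStreaming("text/event-stream;charset=UTF-8", DEFAULT_STREAMING_TYPES)); // true
        System.out.println(isStreaming("application/json", DEFAULT_STREAMING_TYPES));                // false
        System.out.println(isStreaming(null, DEFAULT_STREAMING_TYPES));                              // false
    }
}
```

Under these assumptions, a response with a charset parameter still counts as streaming, while application/json does not match application/stream+json because the subtypes differ.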
spring-web-5.0.6.RELEASE-sources.jar!/org/springframework/http/ReactiveHttpOutputMessage.java
    /**
     * A "reactive" HTTP output message that accepts output as a {@link Publisher}.
     *
     * <p>Typically implemented by an HTTP request on the client-side or an
     * HTTP response on the server-side.
     *
     * @author Arjen Poutsma
     * @author Sebastien Deleuze
     * @since 5.0
     */
    public interface ReactiveHttpOutputMessage extends HttpMessage {

        /**
         * Return a {@link DataBufferFactory} that can be used to create the body.
         * @return a buffer factory
         * @see #writeWith(Publisher)
         */
        DataBufferFactory bufferFactory();

        /**
         * Register an action to apply just before the HttpOutputMessage is committed.
         * <p><strong>Note:</strong> the supplied action must be properly deferred,
         * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
         * executed in the right order, relative to other actions.
         * @param action the action to apply
         */
        void beforeCommit(Supplier<? extends Mono<Void>> action);

        /**
         * Whether the HttpOutputMessage is committed.
         */
        boolean isCommitted();

        /**
         * Use the given {@link Publisher} to write the body of the message to the
         * underlying HTTP layer.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> writeWith(Publisher<? extends DataBuffer> body);

        /**
         * Use the given {@link Publisher} of {@code Publishers} to write the body
         * of the HttpOutputMessage to the underlying HTTP layer, flushing after
         * each {@code Publisher<DataBuffer>}.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);

        /**
         * Indicate that message handling is complete, allowing for any cleanup or
         * end-of-processing tasks to be performed such as applying header changes
         * made via {@link #getHeaders()} to the underlying HTTP message (if not
         * applied already).
         * <p>This method should be automatically invoked at the end of message
         * processing so typically applications should not have to invoke it.
         * If invoked multiple times it should have no side effects.
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> setComplete();
    }
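As the interface Javadoc shows, writeWith and writeAndFlushWith take different parameter types: the former accepts a Publisher<? extends DataBuffer>, the latter a Publisher<? extends Publisher<? extends DataBuffer>>. writeAndFlushWith flushes after each inner Publisher<DataBuffer> has been written.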
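The difference in flush behaviour can be illustrated with a toy model in plain Java. This is not Spring's implementation: FlushModel, its events list, and wrapEach are hypothetical names, buffers are represented as strings, and the model assumes writeWith flushes once when the body completes (in a real server the flush timing for writeWith is left to the underlying HTTP layer). wrapEach plays the role of body.map(Flux::just) in the filter, turning every buffer into its own single-element group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Toy model of the two write methods; hypothetical, not Spring's implementation.
class FlushModel {

    // Records, in order, what reaches the imaginary connection.
    final List<String> events = new ArrayList<>();

    // Models writeWith: write every buffer, then flush once (assumed) at the end.
    void writeWith(List<String> body) {
        for (String buffer : body) {
            events.add("write:" + buffer);
        }
        events.add("flush");
    }

    // Models writeAndFlushWith: flush after each inner group of buffers.
    void writeAndFlushWith(List<List<String>> body) {
        for (List<String> group : body) {
            for (String buffer : group) {
                events.add("write:" + buffer);
            }
            events.add("flush");
        }
    }

    // Mirrors body.map(Flux::just): each buffer becomes its own one-element group.
    static List<List<String>> wrapEach(List<String> body) {
        return body.stream().map(Collections::singletonList).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> body = Arrays.asList("a", "b", "c");

        FlushModel plain = new FlushModel();
        plain.writeWith(body);
        System.out.println(plain.events);     // [write:a, write:b, write:c, flush]

        FlushModel streaming = new FlushModel();
        streaming.writeAndFlushWith(wrapEach(body));
        System.out.println(streaming.events); // [write:a, flush, write:b, flush, write:c, flush]
    }
}
```

In this model, wrapping each buffer individually is what makes every chunk reach the client as soon as it is written, which is the behaviour a text/event-stream consumer relies on.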
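NettyWriteResponseFilter uses the types configured in spring.cloud.gateway.streaming-media-types to choose between writeAndFlushWith and writeWith: if the response content type is compatible with one of the configured types, the response is written with writeAndFlushWith. By default the property lists two types, MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).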