Spring WebFlux之HttpHandler的探索

這是本人正在寫的《Java 編程方法論:響應式Reactor三、Reactor-Netty和Spring WebFlux》一書的文章節選,它是《Java編程方法論:響應式RxJava與代碼設計實戰》的續篇,也可做爲獨立的一原本讀html

這是此節上半段的節選內容java

HttpHandler的探索

經過前面的章節,咱們已經接觸了Reactor-Netty整個流程的設計實現細節,同時也涉及到了reactor.netty.http.server.HttpServer#handle,準確得說,它是一個SPI(Service Provider Interface)接口,對外提供BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,這樣,咱們能夠針對該handler依據自身環境進行相應實現。 Spring WebFluxReactor-Netty都有一套屬於本身的實現,只不過前者爲了適應Spring Web的一些習慣作了大量的適配設計,整個過程比較複雜,後者提供了一套簡單而靈活的實現。那麼本章咱們就從Reactor-Netty內對它的實現開始,正式向Spring WebFlux進行過渡。react

HttpServerRoutes設定

咱們在給後臺服務器提交HTTP請求的時候,每每會涉及到getheadpostput這幾種類型,還會包括請求地址,服務端會根據請求類型和請求地址提供對應的服務,而後纔是具體的處理,那麼咱們是否是能夠將尋找服務的這個過程抽取出來,造成服務路由查找。編程

因而,在Reactor-Netty中,設計了一個HttpServerRoutes接口,該接口繼承了BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>,用來路由請求,當請求來臨時,對咱們所設計的路由規則按順序依次查找,直到第一個匹配,而後調用對應的處理handlerHttpServerRoutes接口內針對於咱們經常使用的getheadpostputdelete等請求設計了對應的路由規則(具體請看下面源碼)。服務器

咱們在使用的時候首先會調用HttpServerRoutes#newRoutes獲得一個DefaultHttpServerRoutes實例,而後加入咱們設計的路由規則,關於路由規則的設計,其實就是將一條條規則經過一個集合管理起來,而後在須要時進行遍歷匹配便可,這裏它的核心組織方法就是reactor.netty.http.server.HttpServerRoutes#route,在規則設計完後,咱們就能夠設計對應每一條規則的BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>函數式實現,最後,當請求路由匹配成功,就能夠調用咱們的BiFunction實現,對請求進行處理。app

//reactor.netty.http.server.HttpServerRoutes
public interface HttpServerRoutes extends BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {

	static HttpServerRoutes newRoutes() {
		return new DefaultHttpServerRoutes();
	}


	default HttpServerRoutes delete(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.delete(path), handler);
	}

    ...

	default HttpServerRoutes get(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.get(path), handler);
	}

	default HttpServerRoutes head(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.head(path), handler);
	}

	default HttpServerRoutes index(final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(INDEX_PREDICATE, handler);
	}

	default HttpServerRoutes options(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.options(path), handler);
	}

	default HttpServerRoutes post(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.post(path), handler);
	}

	default HttpServerRoutes put(String path, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		return route(HttpPredicate.put(path), handler);
	}

	HttpServerRoutes route(Predicate<? super HttpServerRequest> condition, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler);

	...

}
複製代碼

關於路由規則的設計,結合前面所講,咱們能夠在HttpServerRoutes的實現類中設計一個List用來存儲一條條的規則,接下來要作的就是將制定的規則一條條放入其中便可,由於這是一個添加過程,並不須要返回值,咱們可使用Consumer<? super HttpServerRoutes>來表明這個過程。對於請求的匹配,每每都是對請求的條件判斷,那咱們可使用Predicate<? super HttpServerRequest>來表明這個判斷邏輯,因爲單條路由規則匹配對應的BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>處理,那麼咱們是否是能夠將這二者耦合到一塊兒,因而reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler就設計出來了:ide

//reactor.netty.http.server.DefaultHttpServerRoutes.HttpRouteHandler
static final class HttpRouteHandler implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>, Predicate<HttpServerRequest> {

    final Predicate<? super HttpServerRequest>          condition;
    final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>>
                                                        handler;
    final Function<? super String, Map<String, String>> resolver;

    HttpRouteHandler(Predicate<? super HttpServerRequest> condition,
            BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
            @Nullable Function<? super String, Map<String, String>> resolver) {
        this.condition = Objects.requireNonNull(condition, "condition");
        this.handler = Objects.requireNonNull(handler, "handler");
        this.resolver = resolver;
    }

    @Override
    public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
        return handler.apply(request.paramsResolver(resolver), response);
    }

    @Override
    public boolean test(HttpServerRequest o) {
        return condition.test(o);
    }
}
複製代碼

這裏可能須要對request中的參數進行解析,因此對外提供了一個可供咱們自定義的參數解析器實現接口:Function<? super String, Map<String, String>>,剩下的conditionresolver就能夠按照咱們前面說的邏輯進行。函數

此時,HttpRouteHandler屬於一個真正的請求校驗者和請求業務處理者,咱們如今要將它們的功能經過一系列邏輯串聯造成一個處理流程,那麼這裏能夠經過一個代理模式進行,咱們在HttpServerRoutes的實現類中經過一個List集合管理了數量不等的HttpRouteHandler實例,對外,咱們在使用reactor.netty.http.server.HttpServer#handle時只會看到一個BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>實現,那麼,全部的邏輯流程處理都應該在這個BiFunctionapply(...)實現中進行,因而,咱們就有下面的reactor.netty.http.server.DefaultHttpServerRoutes實現:post

//reactor.netty.http.server.DefaultHttpServerRoutes
final class DefaultHttpServerRoutes implements HttpServerRoutes {


	private final CopyOnWriteArrayList<HttpRouteHandler> handlers =
			new CopyOnWriteArrayList<>();
	...
	@Override
	public HttpServerRoutes route(Predicate<? super HttpServerRequest> condition, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(condition, "condition");
		Objects.requireNonNull(handler, "handler");

		if (condition instanceof HttpPredicate) {
			handlers.add(new HttpRouteHandler(condition,
					handler,
					(HttpPredicate) condition));
		}
		else {
			handlers.add(new HttpRouteHandler(condition, handler, null));
		}
		return this;
	}

	@Override
	public Publisher<Void> apply(HttpServerRequest request, HttpServerResponse response) {
		final Iterator<HttpRouteHandler> iterator = handlers.iterator();
		HttpRouteHandler cursor;

		try {
			while (iterator.hasNext()) {
				cursor = iterator.next();
				if (cursor.test(request)) {
					return cursor.apply(request, response);
				}
			}
		}
		catch (Throwable t) {
			Exceptions.throwIfJvmFatal(t);
			return Mono.error(t); //500
		}

		return response.sendNotFound();
	}
    ...
}
複製代碼

能夠看到route(...)方法只是作了HttpRouteHandler實例的構建並交由handlers這個list進行管理,經過上面的apply實現將前面的內容在流程邏輯中進行組合。因而,咱們就能夠在reactor.netty.http.server.HttpServer中設計一個route方法,對外提供一個SPI接口,將咱們所提到的整個過程定義在這個方法中(獲得一個HttpServerRoutes實例,而後經過它的route方法構建規則,構建過程在前面提到的Consumer<? super HttpServerRoutes>中進行,最後將組合成功的HttpServerRoutesBiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>>的角色做爲參數交由HttpServer#handle)。ui

另外,咱們在這裏要特別注意下,在上面DefaultHttpServerRoutes實現的apply方法中,能夠看出,一旦請求匹配,處理完後就直接返回結果,再也不繼續遍歷匹配,也就是說每次新來的請求,只調用所聲明匹配規則順序的第一個匹配。

//reactor.netty.http.server.HttpServer#route
public final HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
    Objects.requireNonNull(routesBuilder, "routeBuilder");
    HttpServerRoutes routes = HttpServerRoutes.newRoutes();
    routesBuilder.accept(routes);
    return handle(routes);
}
複製代碼

因而,咱們就能夠經過下面的Demo來應用上面的設計:

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes ->
                              routes.get("/hello",        <1>
                                         (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",        <2>
                                         (request, response) -> response.send(request.receive().retain()))
                                    .get("/path/{param}", <3>
                                         (request, response) -> response.sendString(Mono.just(request.param("param")))))
                          .bindNow();

        server.onDispose()
              .block();
    }
}
複製代碼

<1>處,當咱們發出一個GET請求去訪問/hello時就會獲得一個字符串Hello World!

<2>處,當咱們發出一個 POST請求去訪問 /echo時就會將請求體做爲響應內容返回。

<3>處,當咱們發出一個 GET請求去訪問 /path/{param} 時就會獲得一個請求路徑參數param的值。

關於SSE在這裏的使用,咱們能夠看下面這個Demo,具體的代碼細節就不詳述了,看對應註釋便可:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                HttpServer.create()
                          .route(routes -> routes.get("/sse", serveSse()))
                          .bindNow();

        server.onDispose()
              .block();
    }

    /** * 準備 SSE response * 參考 reactor.netty.http.server.HttpServerResponse#sse能夠知道它的"Content-Type" * 是"text/event-stream" * flush策略爲經過所提供的Publisher來每下發一個元素就flush一次 */
    private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
        return (request, response) ->
            response.sse()
                    .send(flux.map(Application::toByteBuf), b -> true);
    }

    /** * 將發元素按照按照給定的格式由Object轉換爲ByteBuf。 */
    private static ByteBuf toByteBuf(Object any) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write("data: ".getBytes(Charset.defaultCharset()));
            MAPPER.writeValue(out, any);
            out.write("\n\n".getBytes(Charset.defaultCharset()));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ByteBufAllocator.DEFAULT
                               .buffer()
                               .writeBytes(out.toByteArray());
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();
}
複製代碼
相關文章
相關標籤/搜索