webmvc和webflux做爲spring framework的兩個重要模塊,表明了兩個IO模型,阻塞式和非阻塞式的。html
webmvc是基於servlet的阻塞式模型(通常稱爲oio),一個請求到達服務器後會單獨分配一個線程去處理請求,若是請求包含IO操做,線程在IO操做結束以前一直處於阻塞等待狀態,這樣線程在等待IO操做結束的時間就浪費了。java
webflux是基於reactor的非阻塞模型(通常稱爲nio),一樣,請求到達服務器後也會分配一個線程去處理請求,若是請求包含IO操做,線程在IO操做結束以前再也不是處於阻塞等待狀態,而是去處理其餘事情,等到IO操做結束以後,再通知(得益於系統的機制)線程繼續處理請求。react
這樣線程就有效地利用了IO操做所消耗的時間。web
新建User 對象 ,代碼以下:redis
package com.crazymaker.springcloud.reactive.user.info.entity; import com.crazymaker.springcloud.reactive.user.info.dto.User; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table(name = "t_user") public final class UserEntity extends User { @Id @Column(name = "id") @GeneratedValue(strategy = GenerationType.IDENTITY) @Override public long getUserId() { return super.getUserId(); } @Column(name = "name") public String getName() { return super.getName(); } }
@Repository 用於標註數據訪問組件,即 DAO 組件。實現代碼中使用名爲 repository 的 Map 對象做爲內存數據存儲,並對對象具體實現了具體業務邏輯。JpaUserRepositoryImpl 負責將 PO 持久層(數據操做)相關的封裝組織,完成新增、查詢、刪除等操做。spring
package com.crazymaker.springcloud.reactive.user.info.dao.impl; import com.crazymaker.springcloud.reactive.user.info.dto.User; import org.springframework.stereotype.Repository; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import javax.persistence.Query; import javax.transaction.Transactional; import java.util.List; @Repository @Transactional public class JpaUserRepositoryImpl { @PersistenceContext private EntityManager entityManager; public Long insert(final User user) { entityManager.persist(user); return user.getUserId(); } public void delete(final Long userId) { Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1"); query.setParameter(1, userId); query.executeUpdate(); } @SuppressWarnings("unchecked") public List<User> selectAll() { return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList(); } @SuppressWarnings("unchecked") public User selectOne(final Long userId) { Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1"); query.setParameter(1, userId); return (User) query.getSingleResult(); } }
package com.crazymaker.springcloud.reactive.user.info.service.impl; import com.crazymaker.springcloud.common.util.BeanUtil; import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl; import com.crazymaker.springcloud.reactive.user.info.dto.User; import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.List; @Slf4j @Service @Transactional public class JpaEntityServiceImpl { @Resource private JpaUserRepositoryImpl userRepository; @Transactional //增長用戶 public User addUser(User dto) { User userEntity = new UserEntity(); userEntity.setUserId(dto.getUserId()); userEntity.setName(dto.getName()); userRepository.insert(userEntity); BeanUtil.copyProperties(userEntity,dto); return dto; } @Transactional //刪除用戶 public User delUser(User dto) { userRepository.delete(dto.getUserId()); return dto; } //查詢所有用戶 public List<User> selectAllUser() { log.info("方法 selectAllUser 被調用了"); return userRepository.selectAll(); } //查詢一個用戶 public User selectOne(final Long userId) { log.info("方法 selectOne 被調用了"); return userRepository.selectOne(userId); } }
Spring Boot WebFlux也可使用註解模式來進行API接口開發。apache
package com.crazymaker.springcloud.reactive.user.info.controller; import com.crazymaker.springcloud.common.result.RestOut; import com.crazymaker.springcloud.reactive.user.info.dto.User; import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.annotation.Resource; /** * Mono 和 Flux 適用於兩個場景,即: * Mono:實現發佈者,並返回 0 或 1 個元素,即單對象。 * Flux:實現發佈者,並返回 N 個元素,即 List 列表對象。 * 有人會問,這爲啥不直接返回對象,好比返回 City/Long/List。 * 緣由是,直接使用 Flux 和 Mono 是非阻塞寫法,至關於回調方式。 * 利用函數式能夠減小了回調,所以會看不到相關接口。這偏偏是 WebFlux 的好處:集合了非阻塞 + 異步 */ @Slf4j @Api(value = "用戶信息、基礎學習DEMO", tags = {"用戶信息DEMO"}) @RestController @RequestMapping("/api/user") public class UserReactiveController { @ApiOperation(value = "回顯測試", notes = "提示接口使用者注意事項", httpMethod = "GET") @RequestMapping(value = "/hello") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名稱", required = true)}) public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name) { log.info("方法 hello 被調用了"); return Mono.just(RestOut.succeed("hello " + name)); } @Resource JpaEntityServiceImpl jpaEntityService; @PostMapping("/add/v1") @ApiOperation(value = "插入用戶" ) @ApiImplicitParams({ // @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false), // @ApiImplicitParam(paramType = "body", dataType="用戶", name = "dto", required = true) @ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto", required = true), }) // @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User", required = true) public Mono<User> userAdd(@RequestBody User dto) { //命令式寫法 // jpaEntityService.delUser(dto); //響應式寫法 return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))); } @PostMapping("/del/v1") @ApiOperation(value = "響應式的刪除") @ApiImplicitParams({ @ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto", required = true), }) public Mono<User> userDel(@RequestBody User dto) { //命令式寫法 // jpaEntityService.delUser(dto); //響應式寫法 return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))); } @PostMapping("/list/v1") @ApiOperation(value = "查詢用戶") public Flux<User> listAllUser() { log.info("方法 listAllUser 被調用了"); //命令式寫法 改成響應式 如下語句,須要在流中執行 // List<User> list = jpaEntityService.selectAllUser(); //響應式寫法 Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser()); return userFlux; } @PostMapping("/detail/v1") @ApiOperation(value = "響應式的查看") @ApiImplicitParams({ @ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto", required = true), }) public Mono<User> getUser(@RequestBody User dto) { log.info("方法 getUser 被調用了"); //構造流 Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId())); return userMono; } @PostMapping("/detail/v2") @ApiOperation(value = "命令式的查看") @ApiImplicitParams({ @ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto", required = true), }) public RestOut<User> getUserV2(@RequestBody User dto) { log.info("方法 getUserV2 被調用了"); User user = jpaEntityService.selectOne(dto.getUserId()); return RestOut.success(user); } }
從返回值能夠看出,Mono 和 Flux 適用於兩個場景,即:編程
有人會問,這爲啥不直接返回對象,好比返回 City/Long/List。緣由是,直接使用 Flux 和 Mono 是非阻塞寫法,至關於回調方式。利用函數式能夠減小了回調,所以會看不到相關接口。這偏偏是 WebFlux 的好處:集合了非阻塞 + 異步。api
Mono 是什麼? 官方描述以下:A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.服務器
Mono 是響應流 Publisher 具備基礎 rx 操做符。能夠成功發佈元素或者錯誤。如圖所示:
file
Mono 經常使用的方法有:
Flux 是什麼? 官方描述以下:A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
Flux 是響應流 Publisher 具備基礎 rx 操做符。能夠成功發佈 0 到 N 個元素或者錯誤。Flux 實際上是 Mono 的一個補充。如圖所示:
file
因此要注意:若是知道 Publisher 是 0 或 1 個,則用 Mono。
Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 能夠發佈 Iterable 類型的元素。固然,Flux 也包含了基礎的操做:map、merge、concat、flatMap、take,這裏就不展開介紹了。
1 能夠編寫一個處理器類 Handler代替 Controller , Service 、dao層保持不變。
2 配置請求的路由
處理器類 Handler須要從請求解析參數,而且封裝響應,代碼以下:
package com.crazymaker.springcloud.reactive.user.info.config.handler; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.reactive.user.info.dto.User; import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.annotation.Resource; import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8; import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Slf4j @Component public class UserReactiveHandler { @Resource private JpaEntityServiceImpl jpaEntityService; /** * 獲得全部用戶 * * @param request * @return */ public Mono<ServerResponse> getAllUser(ServerRequest request) { log.info("方法 getAllUser 被調用了"); return ok().contentType(APPLICATION_JSON_UTF8) .body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class); } /** * 建立用戶 * * @param request * @return */ public Mono<ServerResponse> createUser(ServerRequest request) { // 2.0.0 是能夠工做, 可是2.0.1 下面這個模式是會報異常 Mono<User> user = request.bodyToMono(User.class); /**Mono 使用響應式的,時候都是一個流,是一個發佈者,任什麼時候候都不能調用發佈者的訂閱方法 也就是不能消費它, 最終的消費仍是交給咱們的Springboot來對它進行消費,任什麼時候候不能調用它的 user.subscribe(); 不能調用block 把異常放在統一的地方來處理 */ return user.flatMap(dto -> { // 校驗代碼須要放在這裏 if (StringUtils.isBlank(dto.getName())) { throw new BusinessException("用戶名不能爲空"); } return ok().contentType(APPLICATION_JSON_UTF8) .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class); }); } /** * 根據id刪除用戶 * * @param request * @return */ public Mono<ServerResponse> deleteUserById(ServerRequest request) { String id = request.pathVariable("id"); // 校驗代碼須要放在這裏 if (StringUtils.isBlank(id)) { throw new BusinessException("id不能爲空"); } User dto = new User(); dto.setUserId(Long.parseLong(id)); return ok().contentType(APPLICATION_JSON_UTF8) .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class); } }
package com.crazymaker.springcloud.reactive.user.info.config; import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.server.WebFilter; import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; import static org.springframework.web.reactive.function.server.RequestPredicates.accept; @Configuration public class RoutersConfig { @Bean RouterFunction<ServerResponse> routes(UserReactiveHandler handler) { // 下面的至關於類裏面的 @RequestMapping // 獲得全部用戶 return RouterFunctions.route(GET("/user"), handler::getAllUser) // 建立用戶 .andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser) // 刪除用戶 .andRoute(DELETE("/user/{id}"), handler::deleteUserById); } @Value("${server.servlet.context-path}") private String contextPath; //處理上下文路徑,沒有上下文路徑,此函數能夠忽略 @Bean public WebFilter contextPathWebFilter() { return (exchange, chain) -> { ServerHttpRequest request = exchange.getRequest(); String requestPath = request.getURI().getPath(); if (requestPath.startsWith(contextPath)) { return chain.filter( exchange.mutate() .request(request.mutate().contextPath(contextPath).build()) .build()); } return chain.filter(exchange); }; } }
本文主要展現一下如何使用支持WebFlux的Swagger
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>${swagger.version}</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-spring-webflux</artifactId> <version>${swagger.version}</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>${swagger.version}</version> </dependency>
2.9.2
)還不支持WebFlux,得使用3.0.0才支持package com.crazymaker.springcloud.reactive.user.info.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.web.util.UriComponentsBuilder; import springfox.documentation.PathProvider; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.paths.DefaultPathProvider; import springfox.documentation.spring.web.paths.Paths; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux; @Configuration @EnableSwagger2WebFlux public class SwaggerConfig { @Bean public Docket createRestApi() { // return new Docket(DocumentationType.OAS_30) return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .pathMapping(servletContextPath) //注意webflux沒有context-path配置,若是不加這句話的話,接口測試時路徑沒有前綴 .select() .apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller")) .paths(PathSelectors.any()) .build(); } @Value("${server.servlet.context-path}") private String servletContextPath; //構建 api文檔的詳細信息函數 private ApiInfo apiInfo() { return new ApiInfoBuilder() //頁面標題 .title("瘋狂創客圈 springcloud + Nginx 高併發核心編程") //描述 .description("Zuul+Swagger2 構建 RESTful APIs") //條款地址 .termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/") .contact(new Contact("瘋狂創客圈", "https://www.cnblogs.com/crazymakercircle/", "")) .version("1.0") .build(); } /** * 重寫 PathProvider ,解決 context-path 重複問題 * @return */ @Order(Ordered.HIGHEST_PRECEDENCE) @Bean public PathProvider pathProvider() { return new DefaultPathProvider() { @Override public String getOperationPath(String operationPath) { operationPath = operationPath.replaceFirst(servletContextPath, "/"); UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/"); return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString()); } @Override public String getResourceListingPath(String groupName, String apiDeclaration) { apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration); return apiDeclaration; } }; } }
配置模式的 WebFlux Rest接口只能使用PostMan測試,例子以下:
注意,不能帶上下文路徑:
http://192.168.68.1:7705/uaa-react-provider/user
CRUD其餘的界面,略過
@Configuration @EnableWebFlux //使用註解@EnableWebFlux public class WebFluxConfig implements WebFluxConfigurer { //繼承WebFluxConfigurer //配置靜態資源 @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("/static/**") .addResourceLocations("classpath:/static/"); registry.addResourceHandler("/file/**") .addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator); registry.addResourceHandler("/swagger-ui.html**") .addResourceLocations("classpath:/META-INF/resources/"); registry.addResourceHandler("/webjars/**") .addResourceLocations("classpath:/META-INF/resources/webjars/"); } //配置攔截器 //配置編解碼 ... }
@Configuration @EnableWebFluxSecurity //使用註解@EnableWebFluxSecurity public class WebFluxSecurityConfig implements WebFilter, //攔截器 ServerLogoutSuccessHandler, //登出成功回調 ServerAuthenticationEntryPoint, //驗證入口 ServerAuthenticationFailureHandler, //驗證成功回調 ServerAuthenticationSuccessHandler { //驗證失敗回調 //實現接口的方法 @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { //配置webflux的context-path ServerHttpRequest request = exchange.getRequest(); if (request.getURI().getPath().startsWith(contextPath)) { exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build(); } //把查詢參數轉移到FormData中,否則驗證過濾器(ServerFormLoginAuthenticationConverter)接受不到參數 if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) { ServerWebExchange finalExchange = exchange; ServerWebExchange realExchange = new Decorator(exchange) { @Override public Mono<MultiValueMap<String, String>> getFormData() { return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() { @Override public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) { if (stringStringMultiValueMap.size() == 0) { return finalExchange.getRequest().getQueryParams(); } else { return stringStringMultiValueMap; } } }); } }; return chain.filter(realExchange); } return chain.filter(exchange); } @Override public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) { return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功")); } @Override public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) { return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未驗證")); } @Override public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) { return sendJson(webFilterExchange.getExchange(), new Response<>(1, "驗證失敗")); } @Override public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) { return webFilterExchange.getChain().filter( webFilterExchange.getExchange().mutate() .request(t -> t.method(HttpMethod.POST).path("/user/login")) //轉發到自定義控制器 .build() ); } @Bean public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST) .csrf().disable() .authorizeExchange() .pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs") //swagger .permitAll() .and() .authorizeExchange() .pathMatchers("/static/**", "/file/**") //靜態資源 .permitAll() .and() .authorizeExchange() .anyExchange() .authenticated() .and() .logout() //登出 .logoutUrl("/user/logout") .logoutSuccessHandler(this) .and() .exceptionHandling() //未驗證回調 .authenticationEntryPoint(this) .and() .formLogin() .loginPage("/user/login") .authenticationFailureHandler(this) //驗證失敗回調 .authenticationSuccessHandler(this) //驗證成功回調 .and() .httpBasic() .authenticationEntryPoint(this); //basic驗證,通常用於移動端 return http.build(); } }
@Configuration @EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) //使用註解@EnableRedisWebSession ,maxInactiveIntervalInSeconds設置數據過時時間,spring.session.timeout無論用 public class RedisWebSessionConfig { //考慮到分佈式系統,通常使用redis存儲session @Bean public LettuceConnectionFactory lettuceConnectionFactory() { return new LettuceConnectionFactory(); } }
//參數上傳 //定義參數bean @Setter @Getter @ToString @ApiModel public class QueryBean{ @ApiModelProperty(value = "普通參數", required = false, example = "") private String query; @ApiModelProperty(value = "文件參數", required = false, example = "") private FilePart image; //強調,webflux中使用FilePart做爲接收文件的類型 } //定義接口 @ApiOperation("一個接口") @PostMapping("/path") //這裏須要使用@ApiImplicitParam顯示配置【文件參數】才能使swagger界面顯示上傳文件按鈕 @ApiImplicitParams({ @ApiImplicitParam( paramType = "form", //表單參數 dataType = "__file", //最新版本使用__file表示文件,之前用的是file name = "image", //和QueryBean裏面的【文件參數image】同名 value = "文件") //註釋 }) public Mono<Response> bannerAddOrUpdate(QueryBean q) { }
userAdd方法代碼以下:
public Mono<User> userAdd(@RequestBody User dto) { //命令式寫法 // jpaEntityService.delUser(dto); //響應式寫法 return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))); }
因爲返回的數據只有一個因此使用的是Mono做爲返回數據,使用Mono類靜態create方法建立Mono對象,代碼以下:
public abstract class Mono<T> implements Publisher<T> { static final BiPredicate EQUALS_BIPREDICATE = Object::equals; public Mono() { } public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) { return onAssembly(new MonoCreate(callback)); } ... }
能夠到create方法接收一個參數,參數是Consumer對象,經過callback能夠看出,這裏使用的是callback回調,下面看看Consumer接口的定義:
@FunctionalInterface public interface Consumer<T> { /** * Performs this operation on the given argument. * * @param t the input argument */ void accept(T t); /** * Returns a composed {@code Consumer} that performs, in sequence, this * operation followed by the {@code after} operation. If performing either * operation throws an exception, it is relayed to the caller of the * composed operation. If performing this operation throws an exception, * the {@code after} operation will not be performed. * * @param after the operation to perform after this operation * @return a composed {@code Consumer} that performs in sequence this * operation followed by the {@code after} operation * @throws NullPointerException if {@code after} is null */ default Consumer<T> andThen(Consumer<? super T> after) { Objects.requireNonNull(after); return (T t) -> { accept(t); after.accept(t); }; } }
經過上面的代碼能夠看出,有兩個方法,一個是默認的方法andThen,還有一個accept方法,
Mono.create()方法的參數須要一個實現類,實現Consumer接口;Mono.create方法的參數指向的實例對象, 就是要實現這個accept方法。
例子中,下面的lambda表達式,就是accept方法的實現,實參的類型爲 Consumer<MonoSink
cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))
來來來,重複看一下,create方法的實現:
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) { return onAssembly(new MonoCreate(callback)); }
在方法內部調用了onAssembly方法,參數是MonoCreate對象,而後咱們看看MonoCreate類,代碼以下:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package reactor.core.publisher; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; import java.util.function.LongConsumer; import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.Scannable.Attr; import reactor.core.publisher.FluxCreate.SinkDisposable; import reactor.util.annotation.Nullable; import reactor.util.context.Context; final class MonoCreate<T> extends Mono<T> { final Consumer<MonoSink<T>> callback; MonoCreate(Consumer<MonoSink<T>> callback) { this.callback = callback; } public void subscribe(CoreSubscriber<? super T> actual) { MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual); actual.onSubscribe(emitter); try { this.callback.accept(emitter); } catch (Throwable var4) { emitter.error(Operators.onOperatorError(var4, actual.currentContext())); } } static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> { final CoreSubscriber<? super T> actual; volatile Disposable disposable; static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable"); volatile int state; static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state"); volatile LongConsumer requestConsumer; static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer"); T value; static final int NO_REQUEST_HAS_VALUE = 1; static final int HAS_REQUEST_NO_VALUE = 2; static final int HAS_REQUEST_HAS_VALUE = 3; DefaultMonoSink(CoreSubscriber<? super T> actual) { this.actual = actual; } public Context currentContext() { return this.actual.currentContext(); } @Nullable public Object scanUnsafe(Attr key) { if (key != Attr.TERMINATED) { return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key); } else { return this.state == 3 || this.state == 1; } } public void success() { if (STATE.getAndSet(this, 3) != 3) { try { this.actual.onComplete(); } finally { this.disposeResource(false); } } } public void success(@Nullable T value) { if (value == null) { this.success(); } else { int s; do { s = this.state; if (s == 3 || s == 1) { Operators.onNextDropped(value, this.actual.currentContext()); return; } if (s == 2) { if (STATE.compareAndSet(this, s, 3)) { try { this.actual.onNext(value); this.actual.onComplete(); } finally { this.disposeResource(false); } } return; } this.value = value; } while(!STATE.compareAndSet(this, s, 1)); } } public void error(Throwable e) { if (STATE.getAndSet(this, 3) != 3) { try { this.actual.onError(e); } finally { this.disposeResource(false); } } else { Operators.onOperatorError(e, this.actual.currentContext()); } } public MonoSink<T> onRequest(LongConsumer consumer) { Objects.requireNonNull(consumer, "onRequest"); if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) { throw new IllegalStateException("A consumer has already been assigned to consume requests"); } else { return this; } } public CoreSubscriber<? super T> actual() { return this.actual; } public MonoSink<T> onCancel(Disposable d) { Objects.requireNonNull(d, "onCancel"); SinkDisposable sd = new SinkDisposable((Disposable)null, d); if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) { Disposable c = this.disposable; if (c instanceof SinkDisposable) { SinkDisposable current = (SinkDisposable)c; if (current.onCancel == null) { current.onCancel = d; } else { d.dispose(); } } } return this; } public MonoSink<T> onDispose(Disposable d) { Objects.requireNonNull(d, "onDispose"); SinkDisposable sd = new SinkDisposable(d, (Disposable)null); if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) { Disposable c = this.disposable; if (c instanceof SinkDisposable) { SinkDisposable current = (SinkDisposable)c; if (current.disposable == null) { current.disposable = d; } else { d.dispose(); } } } return this; } public void request(long n) { if (Operators.validate(n)) { LongConsumer consumer = this.requestConsumer; if (consumer != null) { consumer.accept(n); } int s; do { s = this.state; if (s == 2 || s == 3) { return; } if (s == 1) { if (STATE.compareAndSet(this, s, 3)) { try { this.actual.onNext(this.value); this.actual.onComplete(); } finally { this.disposeResource(false); } } return; } } while(!STATE.compareAndSet(this, s, 2)); } } public void cancel() { if (STATE.getAndSet(this, 3) != 3) { this.value = null; this.disposeResource(true); } } void disposeResource(boolean isCancel) { Disposable d = this.disposable; if (d != OperatorDisposables.DISPOSED) { d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED); if (d != null && d != OperatorDisposables.DISPOSED) { if (isCancel && d instanceof SinkDisposable) { ((SinkDisposable)d).cancel(); } d.dispose(); } } } } }
上面的代碼比較多,咱們主要關注下面兩個函數:
MonoCreate(Consumer<MonoSink<T>> callback) { this.callback = callback; } public void subscribe(CoreSubscriber<? super T> actual) { MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual); actual.onSubscribe(emitter); try { this.callback.accept(emitter); } catch (Throwable var4) { emitter.error(Operators.onOperatorError(var4, actual.currentContext())); } }
經過上面的代碼能夠看出,一個是構造器,參數是Consumer,裏面進行操做保存了Consumer對象,而後在subscribe方法裏面有一句代碼是this.callback.accept(emitter),就是在這裏進行了接口的回調,回調Consumer的accept方法,這個方法是在調用Mono.create()方法的時候實現了。而後在細看subscribe方法,這裏面有一個actual.onSubscribe方法,經過方法名能夠知道,這裏是訂閱了消息。webflux是基於reactor模型,基於事件消息和異步,這裏也體現了一個異步。
Mono和Flux的其餘用法能夠參照上面的源碼流程本身看看,就不細說了。