springcloud webflux


目錄:小圈的 springCloud 高併發系列

推薦閱讀
nacos 實戰(史上最全)
sentinel (史上最全+入門教程)
springcloud + webflux 高併發實戰
Webflux(史上最全)
SpringCloud gateway (史上最全)
和 1000+ Java 高併發 發燒友、 一塊兒 交流 、學習、入大廠、作架構,GO

前言

WebMVC 和 WebFlux 作爲 Spring Framework 的兩個重要模塊,分別代表了兩種 IO 模型:阻塞式和非阻塞式。

WebMVC 是基於 Servlet 的阻塞式模型(一般稱爲 OIO):一個請求到達服務器後,會單獨分配一個線程去處理;如果請求包含 IO 操作,線程在 IO 操作結束之前一直處於阻塞等待狀態,這段等待 IO 的時間就被浪費了。

WebFlux 是基於 Reactor 的非阻塞模型(一般稱爲 NIO):同樣,請求到達服務器後也會分配一個線程去處理;但如果請求包含 IO 操作,線程在 IO 操作結束之前不再處於阻塞等待狀態,而是去處理其他事情,等到 IO 操作結束之後,再(藉助操作系統的事件通知機制)通知線程繼續處理該請求。

這樣,線程就有效地利用了原本消耗在等待 IO 上的時間。

WebFlux 增刪改查完整實戰 demo

Dao層 (又稱 repository 層)

entity(又稱 PO對象)

新建 UserEntity 對象,代碼如下:

package com.crazymaker.springcloud.reactive.user.info.entity;

import com.crazymaker.springcloud.reactive.user.info.dto.User;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

/**
 * JPA persistent object mapped to the {@code t_user} table.
 *
 * <p>The values come from the inherited {@link User} DTO; this subclass only overrides the
 * getters so that the column mapping can be attached to them (property access).
 *
 * <p>The class is deliberately not {@code final}: the JPA specification requires entity classes
 * to be non-final so that a persistence provider is able to subclass them.
 */
@Entity
@Table(name = "t_user")
public class UserEntity extends User
{

    /**
     * Returns the primary key, mapped to column {@code id} and generated with the
     * {@link GenerationType#IDENTITY} strategy.
     *
     * @return the user id as held by the superclass
     */
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Override
    public long getUserId()
    {
        return super.getUserId();
    }

    /**
     * Returns the user name, mapped to column {@code name}.
     *
     * @return the name as held by the superclass
     */
    @Column(name = "name")
    @Override
    public String getName()
    {
        return super.getName();
    }
}

Dao 實現類

@Repository 用於標註數據訪問組件,即 DAO 組件。實現代碼中通過 JPA 的 EntityManager 訪問數據庫(而不是使用內存中的 Map 存儲)。JpaUserRepositoryImpl 負責封裝 PO 持久層(數據操作)相關的邏輯,完成新增、查詢、刪除等操作。

package com.crazymaker.springcloud.reactive.user.info.dao.impl;

import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;

/**
 * JPA-backed data access object for users.
 *
 * <p>Every operation goes through the injected {@link EntityManager}; the class-level
 * {@code @Transactional} annotation applies to all public methods.
 */
@Repository
@Transactional
public class JpaUserRepositoryImpl
{

    @PersistenceContext
    private EntityManager entityManager;


    /**
     * Persists a new user.
     *
     * @param user the instance handed to {@link EntityManager#persist(Object)}
     * @return the value of {@code user.getUserId()} read after the persist call
     */
    public Long insert(final User user)
    {
        entityManager.persist(user);
        return user.getUserId();
    }

    /**
     * Deletes the user with the given id through a JPQL bulk delete.
     *
     * @param userId id of the user to remove
     */
    public void delete(final Long userId)
    {
        Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
        query.setParameter(1, userId);
        query.executeUpdate();
    }

    /**
     * Loads every user.
     *
     * @return all {@code UserEntity} rows; the untyped query result is cast to {@code List<User>}
     */
    @SuppressWarnings("unchecked")
    public List<User> selectAll()
    {
        // The JPQL selects UserEntity instances, which extend User, so the cast is safe.
        return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
    }

    /**
     * Loads a single user by id.
     *
     * @param userId id of the user to look up
     * @return the matching user, or {@code null} when no row has that id
     */
    public User selectOne(final Long userId)
    {
        Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
        query.setParameter(1, userId);
        // getSingleResult() throws NoResultException for an unknown id. Callers wrap the result
        // in Mono.justOrEmpty(...), i.e. they expect null for "not found", so read the result
        // list and map an empty list to null instead.
        List<?> rows = query.getResultList();
        return rows.isEmpty() ? null : (User) rows.get(0);
    }
}

Service服務層

package com.crazymaker.springcloud.reactive.user.info.service.impl;

import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.List;

/**
 * Transactional service layer that sits between the web layer and {@link JpaUserRepositoryImpl}.
 */
@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{

    @Resource
    private JpaUserRepositoryImpl userRepository;

    /**
     * Adds a user.
     *
     * <p>The id and name of {@code dto} are copied into a new {@link UserEntity}, which is then
     * inserted through the repository.
     *
     * @param dto the user data to store
     * @return the same {@code dto} instance that was passed in
     */
    @Transactional
    public User addUser(User dto)
    {
        final User entity = new UserEntity();
        entity.setUserId(dto.getUserId());
        entity.setName(dto.getName());

        userRepository.insert(entity);

        // NOTE(review): presumably copies the entity's properties back onto dto (source first,
        // target second) - confirm against BeanUtil.copyProperties.
        BeanUtil.copyProperties(entity, dto);
        return dto;
    }

    /**
     * Deletes the user identified by {@code dto.getUserId()}.
     *
     * @param dto carrier of the id to delete
     * @return the same {@code dto} instance that was passed in
     */
    @Transactional
    public User delUser(User dto)
    {
        final Long targetId = dto.getUserId();
        userRepository.delete(targetId);
        return dto;
    }

    /**
     * Queries all users.
     *
     * @return the list produced by the repository
     */
    public List<User> selectAllUser()
    {
        log.info("方法 selectAllUser 被調用了");

        final List<User> users = userRepository.selectAll();
        return users;
    }

    /**
     * Queries a single user.
     *
     * @param userId id of the user to look up
     * @return whatever the repository returns for that id
     */
    public User selectOne(final Long userId)
    {
        log.info("方法 selectOne 被調用了");

        final User found = userRepository.selectOne(userId);
        return found;
    }

}

Controller控制層

Spring Boot WebFlux 也可以使用註解模式來進行 API 接口開發。

package com.crazymaker.springcloud.reactive.user.info.controller;

import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

/**
 * Mono 和 Flux 適用於兩個場景,即:
 * Mono:實現發佈者,並返回 0 或 1 個元素,即單對象。
 * Flux:實現發佈者,並返回 N 個元素,即 List 列表對象。
 * 有人會問,這爲啥不直接返回對象,好比返回 City/Long/List。
 * 緣由是,直接使用 Flux 和 Mono 是非阻塞寫法,至關於回調方式。
 * 利用函數式能夠減小了回調,所以會看不到相關接口。這偏偏是 WebFlux 的好處:集合了非阻塞 + 異步
 */
@Slf4j
@Api(value = "用戶信息、基礎學習DEMO", tags = {"用戶信息DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{

        @ApiOperation(value = "回顯測試", notes = "提示接口使用者注意事項", httpMethod = "GET")
        @RequestMapping(value = "/hello")
        @ApiImplicitParams({
                @ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名稱", required = true)})
        public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
        {
            log.info("方法 hello 被調用了");

            return  Mono.just(RestOut.succeed("hello " + name));
        }


        @Resource
        JpaEntityServiceImpl jpaEntityService;


        @PostMapping("/add/v1")
        @ApiOperation(value = "插入用戶" )
        @ApiImplicitParams({
//                @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
//                @ApiImplicitParam(paramType = "body", dataType="用戶", name = "dto", required = true)
                @ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto",  required = true),
        })
//    @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User",  required = true)
        public Mono<User> userAdd(@RequestBody User dto)
        {
            //命令式寫法
//        jpaEntityService.delUser(dto);

            //響應式寫法
            return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
        }


        @PostMapping("/del/v1")
        @ApiOperation(value = "響應式的刪除")
        @ApiImplicitParams({
                @ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto",  required = true),
        })
        public Mono<User> userDel(@RequestBody User dto)
        {
            //命令式寫法

//        jpaEntityService.delUser(dto);

            //響應式寫法

            return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
        }

        @PostMapping("/list/v1")
        @ApiOperation(value = "查詢用戶")
        public Flux<User> listAllUser()
        {
            log.info("方法 listAllUser 被調用了");

            //命令式寫法 改成響應式 如下語句,須要在流中執行
//        List<User> list = jpaEntityService.selectAllUser();
            //響應式寫法
            Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
            return userFlux;
        }

        @PostMapping("/detail/v1")
        @ApiOperation(value = "響應式的查看")
        @ApiImplicitParams({
                @ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto",  required = true),
        })
        public Mono<User> getUser(@RequestBody User dto)
        {
            log.info("方法 getUser 被調用了");

            //構造流
            Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
            return userMono;
        }

        @PostMapping("/detail/v2")
        @ApiOperation(value = "命令式的查看")
        @ApiImplicitParams({
                @ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto",  required = true),
        })        public RestOut<User> getUserV2(@RequestBody User dto)
        {
            log.info("方法 getUserV2 被調用了");

            User user = jpaEntityService.selectOne(dto.getUserId());
            return RestOut.success(user);
        }

    }

從返回值能夠看出,Mono 和 Flux 適用於兩個場景,即:

  • Mono:實現發佈者,並返回 0 或 1 個元素,即單對象
  • Flux:實現發佈者,並返回 N 個元素,即 List 列表對象

有人會問,爲什麼不直接返回對象,比如返回 User/Long/List?原因是,直接使用 Flux 和 Mono 是非阻塞寫法,相當於回調方式;利用函數式寫法可以減少顯式的回調,因此看不到相關的回調接口。這恰恰是 WebFlux 的好處:集合了非阻塞 + 異步。

Mono

Mono 是什麼? 官方描述以下:A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

Mono 是響應流 Publisher 具備基礎 rx 操做符。能夠成功發佈元素或者錯誤。如圖所示:

img

file

Mono 經常使用的方法有:

  • Mono.create():使用 MonoSink 來建立 Mono
  • Mono.justOrEmpty():從一個 Optional 對象或 null 對象中建立 Mono。
  • Mono.error():建立一個只包含錯誤消息的 Mono
  • Mono.never():建立一個不包含任何消息通知的 Mono
  • Mono.delay():在指定的延遲時間以後,建立一個 Mono,產生數字 0 做爲惟一值

Flux

Flux 是什麼? 官方描述以下:A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

Flux 是響應流 Publisher 具備基礎 rx 操做符。能夠成功發佈 0 到 N 個元素或者錯誤。Flux 實際上是 Mono 的一個補充。如圖所示:

img

file

因此要注意:若是知道 Publisher 是 0 或 1 個,則用 Mono。

Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 能夠發佈 Iterable 類型的元素。固然,Flux 也包含了基礎的操做:map、merge、concat、flatMap、take,這裏就不展開介紹了。

使用配置模式進行WebFlux 接口開發

1 能夠編寫一個處理器類 Handler代替 Controller , Service 、dao層保持不變。

2 配置請求的路由

處理器類 Handler

處理器類 Handler須要從請求解析參數,而且封裝響應,代碼以下:

package com.crazymaker.springcloud.reactive.user.info.config.handler;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Slf4j
@Component
public class UserReactiveHandler
{


    @Resource
    private JpaEntityServiceImpl jpaEntityService;


    /**
     * 獲得全部用戶
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request)
    {
        log.info("方法 getAllUser 被調用了");
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
    }

    /**
     * 建立用戶
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request)
    {
        // 2.0.0 是能夠工做, 可是2.0.1 下面這個模式是會報異常
        Mono<User> user = request.bodyToMono(User.class);
        /**Mono 使用響應式的,時候都是一個流,是一個發佈者,任什麼時候候都不能調用發佈者的訂閱方法
         也就是不能消費它, 最終的消費仍是交給咱們的Springboot來對它進行消費,任什麼時候候不能調用它的
         user.subscribe();
         不能調用block
         把異常放在統一的地方來處理
         */

        return user.flatMap(dto ->
        {
            // 校驗代碼須要放在這裏
            if (StringUtils.isBlank(dto.getName()))
            {
                throw new BusinessException("用戶名不能爲空");
            }

            return ok().contentType(APPLICATION_JSON_UTF8)
                    .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
        });
    }

    /**
     * 根據id刪除用戶
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request)
    {
        String id = request.pathVariable("id");
        // 校驗代碼須要放在這裏
        if (StringUtils.isBlank(id))
        {
            throw new BusinessException("id不能爲空");
        }
        User dto = new User();
        dto.setUserId(Long.parseLong(id));
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
    }

}

路由配置

package com.crazymaker.springcloud.reactive.user.info.config;

import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;

import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

/**
 * Functional routing configuration: maps the {@code /user} routes to {@link UserReactiveHandler}
 * and installs a filter that tells WebFlux about the configured context path.
 */
@Configuration
public class RoutersConfig
{

    /**
     * Context path prefix. Defaults to an empty string so the application still starts when
     * {@code server.servlet.context-path} is not configured; the filter is then a no-op.
     */
    @Value("${server.servlet.context-path:}")
    private String contextPath;

    /**
     * Declares the routes; this is the functional equivalent of {@code @RequestMapping} methods.
     *
     * @param handler the handler whose methods serve the routes
     * @return the composed router function
     */
    @Bean
    RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
    {
        // List all users
        return RouterFunctions.route(GET("/user"), handler::getAllUser)
                // Create a user
                .andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
                // Delete a user
                .andRoute(DELETE("/user/{id}"), handler::deleteUserById);
    }

    /**
     * Marks the context path on requests that are addressed below it.
     *
     * @return a filter that sets the request's context path when the request path lies under
     *         {@link #contextPath}, and passes every other request through untouched
     */
    @Bean
    public WebFilter contextPathWebFilter()
    {
        return (exchange, chain) ->
        {
            ServerHttpRequest request = exchange.getRequest();
            if (!isUnderContextPath(request.getURI().getPath(), contextPath))
            {
                return chain.filter(exchange);
            }
            return chain.filter(
                    exchange.mutate()
                            .request(request.mutate().contextPath(contextPath).build())
                            .build());
        };
    }

    /**
     * Tells whether {@code requestPath} is the context path itself or lies below it.
     *
     * <p>A plain {@code startsWith} would also accept {@code /appFoo} for the context path
     * {@code /app}; the match therefore has to end on a path-segment boundary. An empty or
     * root ({@code /}) context path means there is nothing to strip.
     */
    private static boolean isUnderContextPath(String requestPath, String contextPath)
    {
        if (contextPath == null || contextPath.isEmpty() || "/".equals(contextPath))
        {
            return false;
        }
        return requestPath.equals(contextPath) || requestPath.startsWith(contextPath + "/");
    }
}

集成Swagger

本文主要展現一下如何使用支持WebFlux的Swagger

maven依賴

<!-- SpringFox artifacts used for the Swagger integration; all three take their version from the same ${swagger.version} property. -->
<dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-spring-webflux</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>
  • swagger.version 目前是 3.0.0:Spring 5 引入了 WebFlux,而 SpringFox Swagger2 的 2.9.2 版本還不支持 WebFlux,需要使用 3.0.0 才支持。

swagger 配置

package com.crazymaker.springcloud.reactive.user.info.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;

/**
 * Swagger (SpringFox) configuration for the WebFlux application.
 */
@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{

    @Value("${server.servlet.context-path}")
    private String servletContextPath;

    /**
     * Builds the Swagger 2 docket for the handlers in the controller package.
     *
     * @return the configured docket
     */
    @Bean
    public Docket createRestApi()
    {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                // WebFlux has no context-path setting of its own; without this mapping the
                // paths used when trying out an endpoint would lack the prefix.
                .pathMapping(servletContextPath)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    /**
     * Builds the descriptive metadata shown on the API documentation page.
     */
    private ApiInfo apiInfo()
    {
        return new ApiInfoBuilder()
                // Page title
                .title("瘋狂創客圈 springcloud + Nginx 高併發核心編程")
                // Description
                .description("Zuul+Swagger2  構建  RESTful APIs")
                // Terms-of-service URL
                .termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
                .contact(new Contact("瘋狂創客圈", "https://www.cnblogs.com/crazymakercircle/", ""))
                .version("1.0")
                .build();
    }

    /**
     * Replaces the default {@link PathProvider} so that the context path does not show up
     * twice in operation paths.
     *
     * @return a path provider that strips a leading context path from operation paths
     */
    @Order(Ordered.HIGHEST_PRECEDENCE)
    @Bean
    public PathProvider pathProvider() {
        return new DefaultPathProvider() {
            @Override
            public String getOperationPath(String operationPath) {
                // Strip the context path only when it is a literal leading prefix that ends on
                // a segment boundary. String.replaceFirst would treat the context path as a
                // regular expression and would also rewrite a match in the middle of the path.
                String relativePath = operationPath;
                if (operationPath.equals(servletContextPath)
                        || operationPath.startsWith(servletContextPath + "/")) {
                    relativePath = "/" + operationPath.substring(servletContextPath.length());
                }
                UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
                return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(relativePath).build().toString());
            }
        };
    }
}

測試

配置模式的 WebFlux Rest接口測試

配置模式的 WebFlux Rest接口只能使用PostMan測試,例子以下:

在這裏插入圖片描述

注意,不能帶上下文路徑:

http://192.168.68.1:7705/uaa-react-provider/user

註解模式的WebFlux Rest接口測試

swagger 增長界面

在這裏插入圖片描述

CRUD其餘的界面,略過

配置大全

靜態資源配置

// Example WebFlux configuration that registers static-resource handlers.
// The snippet is abridged: interceptor and codec configuration are elided ("...").
@Configuration
@EnableWebFlux		// enabled through the @EnableWebFlux annotation
public class WebFluxConfig implements WebFluxConfigurer {		// implements WebFluxConfigurer
	// Static resource configuration
	@Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        // /static/** is served from the classpath
        registry.addResourceHandler("/static/**")
                .addResourceLocations("classpath:/static/");
        // /file/** is served from the "file" directory below the JVM working directory (user.dir)
        registry.addResourceHandler("/file/**")
                .addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
        // Swagger UI page and its webjars resources
        registry.addResourceHandler("/swagger-ui.html**")
                .addResourceLocations("classpath:/META-INF/resources/");
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");
    }
	// Interceptor configuration
	// Codec configuration
	...
}

WebFluxSecurity配置

// Spring MVC (servlet) security configuration, shown for comparison with the WebFlux variant.
// The configuration class itself implements the callback interfaces and registers itself as the handler.
// The snippet is abridged: sendJson(...), userDetailService and the closing brace of the class are not shown.
@Configuration
@EnableWebSecurity
public class WebMvcSecurityConfig extends WebSecurityConfigurerAdapter implements 
AuthenticationEntryPoint,		// callback for unauthenticated access
AuthenticationSuccessHandler,		// callback for successful authentication
AuthenticationFailureHandler,		// callback for failed authentication
LogoutSuccessHandler {		// callback for successful logout

    // Unauthenticated access: answer with a JSON body carrying the 401 status value.
    @Override
    public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException) throws IOException, ServletException {
        sendJson(response, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Unauthorized"));
    }

    // Failed login: answer with code 1.
    @Override
    public void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {
        sendJson(response, new Response<>(1, "Incorrect"));
    }

    // Successful login: answer with code 0 and the simple class name of the Authentication object.
    @Override
    public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
        sendJson(response, new Response<>(0, authentication.getClass().getSimpleName()));
    }

    // Successful logout: answer with code 0.
    @Override
    public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
        sendJson(response, new Response<>(0, "Success"));
    }

    // HTTP security rules: CSRF off; Swagger and static resources are public; everything else
    // requires authentication; logout, form login and HTTP basic all use this class as handler.
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
                .csrf()
                .disable()
                .authorizeRequests()
                .antMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")
                .permitAll()
                .and()
                .authorizeRequests()
                .antMatchers("/static/**", "/file/**")
                .permitAll()
                .and()
                .authorizeRequests()
                .anyRequest()
                .authenticated()
                .and()
                .logout()
                .logoutUrl("/user/logout")		// virtual path, not one defined by a controller
                .logoutSuccessHandler(this)
                .permitAll()
                .and()
                .exceptionHandling()
                .authenticationEntryPoint(this)
                .and()
                .formLogin()
                .usernameParameter("username")
                .passwordParameter("password")
                .loginProcessingUrl("/user/login")		// virtual path, not one defined by a controller
                .successForwardUrl("/user/login")		// a path that is defined by a controller
                .failureHandler(this)
                .and()
                .httpBasic()
                .authenticationEntryPoint(this);
    }

    // Authentication uses the user details service (field not shown in this snippet).
    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        auth.userDetailsService(userDetailService);
    }

webflux-驗證依賴於用戶數據服務,需定義實現ReactiveUserDetailsService的Bean

/**
 * WebFlux security configuration. The class implements the filter and callback interfaces
 * itself and registers {@code this} as the handler for each of them.
 *
 * <p>The snippet is abridged: the {@code contextPath} field, the {@code sendJson(...)} helper,
 * the {@code Decorator} type and the {@code Response} type are defined elsewhere.
 */
@Configuration
@EnableWebFluxSecurity
public class WebFluxSecurityConfig implements
        WebFilter,                              // request filter
        ServerLogoutSuccessHandler,             // callback for successful logout
        ServerAuthenticationEntryPoint,         // entry point for unauthenticated access
        ServerAuthenticationFailureHandler,     // callback for failed authentication
        ServerAuthenticationSuccessHandler {    // callback for successful authentication

    /**
     * Applies the WebFlux context path and makes query parameters of POST requests available
     * as form data.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // Step 1: tell WebFlux about the context path when the request lies below it.
        ServerHttpRequest request = exchange.getRequest();
        final ServerWebExchange routed = isUnderContextPath(request.getURI().getPath(), contextPath)
                ? exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build()
                : exchange;

        // Step 2: only POST requests that carry query parameters need the form-data fallback.
        boolean postWithQuery = routed.getRequest().getMethod() == HttpMethod.POST
                && !routed.getRequest().getQueryParams().isEmpty();
        if (!postWithQuery) {
            return chain.filter(routed);
        }

        // Expose the query parameters as form data when the form itself is empty; otherwise
        // the form-login converter (ServerFormLoginAuthenticationConverter) does not see them.
        ServerWebExchange withFormFallback = new Decorator(routed) {
            @Override
            public Mono<MultiValueMap<String, String>> getFormData() {
                return super.getFormData()
                        .map(formData -> formData.isEmpty() ? routed.getRequest().getQueryParams() : formData);
            }
        };
        return chain.filter(withFormFallback);
    }

    /**
     * Tells whether {@code requestPath} is the context path itself or lies below it.
     *
     * <p>A plain {@code startsWith} would also accept {@code /appFoo} for the context path
     * {@code /app}; the match therefore has to end on a path-segment boundary. An empty or
     * root ({@code /}) context path means there is nothing to apply.
     */
    private static boolean isUnderContextPath(String requestPath, String contextPath) {
        if (contextPath == null || contextPath.isEmpty() || "/".equals(contextPath)) {
            return false;
        }
        return requestPath.equals(contextPath) || requestPath.startsWith(contextPath + "/");
    }

    /** Successful logout: answer with a JSON message. */
    @Override
    public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
        return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功"));
    }

    /** Unauthenticated access: answer with a JSON body carrying the 401 status value. */
    @Override
    public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
        return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未驗證"));
    }

    /** Failed authentication: answer with code 1. */
    @Override
    public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
        return sendJson(webFilterExchange.getExchange(), new Response<>(1, "驗證失敗"));
    }

    /**
     * Successful authentication: continue the chain with the request rewritten to
     * {@code POST /user/login}, i.e. forward to the custom controller.
     */
    @Override
    public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
        ServerWebExchange forwarded = webFilterExchange.getExchange().mutate()
                .request(t -> t.method(HttpMethod.POST).path("/user/login"))
                .build();
        return webFilterExchange.getChain().filter(forwarded);
    }

    /**
     * Builds the security filter chain.
     *
     * @param http the security builder supplied by Spring Security
     * @return the assembled filter chain
     */
    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
                .csrf().disable()
                .authorizeExchange()
                .pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")   // Swagger
                .permitAll()
                .and()
                .authorizeExchange()
                .pathMatchers("/static/**", "/file/**")                        // static resources
                .permitAll()
                .and()
                .authorizeExchange()
                .anyExchange()
                .authenticated()
                .and()
                .logout()                                                      // logout
                .logoutUrl("/user/logout")
                .logoutSuccessHandler(this)
                .and()
                .exceptionHandling()                                           // unauthenticated callback
                .authenticationEntryPoint(this)
                .and()
                .formLogin()
                .loginPage("/user/login")
                .authenticationFailureHandler(this)                            // failure callback
                .authenticationSuccessHandler(this)                            // success callback
                .and()
                .httpBasic()
                .authenticationEntryPoint(this);                               // HTTP basic, typically for mobile clients
        return http.build();
    }
}

WebSession配置

// Redis-backed WebSession configuration for WebFlux.
@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) // enabled through @EnableRedisWebSession; maxInactiveIntervalInSeconds sets the session expiry (the original author notes that spring.session.timeout has no effect here)
public class RedisWebSessionConfig { // with distributed deployments in mind, sessions are usually stored in Redis

    // Connection factory created with its no-argument constructor.
    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
        return new LettuceConnectionFactory();
    }

}
/**
 * Finds a session of the given user by scanning the session hashes stored in Redis.
 *
 * <p>Single sign-on use case: look up the sessions that belong to the same user name so the
 * caller can delete the other ones.
 *
 * <p>Steps: scan all session keys, read every hash entry of each key, keep the entries that
 * hold the Spring Security context of a principal called {@code name}, and collect them into
 * a map from {@code name} to the session id (the key with the namespace prefix removed).
 *
 * <p>NOTE(review): every surviving entry is stored under the same map key ({@code name}), so
 * the resulting map can hold at most one session id - confirm this is intended.
 *
 * @param name the principal (user) name to search for
 * @return a Mono emitting the collected map
 */
public Mono<Map<String, String>> findByPrincipalName(String name) {
    final String sessionKeyPrefix = ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:";
    final String securityContextAttr =
            "sessionAttr:" + HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY;

    return reactiveSessionRepository.getSessionRedisOperations()
            .scan(ScanOptions.scanOptions().match(sessionKeyPrefix + "*").build())
            // Pair every hash entry with the Redis key it came from.
            .flatMap(redisKey -> reactiveSessionRepository.getSessionRedisOperations()
                    .opsForHash()
                    .entries(redisKey)
                    .map(attribute -> Tuples.of(redisKey, attribute)))
            // Keep only the security-context attribute of the requested user.
            .filter(pair -> {
                Map.Entry<Object, Object> attribute = pair.getT2();
                if (!securityContextAttr.equals(attribute.getKey())) {
                    return false;
                }
                User principal = (User) ((SecurityContextImpl) attribute.getValue())
                        .getAuthentication()
                        .getPrincipal();
                return principal.getUsername().equals(name);
            })
            .collectMap(
                    pair -> name,
                    pair -> pair.getT1().replace(sessionKeyPrefix, ""));
}

對標的 SpringWebMVC配置

// Redis-backed HttpSession configuration for Spring MVC (the servlet counterpart of the WebFlux session configuration).
@Configuration
@EnableRedisHttpSession	// enabled through the @EnableRedisHttpSession annotation
public class RedisHttpSessionConfig { // with distributed deployments in mind, sessions are usually stored in Redis

  // Connection factory created with its no-argument constructor.
  @Bean
  public LettuceConnectionFactory redisConnectionFactory() {
    return new LettuceConnectionFactory();
  }

}
// Single sign-on: use FindByIndexNameSessionRepository to look up the sessions by user name, then delete the other sessions.
Map<String, Session> map = findByIndexNameSessionRepository.findByPrincipalName(name);

文件上傳配置

// Uploading a file together with ordinary parameters.
// Parameter bean; accessors and toString are generated by the Lombok annotations.
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{
    // Ordinary (non-file) parameter.
    @ApiModelProperty(value = "普通參數", required = false, example = "")
    private String query;
    // File parameter.
    @ApiModelProperty(value = "文件參數", required = false, example = "")
    private FilePart image;		// important: in WebFlux, FilePart is the type used to receive a file
}
// Endpoint definition. The method body is left out in this snippet.
@ApiOperation("一個接口")
@PostMapping("/path")
// The file parameter must be declared explicitly with @ApiImplicitParam, otherwise the Swagger UI does not show an upload button.
@ApiImplicitParams({
	@ApiImplicitParam(
		paramType = "form", // form parameter
		dataType = "__file", // recent versions use __file to denote a file; older ones used file
		name = "image", // same name as the file field "image" in QueryBean
		value = "文件")	// description
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {

}

WebFlux 執行流程

userAdd方法代碼以下:

// Controller handler from the CRUD demo: wraps the add-user service call in a Mono.
public Mono<User> userAdd(@RequestBody User dto)
        {
            // Imperative style, shown for comparison only:
//        jpaEntityService.addUser(dto);

            // Reactive style: the lambda passed to Mono.create receives the MonoSink and
            // completes it with the value returned by the service.
            return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
        }

因爲返回的數據只有一個因此使用的是Mono做爲返回數據,使用Mono類靜態create方法建立Mono對象,代碼以下:

// Abridged excerpt of reactor.core.publisher.Mono; the remaining members are elided ("...").
public abstract class Mono<T> implements Publisher<T> {
    static final BiPredicate EQUALS_BIPREDICATE = Object::equals;

    public Mono() {
    }

    // Wraps the callback in a MonoCreate instance and passes it through onAssembly.
    public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
        return onAssembly(new MonoCreate(callback));
    }
...
}

​ 能夠到create方法接收一個參數,參數是Consumer對象,經過callback能夠看出,這裏使用的是callback回調,下面看看Consumer接口的定義:

/**
 * An operation that takes a single argument and returns nothing. The functional method of
 * this interface is {@link #accept(Object)}.
 *
 * @param <T> the type of the argument the operation consumes
 */
@FunctionalInterface
public interface Consumer<T> {

    /**
     * Applies this operation to the supplied argument.
     *
     * @param t the argument to consume
     */
    void accept(T t);

    /**
     * Builds a consumer that runs this operation first and {@code after} second, both on the
     * same argument. An exception thrown by either step is relayed to the caller of the
     * composed consumer; when this operation throws, {@code after} is not run.
     *
     * @param after the operation to run once this one has finished
     * @return a consumer performing this operation followed by {@code after}
     * @throws NullPointerException if {@code after} is null
     */
    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        final Consumer<T> first = this;
        return value -> {
            first.accept(value);
            after.accept(value);
        };
    }
}

通過上面的代碼可以看出,Consumer 接口有兩個方法:一個是默認方法 andThen,另一個是抽象方法 accept。

Mono.create()方法的參數須要一個實現類,實現Consumer接口;Mono.create方法的參數指向的實例對象, 就是要實現這個accept方法。

例子中,下面的 lambda 表達式就是 accept 方法的實現,實參的類型爲 Consumer<MonoSink<User>>,accept 的實現如下:

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))

來來來,重複看一下,create方法的實現:

/**
 * Wraps the callback in a {@code MonoCreate} and passes that through {@code onAssembly}.
 *
 * @param callback consumer of the {@code MonoSink<T>}
 * @return the result of {@code onAssembly}
 */
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
        return onAssembly(new MonoCreate(callback));
    }

在方法內部調用了 onAssembly 方法,參數是 MonoCreate 對象,然後我們看看 MonoCreate 類,代碼如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
 
package reactor.core.publisher;
 
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
 
/**
 * Mono implementation that keeps a user callback and, on each {@code subscribe} call, hands
 * that callback a new {@code DefaultMonoSink} bound to the subscriber.
 */
final class MonoCreate<T> extends Mono<T> {
    // Callback received by the constructor; invoked from subscribe().
    final Consumer<MonoSink<T>> callback;
 
    /**
     * Stores the callback for later use.
     *
     * @param callback consumer that will be given the sink in subscribe()
     */
    MonoCreate(Consumer<MonoSink<T>> callback) {
        this.callback = callback;
    }
 
    /**
     * Creates a sink for the subscriber, passes it to {@code actual.onSubscribe}, then runs the
     * stored callback with that sink. Anything thrown by the callback is mapped through
     * {@code Operators.onOperatorError} and reported via {@code emitter.error}.
     *
     * @param actual the subscriber being attached
     */
    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
        actual.onSubscribe(emitter);
 
        try {
            this.callback.accept(emitter);
        } catch (Throwable var4) {
            emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
        }
 
    }
 
    /**
     * Sink given to the user callback. It tracks, through the {@code state} field, whether a
     * value has been supplied and whether a request has arrived, and forwards signals to the
     * wrapped subscriber.
     */
    static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
        // Subscriber that receives onNext/onComplete/onError.
        final CoreSubscriber<? super T> actual;
        // Resource holder; updated atomically through DISPOSABLE.
        volatile Disposable disposable;
        static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
        // Current state; updated atomically through STATE (see the constants below).
        volatile int state;
        static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
        // Optional consumer notified of each request(n); set at most once via onRequest().
        volatile LongConsumer requestConsumer;
        static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
        // Value stored by success(value) when no request has been seen yet.
        T value;
        // State values. The bare literals 1/2/3 used in the methods below equal these constants
        // (presumably inlined by the decompiler).
        // NOTE(review): 3 also acts as the terminal marker — success(), error() and cancel()
        // all swap the state to 3 and only act when the previous state was not 3.
        static final int NO_REQUEST_HAS_VALUE = 1;
        static final int HAS_REQUEST_NO_VALUE = 2;
        static final int HAS_REQUEST_HAS_VALUE = 3;
 
        DefaultMonoSink(CoreSubscriber<? super T> actual) {
            this.actual = actual;
        }
 
        /** Returns the context of the wrapped subscriber. */
        public Context currentContext() {
            return this.actual.currentContext();
        }
 
        /**
         * Answers TERMINATED with "state is 3 or 1", CANCELLED with whether the disposable is
         * disposed, and hands every other key to {@code super.scanUnsafe}.
         */
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key != Attr.TERMINATED) {
                return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
            } else {
                return this.state == 3 || this.state == 1;
            }
        }
 
        /**
         * Completes without a value: swaps the state to 3 and, if it was not already 3, calls
         * {@code onComplete} and then releases the resource (non-cancel path).
         */
        public void success() {
            if (STATE.getAndSet(this, 3) != 3) {
                try {
                    this.actual.onComplete();
                } finally {
                    this.disposeResource(false);
                }
            }
 
        }
 
        /**
         * Completes with a value; a null value is treated as {@link #success()}.
         *
         * @param value the value to emit, or null for an empty completion
         */
        public void success(@Nullable T value) {
            if (value == null) {
                this.success();
            } else {
                int s;
                do {
                    s = this.state;
                    // State 3 or 1: a value was already supplied or the sink is finished, so
                    // this value is dropped.
                    if (s == 3 || s == 1) {
                        Operators.onNextDropped(value, this.actual.currentContext());
                        return;
                    }
 
                    // State 2: a request is already pending, so emit right away if the CAS wins.
                    if (s == 2) {
                        if (STATE.compareAndSet(this, s, 3)) {
                            try {
                                this.actual.onNext(value);
                                this.actual.onComplete();
                            } finally {
                                this.disposeResource(false);
                            }
                        }
 
                        return;
                    }
 
                    // No request yet: park the value and move to state 1; retry if the CAS loses.
                    this.value = value;
                } while(!STATE.compareAndSet(this, s, 1));
 
            }
        }
 
        /**
         * Terminates with an error: swaps the state to 3 and, if it was not already 3, calls
         * {@code onError} and releases the resource; otherwise the error is passed to
         * {@code Operators.onOperatorError} and the result is discarded.
         */
        public void error(Throwable e) {
            if (STATE.getAndSet(this, 3) != 3) {
                try {
                    this.actual.onError(e);
                } finally {
                    this.disposeResource(false);
                }
            } else {
                Operators.onOperatorError(e, this.actual.currentContext());
            }
 
        }
 
        /**
         * Registers the consumer notified on each request(n).
         *
         * @throws NullPointerException if consumer is null
         * @throws IllegalStateException if a consumer was already registered
         */
        public MonoSink<T> onRequest(LongConsumer consumer) {
            Objects.requireNonNull(consumer, "onRequest");
            if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
                throw new IllegalStateException("A consumer has already been assigned to consume requests");
            } else {
                return this;
            }
        }
 
        /** Returns the wrapped subscriber. */
        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }
 
        /**
         * Registers a cancel hook. If no disposable is installed yet, a new SinkDisposable
         * carrying {@code d} is installed; otherwise, when the current holder is a
         * SinkDisposable, {@code d} fills its empty {@code onCancel} slot or, if that slot is
         * taken, {@code d} is disposed immediately.
         *
         * @throws NullPointerException if d is null
         */
        public MonoSink<T> onCancel(Disposable d) {
            Objects.requireNonNull(d, "onCancel");
            SinkDisposable sd = new SinkDisposable((Disposable)null, d);
            if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
                Disposable c = this.disposable;
                if (c instanceof SinkDisposable) {
                    SinkDisposable current = (SinkDisposable)c;
                    if (current.onCancel == null) {
                        current.onCancel = d;
                    } else {
                        d.dispose();
                    }
                }
            }
 
            return this;
        }
 
        /**
         * Registers a dispose hook. Mirrors {@link #onCancel(Disposable)} but targets the
         * {@code disposable} slot of the SinkDisposable.
         *
         * @throws NullPointerException if d is null
         */
        public MonoSink<T> onDispose(Disposable d) {
            Objects.requireNonNull(d, "onDispose");
            SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
            if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
                Disposable c = this.disposable;
                if (c instanceof SinkDisposable) {
                    SinkDisposable current = (SinkDisposable)c;
                    if (current.disposable == null) {
                        current.disposable = d;
                    } else {
                        d.dispose();
                    }
                }
            }
 
            return this;
        }
 
        /**
         * Handles a request from the subscriber: after validating {@code n} it notifies the
         * registered request consumer (if any), then either emits a parked value (state 1) or
         * records that a request is pending (state 2).
         */
        public void request(long n) {
            if (Operators.validate(n)) {
                LongConsumer consumer = this.requestConsumer;
                if (consumer != null) {
                    consumer.accept(n);
                }
 
                int s;
                do {
                    s = this.state;
                    // Request already recorded, or the sink is finished: nothing to do.
                    if (s == 2 || s == 3) {
                        return;
                    }
 
                    // A value is parked: emit it and complete if the CAS to 3 wins.
                    if (s == 1) {
                        if (STATE.compareAndSet(this, s, 3)) {
                            try {
                                this.actual.onNext(this.value);
                                this.actual.onComplete();
                            } finally {
                                this.disposeResource(false);
                            }
                        }
 
                        return;
                    }
                } while(!STATE.compareAndSet(this, s, 2));
 
            }
        }
 
        /**
         * Cancels: swaps the state to 3 and, if it was not already 3, clears the parked value
         * and releases the resource on the cancel path.
         */
        public void cancel() {
            if (STATE.getAndSet(this, 3) != 3) {
                this.value = null;
                this.disposeResource(true);
            }
 
        }
 
        /**
         * Swaps the disposable for the DISPOSED marker and disposes the previous holder, if
         * there was one. On the cancel path a SinkDisposable additionally gets {@code cancel()}
         * called before {@code dispose()}.
         *
         * @param isCancel true when invoked from cancel()
         */
        void disposeResource(boolean isCancel) {
            Disposable d = this.disposable;
            if (d != OperatorDisposables.DISPOSED) {
                d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
                if (d != null && d != OperatorDisposables.DISPOSED) {
                    if (isCancel && d instanceof SinkDisposable) {
                        ((SinkDisposable)d).cancel();
                    }
 
                    d.dispose();
                }
            }
 
        }
    }
}

上面的代碼比較多,咱們主要關注下面兩個函數:

/**
 * Constructor: only stores the callback; nothing is executed here.
 *
 * @param callback consumer kept in the {@code callback} field
 */
MonoCreate(Consumer<MonoSink<T>> callback) {
        this.callback = callback;
    }
 
    /**
     * Creates a sink for the subscriber, passes it to {@code actual.onSubscribe}, then invokes
     * the stored callback with that sink. Anything thrown by the callback is mapped through
     * {@code Operators.onOperatorError} and reported via {@code emitter.error}.
     *
     * @param actual the subscriber being attached
     */
    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
        actual.onSubscribe(emitter);
 
        try {
            // This is where the Consumer passed to the constructor is called back.
            this.callback.accept(emitter);
        } catch (Throwable var4) {
            emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
        }
 
    }

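通過上面的代碼可以看出,一個是構造器,參數是 Consumer,構造器裏只是把 Consumer 對象保存到 callback 字段;另一個是 subscribe 方法,其中的 this.callback.accept(emitter) 就是進行接口回調的地方:回調的是 Consumer 的 accept 方法,而這個方法正是在調用 Mono.create() 時通過 lambda 實現的。再細看 subscribe 方法,裏面先調用了 actual.onSubscribe(emitter),通過方法名可以知道,這一步是把 emitter 作爲訂閱關係(Subscription)交給訂閱者 actual,之後才執行回調。也就是說,Mono.create() 只負責組裝,回調要等到有訂閱者訂閱時才會執行。webflux 基於 reactor 模型,基於事件消息和異步;需要注意的是,這裏的 callback.accept 本身是在調用 subscribe 的線程上同步執行的,如果回調裏包含阻塞操作(例如例子中的 JPA 訪問),並不會自動變成異步,需要另行切換到合適的線程池執行。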

Mono 和 Flux 的其他用法可以參照上面的源碼流程自己看看,這裏就不細說了。

相關文章
相關標籤/搜索