Spring Boot WebFlux reactive development tutorial (part 2)

This article builds on Spring Boot WebFlux reactive development tutorial (part 1).
It has three parts:

Database operations
Web service
WebSocket
Create the project with artifactId = trading-service and groupId = io.spring.workshop. Select Reactive Web, Devtools, Thymeleaf, and Reactive Mongo.
Web container
spring-boot-starter-webflux brings in spring-boot-starter-reactor-netty, so Reactor Netty is the default web server.
To use Tomcat instead, add this dependency to the pom:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
Undertow and Jetty are supported in the same way.

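Reactive database operations

This example uses MongoDB. In reactive mode the database driver is separate from the traditional blocking driver. At the time of writing there is no reactive MySQL driver yet; one is reportedly under development. The example runs against an embedded MongoDB, which needs this dependency: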

<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
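On first run the MongoDB binaries are downloaded automatically. The MongoDB website cannot be reached directly from behind the Great Firewall, so a proxy is needed there. JVM arguments are the recommended way to set one: -DproxySet=true -Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=1080. Note that the http and https protocols are configured separately; for an http proxy, change -Dhttps to -Dhttp.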
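The http/https split is easy to get wrong, so here is a minimal sketch of the same settings applied as system properties from code. The class name ProxySettings and the apply helper are hypothetical; the property names (http.proxyHost, https.proxyPort, and so on) are the standard JDK networking properties. Setting them this way is only assumed to work if it happens before the first connection is opened, which is why the JVM-argument form above is the safer choice.

```java
public class ProxySettings {

    /**
     * Sets the JVM proxy properties for one scheme ("http" or "https").
     * Same effect as passing -D<scheme>.proxyHost and -D<scheme>.proxyPort.
     */
    public static void apply(String scheme, String host, int port) {
        if (!scheme.equals("http") && !scheme.equals("https")) {
            throw new IllegalArgumentException("scheme must be http or https: " + scheme);
        }
        // Each scheme has its own pair of properties; setting one does not affect the other.
        System.setProperty(scheme + ".proxyHost", host);
        System.setProperty(scheme + ".proxyPort", Integer.toString(port));
    }

    public static void main(String[] args) {
        // Host and port are the example values from the text above.
        apply("https", "127.0.0.1", 1080);
        System.out.println(System.getProperty("https.proxyHost")
                + ":" + System.getProperty("https.proxyPort"));
    }
}
```

Calling apply("http", ...) as well covers plain http downloads.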
The database entity TradingUser:
import java.util.Objects;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
@Data
public class TradingUser {

    @Id
    private String id;

    private String userName;

    private String fullName;

    public TradingUser() {
    }

    public TradingUser(String id, String userName, String fullName) {
        this.id = id;
        this.userName = userName;
        this.fullName = fullName;
    }

    public TradingUser(String userName, String fullName) {
        this.userName = userName;
        this.fullName = fullName;
    }

    // id is null when the two-argument constructor is used, so compare null-safely
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TradingUser that = (TradingUser) o;

        return Objects.equals(id, that.id) && Objects.equals(userName, that.userName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, userName);
    }
}

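Create TradingUserRepository, extending ReactiveMongoRepository, and add a findByUserName method that returns a single entity as a Mono<TradingUser>.
To seed some data at startup, create UsersCommandLineRunner, which implements CommandLineRunner, and override its run method to build the initial data and insert it into the database.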

@Component
public class UsersCommandLineRunner implements CommandLineRunner {

    private final TradingUserRepository repository;

    public UsersCommandLineRunner(TradingUserRepository repository) {
        this.repository = repository;
    }

    @Override
    public void run(String... strings) throws Exception {
        List<TradingUser> users = Arrays.asList(
                new TradingUser("sdeleuze", "Sebastien Deleuze"),
                new TradingUser("snicoll", "Stephane Nicoll"),
                new TradingUser("rstoyanchev", "Rossen Stoyanchev"),
                new TradingUser("poutsma", "Arjen Poutsma"),
                new TradingUser("smaldini", "Stephane Maldini"),
                new TradingUser("simonbasle", "Simon Basle"),
                new TradingUser("violetagg", "Violeta Georgieva"),
                new TradingUser("bclozel", "Brian Clozel")
        );
        this.repository.insert(users).blockLast(Duration.ofSeconds(3));
    }
}
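Because run returns void and is expected to block, the Flux returned by the repository insert has to be waited on with blockLast(Duration).
Alternatively, then().block(Duration) turns the Flux into a Mono<Void> and waits for it to complete.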

Create the web service: a UserController annotated with @RestController, with two handler methods:
1. GET "/users" returns all TradingUser entries, content-type = "application/json"
2. GET "/users/{username}" returns a single TradingUser, content-type = "application/json"

@RestController
public class UserController {

    private final TradingUserRepository tradingUserRepository;

    public UserController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }
    @GetMapping(path = "/users", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<TradingUser> listUsers() {
        return this.tradingUserRepository.findAll();
    }

    @GetMapping(path = "/users/{username}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TradingUser> showUsers(@PathVariable String username) {
        return this.tradingUserRepository.findByUserName(username);
    }
}

Write the tests:

@RunWith(SpringRunner.class)
@WebFluxTest(UserController.class)
public class UserControllerTests {

  @Autowired
  private WebTestClient webTestClient;

  @MockBean
  private TradingUserRepository repository;

  @Test
  public void listUsers() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");
    TradingUser andy = new TradingUser("2", "wilkinsona", "Andy Wilkinson");

    BDDMockito.given(this.repository.findAll())
        .willReturn(Flux.just(juergen, andy));

    this.webTestClient.get().uri("/users").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBodyList(TradingUser.class)
        .hasSize(2)
        .contains(juergen, andy);

  }

  @Test
  public void showUser() {
    TradingUser juergen = new TradingUser("1", "jhoeller", "Juergen Hoeller");

    BDDMockito.given(this.repository.findByUserName("jhoeller"))
        .willReturn(Mono.just(juergen));

    this.webTestClient.get().uri("/users/jhoeller").accept(MediaType.APPLICATION_JSON)
        .exchange()
        .expectBody(TradingUser.class)
        .isEqualTo(juergen);
  }

}

Rendering pages with Thymeleaf
Add the front-end dependencies to the pom:

<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>bootstrap</artifactId>
    <version>3.3.7</version>
</dependency>
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>highcharts</artifactId>
    <version>5.0.8</version>
</dependency>

Create HomeController:

@Controller
public class HomeController {

    private final TradingUserRepository tradingUserRepository;

    public HomeController(TradingUserRepository tradingUserRepository) {
        this.tradingUserRepository = tradingUserRepository;
    }

    @GetMapping("/")
    public String home(Model model) {
        model.addAttribute("users", this.tradingUserRepository.findAll());
        return "index";
    }
}

Create the home page:

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li class="active"><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Trading users</h2>
    <table class="table table-striped">
        <thead>
        <tr>
            <th>#</th>
            <th>User name</th>
            <th>Full name</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="user: ${users}">
            <th scope="row" th:text="${user.id}">42</th>
            <td th:text="${user.userName}">janedoe</td>
            <td th:text="${user.fullName}">Jane Doe</td>
        </tr>
        </tbody>
    </table>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</body>
</html>
Spring WebFlux resolves Publisher instances automatically before rendering the view, so the controller needs no blocking code.

Using WebClient to stream JSON to the browser

This part uses the example service from Spring Boot WebFlux reactive development tutorial (part 1) and calls it remotely. Then create the view:

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
    <link rel="stylesheet" href="/webjars/highcharts/5.0.8/css/highcharts.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li class="active"><a href="/quotes">Quotes</a></li>
                <li><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <div id="chart" style="height: 400px; min-width: 310px"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/highcharts/5.0.8/highcharts.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">

    // Setting up the chart
    var chart = new Highcharts.chart('chart', {
        title: {
            text: 'My Stock Portfolio'
        },
        yAxis: {
            title: {
                text: 'Stock Price'
            }
        },
        legend: {
            layout: 'vertical',
            align: 'right',
            verticalAlign: 'middle'
        },
        xAxis: {
            type: 'datetime',
        },
        series: [{
            name: 'CTXS',
            data: []
        }, {
            name: 'MSFT',
            data: []
        }, {
            name: 'ORCL',
            data: []
        }, {
            name: 'RHT',
            data: []
        }, {
            name: 'VMW',
            data: []
        }, {
            name: 'DELL',
            data: []
        }]
    });

    // This function adds the given data point to the chart
    var appendStockData = function (quote) {
        chart.series
            .filter(function (serie) {
                return serie.name == quote.ticker
            })
            .forEach(function (serie) {
                var shift = serie.data.length > 40;
                serie.addPoint([new Date(quote.instant), quote.price], true, shift);
            });
    };

    // The browser connects to the server and receives quotes using ServerSentEvents
    // those quotes are appended to the chart as they're received
    var stockEventSource = new EventSource("/quotes/feed");
    stockEventSource.onmessage = function (e) {
        appendStockData(JSON.parse(e.data));
    };
</script>
</body>
</html>
The page requests quotes from the server through Server-Sent Events (SSE).
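To make clear what the EventSource in the page actually parses, here is a minimal sketch of how one quote could be framed as an SSE event on the wire. The class name SseFrame and the sample values are hypothetical; the field names ticker, instant and price come from the page script. In the real application the framework writes these frames, so this only illustrates the format: one "data:" line per payload line, terminated by a blank line.

```java
public class SseFrame {

    /**
     * Formats a payload as one Server-Sent Events frame:
     * every payload line becomes a "data:" line, and an empty line ends the event.
     */
    public static String format(String payload) {
        StringBuilder frame = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString();
    }

    public static void main(String[] args) {
        // Hypothetical quote; e.data in the browser would be the JSON text after "data: ".
        System.out.print(format("{\"ticker\":\"MSFT\",\"price\":42.5}"));
    }
}
```

On the browser side, onmessage receives the text after "data: " as e.data, which is why the page can pass it straight to JSON.parse.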

Create the QuotesController with the following two methods:

@Controller
public class QuotesController {

    @GetMapping("/quotes")
    public String quotes() {
        return "quotes";
    }

    @GetMapping(path = "/quotes/feed", produces = TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Quote> quotesStream() {
        return WebClient.create("http://localhost:8081")
                .get()
                .uri("/quotes")
                .accept(APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToFlux(Quote.class)
                .share()
                .log("io.spring.workshop.tradingservice");
    }
}
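The quotesStream method produces content-type "text/event-stream" with a Flux<Quote> as the response body. The data is already provided by stock-quotes; WebClient is used here to request and retrieve it.
Avoid sending a request to the upstream data service for every browser request; Flux.share() can be used for this. Note that share() multicasts only to subscribers of the same Flux instance, so the shared Flux has to be reused across requests for the sharing to take effect.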
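The idea behind sharing one upstream among several browsers can be sketched without Reactor. The example below is not Flux.share(); it swaps in the JDK's own java.util.concurrent.Flow API and SubmissionPublisher to show the same multicast idea: one publisher, several subscribers, each item published once. The class name SharedFeedSketch, the Collector subscriber and the sample strings are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SharedFeedSketch {

    /** Subscriber that records everything it receives. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes the quotes once and returns what each of two subscribers saw. */
    public static List<List<String>> fanOut(List<String> quotes) {
        Collector first = new Collector();
        Collector second = new Collector();
        // One publisher stands in for the single connection to the quote service.
        try (SubmissionPublisher<String> upstream = new SubmissionPublisher<>()) {
            upstream.subscribe(first);
            upstream.subscribe(second);
            quotes.forEach(upstream::submit);
        } // close() completes both subscribers once their items are delivered
        try {
            first.done.await(5, TimeUnit.SECONDS);
            second.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of(List.copyOf(first.received), List.copyOf(second.received));
    }

    public static void main(String[] args) {
        System.out.println(fanOut(List.of("MSFT 42.5", "ORCL 17.0")));
    }
}
```

Both collectors see the same items even though each item is submitted only once, which is the effect wanted between the quote service and the connected browsers.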

Next, open the page to see the result.

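Creating the WebSocket handler
WebFlux supports functional reactive WebSocket clients and servers.
The server side has two main parts: WebSocketHandlerAdapter handles the request, then delegates to WebSocketService and WebSocketHandler, which return the response and complete the session.
For Spring's reactive WebSocket support, see the official documentation here.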

First create EchoWebSocketHandler, which implements the WebSocketHandler interface:

public class EchoWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(session.receive()
                .doOnNext(WebSocketMessage::retain)
                .delayElements(Duration.ofSeconds(1)).log());
    }
}

The handle method receives incoming messages and sends each one back after a one-second delay.
To map requests to the handler, create WebSocketRouter:

@Configuration
public class WebSocketRouter {

    @Bean
    public HandlerMapping handlerMapping() {

        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/echo", new EchoWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Then create WebSocketController:

@Controller
public class WebSocketController {

    @GetMapping("/websocket")
    public String websocket() {
        return "websocket";
    }
}

It returns the view below; open the page to see the result:

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <meta name="description" content="Spring WebFlux Workshop"/>
    <meta name="author" content="Violeta Georgieva and Brian Clozel"/>
    <title>Spring Trading application</title>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap-theme.min.css"/>
    <link rel="stylesheet" href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css"/>
</head>
<body>
<nav class="navbar navbar-default">
    <div class="container-fluid">
        <div class="navbar-header">
            <a class="navbar-brand" href="/">Spring Trading application</a>
        </div>
        <div id="navbar" class="navbar-collapse collapse">
            <ul class="nav navbar-nav">
                <li><a href="/">Home</a></li>
                <li><a href="/quotes">Quotes</a></li>
                <li class="active"><a href="/websocket">Websocket</a></li>
            </ul>
        </div>
    </div>
</nav>
<div class="container wrapper">
    <h2>Websocket Echo</h2>
    <form class="form-inline">
        <div class="form-group">
            <input class="form-control" type="text" id="input" value="type something">
            <input class="btn btn-default" type="submit" id="button" value="Send"/>
        </div>
    </form>
    <div id="output"></div>
</div>
<script type="text/javascript" src="/webjars/jquery/1.11.1/jquery.min.js"></script>
<script type="text/javascript" src="/webjars/bootstrap/3.3.7/js/bootstrap.min.js"></script>
<script type="text/javascript">
    $(document).ready(function () {
        if (!("WebSocket" in window)) WebSocket = MozWebSocket;
        var socket = new WebSocket("ws://localhost:8080/websocket/echo");

        socket.onopen = function (event) {
            var newMessage = document.createElement('p');
            newMessage.textContent = "-- CONNECTED";
            document.getElementById('output').appendChild(newMessage);

            socket.onmessage = function (e) {
                var newMessage = document.createElement('p');
                newMessage.textContent = "<< SERVER: " + e.data;
                document.getElementById('output').appendChild(newMessage);
            }

            $("#button").click(function (e) {
                e.preventDefault();
                var message = $("#input").val();
                socket.send(message);
                var newMessage = document.createElement('p');
                newMessage.textContent = ">> CLIENT: " + message;
                document.getElementById('output').appendChild(newMessage);
            });
        }
    });
</script>
</body>
</html>

You can also write a test using WebSocketClient:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class EchoWebSocketHandlerTests {

    @LocalServerPort
    private String port;

    @Test
    public void echo() throws Exception {
        int count = 4;
        Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
        ReplayProcessor<Object> output = ReplayProcessor.create(count);

        WebSocketClient client = new StandardWebSocketClient();
        client.execute(getUrl("/websocket/echo"),
                session -> session
                        .send(input.map(session::textMessage))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .then())
                .block(Duration.ofMillis(5000));

        assertEquals(input.collectList().block(Duration.ofMillis(5000)), output.collectList().block(Duration.ofMillis(5000)));
    }

    protected URI getUrl(String path) throws URISyntaxException {
        return new URI("ws://localhost:" + this.port + path);
    }
}

Source code on GitHub
