RSocket
應用層協議支持 Reactive Streams
語義, 例如:用RSocket做爲HTTP的一種替代方案。在本教程中, 咱們將看到RSocket
用在spring boot中,特別是spring boot 如何幫助抽象出更低級別的RSocket API。java
2. 依賴
讓咱們從添加spring-boot-starter-rsocket
依賴開始:git
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> </dependency>
這個依賴會傳遞性的拉取RSocket
相關的依賴,好比:rsocket-core
和 rsocket-transport-netty
github
3.示例的應用程序
如今繼續咱們的簡單應用程序。爲了突出RSocket
提供的交互模式,我打算建立一個交易應用程序, 交易應用程序包括客戶端和服務器。web
3.1. 服務器設置
首先,咱們設置由springboot應用程序引導的RSocket server
服務器。 由於有spring-boot-starter-rsocket dependency
依賴,因此springboot會自動配置RSocket server
。 跟日常同樣, 能夠用屬性驅動的方式修改RSocket server
默認配置值。例如:經過增長以下配置在application.properties
中,來修改RSocket
端口spring
spring.rsocket.server.port=7000
也能夠根據須要進一步修改服務器的其餘屬性springboot
3.2.設置客戶端
接下來,咱們來設置客戶端,也是一個springboot應用程序。雖然springboot自動配置大部分RSocket相關的組件,但還要自定義一些對象來完成設置。bash
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() { return RSocketFactory .connect() .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(7000)) .start() .block(); } @Bean RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) { return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies); } }
這兒咱們正在建立RSocket
客戶端而且配置TCP端口爲:7000。注意: 該服務端口咱們在前面已經配置過。 接下來咱們定義了一個RSocket的裝飾器對象RSocketRequester
。 這個對象在咱們跟RSocket server
交互時會爲咱們提供幫助。 定義這些對象配置後,咱們還只是有了一個骨架。在接下來,咱們將暴露不一樣的交互模式, 並看看springboot在這個地方提供幫助的。服務器
4. springboot RSocket
中的 Request/Response
咱們從Request/Response
開始,HTTP
也使用這種通訊方式,這也是最多見的、最類似的交互模式。 在這種交互模式裏, 由客戶端初始化通訊併發送一個請求。以後,服務器端執行操做並返回一個響應給客戶端--這時通訊完成。 在咱們的交易應用程序裏, 一個客戶端詢問一個給定的股票的當前的市場數據。 做爲回覆,服務器會傳遞請求的數據。併發
4.1.服務器
在服務器這邊,咱們首先應該建立一個controller
來持有咱們的處理器方法。 咱們會使用 @MessageMapping
註解來代替像SpringMVC中的@RequestMapping
或者@GetMapping
註解app
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData") public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) { return marketDataRepository.getOne(marketDataRequest.getStock()); } }
來研究下咱們的控制器。 咱們將使用@Controller
註解來定義一個控制器來處理進入RSocket的請求。 另外,註解@MessageMapping
讓咱們定義咱們感興趣的路由和如何響應一個請求。 在這個示例中, 服務器監聽路由currentMarketData
, 並響應一個單一的結果Mono<MarketData>
給客戶端。
4.2. 客戶端
接下來, 咱們的RSocket客戶端應該詢問一隻股票的價格並獲得一個單一的響應。 爲了初始化請求, 咱們該使用RSocketRequester
類,以下:
@RestController
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}") public Publisher<MarketData> current(@PathVariable("stock") String stock) { return rSocketRequester .route("currentMarketData") .data(new MarketDataRequest(stock)) .retrieveMono(MarketData.class); } }
注意:在示例中,RSocket
客戶端也是一個REST
風格的controller
,以此來訪問咱們的RSocket
服務器。所以,咱們使用@RestController
和@GetMapping
註解來定義咱們的請求/響應端點。 在端點方法中, 咱們使用的是類RSocketRequester
並指定了路由。 事實上,這個是服務器端RSocket
所指望的路由,而後咱們傳遞請求數據。最後,當調用retrieveMono()
方法時,springboot會幫咱們初始化一個請求/響應交互。
5. Spring Boot RSocket
中的Fire And Forget
模式
接下來咱們將查看 Fire And Forget
交互模式。正如名字提示的同樣,客戶端發送一個請求給服務器,可是不指望服務器的返回響應回來。 在咱們的交易程序中, 一些客戶端會做爲數據資源服務,而且推送市場數據給服務器端。
5.1.服務器端
咱們來建立另一個端點在咱們的服務器應用程序中,以下:
@MessageMapping("collectMarketData") public Mono<Void> collectMarketData(MarketData marketData) { marketDataRepository.add(marketData); return Mono.empty(); }
咱們又一次定義了一個新的@MessageMapping
路由爲collectMarketData
。此外, Spring Boot自動轉換傳入的負載爲一個MarketData
實例。 可是,這兒最大的不一樣是咱們返回一個Mono<Void>
,由於客戶端不須要服務器的返回。
5.2. 客戶端
來看看咱們如何初始化咱們的fire-and-forget
模式的請求。 咱們將建立另一個REST風格的端點,以下:
@GetMapping(value = "/collect") public Publisher<Void> collect() { return rSocketRequester .route("collectMarketData") .data(getMarketData()) .send(); }
這兒咱們指定路由和負載將是一個MarketData
實例。 因爲咱們使用send()
方法來代替retrieveMono()
,全部交互模式變成了fire-and-forget
模式。
6.Spring Boot RSocket
中的Request Stream
請求流是一種更復雜的交互模式, 這個模式中客戶端發送一個請求,可是在一段時間內從服務器端獲取到多個響應。 爲了模擬這種交互模式, 客戶端會詢問給定股票的全部市場數據。
6.1.服務器端
咱們從服務器端開始。 咱們將添加另一個消息映射方法,以下:
@MessageMapping("feedMarketData") public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) { return marketDataRepository.getAll(marketDataRequest.getStock()); }
正如所見, 這個處理器方法跟其餘的處理器方法很是相似。 不一樣的部分是咱們返回一個Flux<MarketData>
來代替Mono<MarketData>
。 最後咱們的RSocket服務器會返回多個響應給客戶端。
6.2.客戶端
在客戶端這邊, 咱們該建立一個端點來初始化請求/響應通訊,以下:
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Publisher<MarketData> feed(@PathVariable("stock") String stock) { return rSocketRequester .route("feedMarketData") .data(new MarketDataRequest(stock)) .retrieveFlux(MarketData.class); }
咱們來研究下RSocket請求。 首先咱們定義了路由和請求負載。 而後,咱們定義了使用retrieveFlux()
調用的響應指望。這部分決定了交互模式。 另外注意:因爲咱們的客戶端也是REST
風格的服務器,客戶端也定義了響應媒介類型MediaType.TEXT_EVENT_STREAM_VALUE
。
7.異常的處理
如今讓咱們看看在服務器程序中,如何以聲明式的方式處理異常。 當處理請求/響應式, 我能夠簡單的使用@MessageExceptionHandler
註解,以下:
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e)); }
這裏咱們給異常處理方法標記註解爲@MessageExceptionHandler
。做爲結果, 這個方法將處理全部類型的異常, 由於Exception
是全部其餘類型的異常的超類。 咱們也能夠明確地建立更多的不一樣類型的,不一樣的異常處理方法。 這固然是請求/響應模式,而且咱們返回的是Mono<MarketData>
。咱們指望這裏的響應類型跟咱們的交互模式的返回類型相匹配。
8.總結
在本教程中, 咱們介紹了springboot的RSocket支持,並詳細列出了RSocket提供的不一樣交互模式。查看全部示例代碼在GitHub上。
做者:baeldung
譯者:sleeve
![](http://static.javashuo.com/static/loading.gif)