以前咱們提到過,對於底層的數據源來講,MongoDB, Redis, 和 Cassandra 能夠直接以reactive的方式支持Spring Data。而其餘不少關係型數據庫好比Postgres, Microsoft SQL Server, MySQL, H2 和 Google Spanner 則能夠經過使用R2DBC 來實現對reactive的支持。java
今天咱們就來具體講解一下R2DBC的使用。react
以前咱們介紹了Reactor還有基於其之上的Spring WebFlux框架。包括vert.x,rxjava等等reactive技術。咱們實際上在應用層已經有不少優秀的響應式處理框架。git
可是有一個問題就是全部的框架都須要獲取底層的數據,而基本上關係型數據庫的底層讀寫都仍是同步的。github
爲了解決這個問題,出現了兩個標準,一個是oracle提出的 ADBC (Asynchronous Database Access API),另外一個就是Pivotal提出的R2DBC (Reactive Relational Database Connectivity)。 web
R2DBC是基於Reactive Streams標準來設計的。經過使用R2DBC,你可使用reactive API來操做數據。spring
同時R2DBC只是一個開放的標準,而各個具體的數據庫鏈接實現,須要實現這個標準。shell
今天咱們以r2dbc-h2爲例,講解一下r2dbc在Spring webFlux中的使用。數據庫
咱們須要引入r2dbc-spi和r2dbc-h2兩個庫,其中r2dbc-spi是接口,而r2dbc-h2是具體的實現。oracle
同時咱們使用了Spring webflux,因此還須要引入spring-boot-starter-webflux。app
具體的依賴以下:
<!-- R2DBC H2 Driver --> <dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-h2</artifactId> <version>${r2dbc-h2.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
ConnectionFactory是數據庫鏈接的一個具體實現,經過ConnectionFactory咱們能夠建立到數據庫的鏈接。
先看一下數據庫的配置文件,爲了方便起見,這裏咱們使用的是內存數據庫H2 :
r2dbc.url=r2dbc:h2:mem://./r2dbc r2dbc.user=sa r2dbc.password=password
第一個url指定的是數據庫的鏈接方式,下面兩個是數據庫的用戶名和密碼。
接下來咱們看一下,怎麼經過這些屬性來建立ConnectionFactory:
@Bean public ConnectionFactory connectionFactory() { ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url); ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions); if (!StringUtil.isNullOrEmpty(user)) { ob = ob.option(USER, user); } if (!StringUtil.isNullOrEmpty(password)) { ob = ob.option(PASSWORD, password); } return ConnectionFactories.get(ob.build()); }
經過url能夠parse獲得ConnectionFactoryOptions。而後經過ConnectionFactories的get方法建立ConnectionFactory。
若是咱們設置了USER或者PASSWORD,還能夠加上這兩個配置。
這裏,咱們建立一個簡單的User對象:
@Data @NoArgsConstructor @AllArgsConstructor public class Users { private Long id; private String firstname; private String lastname; }
雖然H5有不少更加簡單的方式來初始化數據庫,好比直接讀取SQL文件,這裏爲了說明R2DBC的使用,咱們使用手動的方式來建立:
@Bean public CommandLineRunner initDatabase(ConnectionFactory cf) { return (args) -> Flux.from(cf.create()) .flatMap(c -> Flux.from(c.createBatch() .add("drop table if exists Users") .add("create table Users(" + "id IDENTITY(1,1)," + "firstname varchar(80) not null," + "lastname varchar(80) not null)") .add("insert into Users(firstname,lastname)" + "values('flydean','ma')") .add("insert into Users(firstname,lastname)" + "values('jacken','yu')") .execute()) .doFinally((st) -> c.close()) ) .log() .blockLast(); }
上面的代碼中,咱們使用c.createBatch()來向數據庫插入一些數據。
除了createBatch,還可使用create來建立單個的執行語句。
在Dao中,咱們提供了一個findAll的方法:
public Flux<Users> findAll() { return Mono.from(connectionFactory.create()) .flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users") .execute()) .doFinally((st) -> close(c))) .flatMapMany(result -> Flux.from(result.map((row, meta) -> { Users acc = new Users(); acc.setId(row.get("id", Long.class)); acc.setFirstname(row.get("firstname", String.class)); acc.setLastname(row.get("lastname", String.class)); return acc; }))); }
簡單解釋一下上面的使用。
由於是一個findAll方法,咱們須要找出全部的用戶信息。因此咱們返回的是一個Flux而不是一個Mono。
怎麼從Mono轉換成爲一個Flux呢?
這裏咱們使用的是flatMapMany,將select出來的結果,分紅一行一行的,最後轉換成爲Flux。
爲了防止SQL注入,咱們須要在SQL中使用Prepare statement:
public Mono<Users> findById(long id) { return Mono.from(connectionFactory.create()) .flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1") .bind("$1", id) .execute()) .doFinally((st) -> close(c))) .map(result -> result.map((row, meta) -> new Users(row.get("id", Long.class), row.get("firstname", String.class), row.get("lastname", String.class)))) .flatMap( p -> Mono.from(p)); }
看下咱們是怎麼在R2DBC中使用prepare statement的。
接下來咱們看一下怎麼在R2DBC中使用事務:
public Mono<Users> createAccount(Users account) { return Mono.from(connectionFactory.create()) .flatMap(c -> Mono.from(c.beginTransaction()) .then(Mono.from(c.createStatement("insert into Users(firstname,lastname) values($1,$2)") .bind("$1", account.getFirstname()) .bind("$2", account.getLastname()) .returnGeneratedValues("id") .execute())) .map(result -> result.map((row, meta) -> new Users(row.get("id", Long.class), account.getFirstname(), account.getLastname()))) .flatMap(pub -> Mono.from(pub)) .delayUntil(r -> c.commitTransaction()) .doFinally((st) -> c.close())); }
上面的代碼中,咱們使用了事務,具體的代碼有兩部分:
c -> Mono.from(c.beginTransaction()) .delayUntil(r -> c.commitTransaction())
開啓是的時候須要使用beginTransaction,後面提交就須要調用commitTransaction。
最後,咱們須要建立WebFlux應用來對外提供服務:
@GetMapping("/users/{id}") public Mono<ResponseEntity<Users>> getUsers(@PathVariable("id") Long id) { return usersDao.findById(id) .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK)) .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND))); } @GetMapping("/users") public Flux<Users> getAllAccounts() { return usersDao.findAll(); } @PostMapping("/createUser") public Mono<ResponseEntity<Users>> createUser(@RequestBody Users user) { return usersDao.createAccount(user) .map(acc -> new ResponseEntity<>(acc, HttpStatus.CREATED)) .log(); }
最後,咱們運行一下代碼,執行下users:
curl "localhost:8080/users" [{"id":1,"firstname":"flydean","lastname":"ma"},{"id":2,"firstname":"jacken","lastname":"yu"}]%
完美,實驗成功。
本文的代碼:webflux-with-r2dbc
本文做者:flydean程序那些事本文連接:http://www.flydean.com/r2dbc-introduce/
本文來源:flydean的博客
歡迎關注個人公衆號:「程序那些事」最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!