Vert.x 異步訪問數據庫 MySQL

Vert.x提供異步訪問數據庫的API,數據庫操做是一個耗時操做,使用傳統的同步模型,容易阻塞線程,致使總體性能降低,所以咱們對於數據庫操做,須要使用Vert.x提供的異步API。java

Vert.x提供的API層級很是低,能夠說是僅僅在原生JDBC基礎上封裝了一層異步接口。全部的對數據庫操做都須要經過編寫SQL來完成,參數的封裝和結果的獲取都須要手動的來實現,對於習慣使用ORM框架的開發者可能會很是的不習慣。mysql

先來經過一個查詢數據庫的案例來演示如何使用Vert.x提供的異步APIsql

基本操做
1.引入數據庫依賴,咱們須要引入兩個包,一個是vertx-jdbc,另外一個是要真正鏈接數據庫的驅動包,這裏以MySQL爲例數據庫

 1 <dependency>
 2 <groupId>io.vertx</groupId>
 3 <artifactId>vertx-jdbc-client</artifactId>
 4 <version>3.6.0</version>
 5 </depend
 6 
 7 <dependency>
 8 <groupId>mysql</groupId>
 9 <artifactId>mysql-connector-java</artifactId>
10 <version>8.0.13</version>
11 </dependency>

2.抽象出一個DbUtils來方便獲取數據庫客戶端,爲了簡單,直接就將配置寫到代碼裏了json

 1 public class JdbcUtils {
 2 
 3 // 用於操做數據庫的客戶端
 4 private JDBCClient dbClient;
 5 
 6 public JdbcUtils(Vertx vertx) {
 7 
 8 // 構造數據庫的鏈接信息
 9 JsonObject dbConfig = new JsonObject();
10 dbConfig.put("url", "jdbc:mysql://192.168.40.66:3306/test");
11 dbConfig.put("driver_class", "com.mysql.jdbc.Driver");
12 dbConfig.put("user", "xxxx");
13 dbConfig.put("password", "xxxx");
14 
15 // 建立客戶端
16 dbClient = JDBCClient.createShared(vertx, dbConfig);
17 }
18 
19 // 提供一個公共方法來獲取客戶端
20 public JDBCClient getDbClient() {
21 return dbClient;
22 }
23 
24 }

經過上面的工具類,能夠快速的獲取到客戶端,看上面的代碼也很簡單,經過JsonObect構建一些基本的數據庫鏈接信息,而後經過JDBCClient的createShard方法建立一個JDBCClient實例。數組

3.進行數據庫的操做,以查詢年齡大於18歲的用戶爲例 框架

 1 public class JdbcTestVerticle extends AbstractVerticle {
 2 
 3 @Override
 4 public void start() throws Exception {
 5 
 6 // 獲取到數據庫鏈接的客戶端
 7 JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
 8 String sql = "select * from t_user where age > ?";
 9 // 構造參數
10 JsonArray params = new JsonArray().add(18);
11 // 執行查詢
12 jdbcClient.queryWithParams(sql, params, qryRes->{
13 if(qryRes.succeeded()) {
14 // 獲取到查詢的結果,Vert.x對ResultSet進行了封裝
15 ResultSet resultSet = qryRes.result();
16 // 把ResultSet轉爲List<JsonObject>形式
17 List<JsonObject> rows = resultSet.getRows();
18 // 輸出結果
19 System.out.println(rows);
20 } else {
21 System.out.println("查詢數據庫出錯!");
22 }
23 });
24 
25 }
26 
27 public static void main(String[] args) {
28 Vertx vertx = Vertx.vertx();
29 vertx.deployVerticle(new JdbcTestVerticle());
30 }
31 }

JsonArray是一個數組,SQL中用到的參數能夠經過構建一個JsonArray來賦值。dom

JsonObejct是一個Json對象,相似於阿里的fastjson中提供的JSONObject異步

這兩個對象在Vert.x中很是經常使用,並且很是的好用,但必定要注意空指針的問題,這是很是讓人頭疼的。ide

優化

經過上面的三個步驟,就可成功的對數據庫進行操做了,但還有些問題須要優化,好比數據庫鏈接信息放到配置文件中,再好比使用數據庫鏈接池等等。

* 使用配置文件

 1 {
 2 "default":{
 3 "url":"jdbc:mysql://localhost:3306/my_project",
 4 "driver_class":"com.mysql.cj.jdbc.Driver",
 5 "user":"root",
 6 "password":"root"
 7 },
 8 "prod":{
 9 "url":"jdbc:mysql://localhost:3306/my_project",
10 "driver_class":"com.mysql.cj.jdbc.Driver",
11 "user":"root",
12 "password":"root"
13 }
14 }
15 修改DbUtils工具類
16 
17 public class JdbcUtils {
18 
19 private JDBCClient dbClient;
20 private static JsonObject config ;
21 
22 static {
23 byte[] buff = new byte[102400];
24 try {
25 // 讀取配置文件
26 InputStream ins = new FileInputStream("db.json");
27 int i = IOUtils.read(ins, buff);
28 config = new JsonObject(new String(buff, 0, i));
29 } catch (Exception e) {
30 System.out.println("讀取配置文件失敗");
31 }
32 }
33 
34 public JdbcUtils(Vertx vertx, String dsName) {
35 JsonObject dbConfig = config.getJsonObject(dsName);
36 if(dbConfig == null) {
37 throw new RuntimeException("沒有找到指定的數據源");
38 }
39 dbClient = JDBCClient.createShared(vertx, dbConfig);
40 }
41 
42 public JdbcUtils(Vertx vertx) {
43 this(vertx, "default");
44 }
45 
46 public JDBCClient getDbClient() {
47 return dbClient;
48 }
49 
50 }

這樣就支持了多個數據源,並且數據庫鏈接配置都放到了配置文件中。

鏈接池配置
數據鏈接池默認使用的C3P0,因此能夠在db.json中進行配置C3P0鏈接池的參數就能夠了,這裏官網的地址爲:https://vertx.io/docs/vertx-jdbc-client/java/

具體配置能夠參考官網給出的配置,下面是一個簡單的截圖

 

遺憾的是,Vert.x給出的數據庫鏈接池的支持並很少,若是咱們想要使用好比阿里的Druid鏈接池,須要本身來實現DataSourceProvider。固然DataSourceProvider的實現並不複雜,但麻煩啊!後面我會給出一個關於druid的DataSourceProvider的實現。

事務
Vert.x從比較低的層面來控制事務,不像Spring同樣可使用聲明式事務管理。要想在Vert.x中開啓事務,和傳統的JDBC管理事務的方式很是相似。首先要得到到鏈接,而後調用鏈接的setAutoCommit方法,關閉事務的自動提交,而後再手動的提交和回滾事務。

由於開啓事務、提交事務、執行SQL都須要和數據庫服務進行通訊,所以在Vert.x中都是異步操做,按傳統方式實現一個事務代碼很是痛苦,看下面的一段開啓事務的代碼。寫了一遍之後,絕對不肯意再寫第二遍。

1. 得到鏈接

// 得到鏈接
jdbcClient.getConnection(con -> {
if (con.succeeded()) {
System.out.println("獲取到數據庫鏈接");

// 獲取到的鏈接對象
SQLConnection connection = con.result();
}
});

2. 設置不自動提交事務

1 // 開啓事務
2 connection.setAutoCommit(false, (v) -> {
3 if (v.succeeded()) {
4 
5 }
6 });

3.dml操做

1 // 執行更新操做
2 connection.update("sql", upRes -> {
3 if(upRes.succeed()){
4 
5 }
6 });

4. 提交事務

1 // 提交事務
2 connection.commit(rx -> {
3 if (rx.succeeded()) {
4 // 事務提交成功
5 }
6 });

 回滾事務

1 // 回滾事務
2 connection.rollback(rb -> {
3 if (rb.succeeded()) {
4 // 事務回滾成功
5 }
6 });

若是你以爲上面的還很簡單,看看下面一個完整的例子吧,把這些嵌套在一塊兒,你還以爲簡單嗎?

 1 package stu.vertx.jdbc;
 2 
 3 import io.vertx.core.AbstractVerticle;
 4 import io.vertx.core.Vertx;
 5 import io.vertx.ext.jdbc.JDBCClient;
 6 import io.vertx.ext.sql.SQLConnection;
 7 
 8 /**
 9 * 得到數據庫鏈接,執行查詢,開啓事務,執行更新操做
10 *
11 * @author <a href="https://blog.csdn.net/king_kgh>Kingh</a>
12 * @version 1.0
13 * @date 2019/4/3 9:19
14 */
15 public class GetConnection extends AbstractVerticle {
16 
17 @Override
18 public void start() throws Exception {
19 
20 JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
21 System.out.println("獲取到數據庫客戶端");
22 // 獲取數據庫鏈接
23 jdbcClient.getConnection(con -> {
24 if (con.succeeded()) {
25 System.out.println("獲取到數據庫鏈接");
26 
27 // 獲取到的鏈接對象
28 SQLConnection connection = con.result();
29 
30 // 執行查詢操做
31 connection.query("select * from t1", rs -> {
32 // 處理查詢結果
33 if (rs.succeeded()) {
34 System.out.println(rs.result().getRows());
35 }
36 });
37 
38 // 開啓事務
39 connection.setAutoCommit(false, (v) -> {
40 if (v.succeeded()) {
41 // 事務開啓成功 執行crud操做
42 connection.update("update t1 set name = '被修改了' where name = '111'", up -> {
43 
44 if (up.succeeded()) {
45 // 再來一筆寫操做
46 connection.update("insert into t1 values ('222','222222') ", up2 -> {
47 if (up2.succeeded()) {
48 // 提交事務
49 connection.commit(rx -> {
50 if (rx.succeeded()) {
51 // 事務提交成功
52 }
53 });
54 } else {
55 connection.rollback(rb -> {
56 if (rb.succeeded()) {
57 // 事務回滾成功
58 }
59 });
60 }
61 });
62 } else {
63 connection.rollback(rb -> {
64 if (rb.succeeded()) {
65 // 事務回滾成功
66 }
67 });
68 }
69 });
70 
71 } else {
72 System.out.println("開啓事務失敗");
73 }
74 });
75 } else {
76 System.out.println("獲取數據庫鏈接失敗");
77 }
78 });
79 
80 
81 }
82 
83 public static void main(String[] args) {
84 Vertx.vertx().deployVerticle(new GetConnection());
85 }
86 }
View Code

RxJava解決多層回調嵌套問題

上面的代碼僅僅是作了兩個寫操做,能夠說是很是的痛苦了,一層一層的嵌套,根本無法維護。那麼在真實的開發環境中,該如何管理事務呢,這就須要使用rxjava了,可以有效的減小多層嵌套帶來的問題。使用rxjava首先是須要引入rxjava的依賴

1 <dependency>
2 <groupId>io.vertx</groupId>
3 <artifactId>vertx-rx-java</artifactId>
4 <version>3.7.0</version>
5 </dependency>

完成上面案例的一樣代碼以下

  1 package stu.vertx.jdbc;
  2 
  3 import io.vertx.core.*;
  4 import io.vertx.core.json.JsonArray;
  5 import io.vertx.ext.jdbc.JDBCClient;
  6 import io.vertx.ext.sql.SQLConnection;
  7 import rx.Single;
  8 
  9 import java.util.UUID;
 10 
 11 /**
 12 *  15 */
 16 public class GetConnectionWithRxJava extends AbstractVerticle {
 17 
 18 @Override
 19 public void start() throws Exception {
 20 
 21 // 獲取JDBC客戶端
 22 JDBCClient jdbcClient = new JdbcUtils(vertx).getDbClient();
 23 
 24 getConnection(jdbcClient, con -> {
 25 if (con.succeeded()) {
 26 // 獲取到與數據庫的鏈接
 27 SQLConnection connection = con.result();
 28 
 29 // 開啓事務
 30 rxOpenTx(connection)
 31 // 執行寫操做
 32 .flatMap(this::rxExecuteUpdate1)
 33 // 執行寫操做
 34 .flatMap(this::rxExecuteUpdate2)
 35 .subscribe(ok -> {
 36 // 提交事務
 37 ok.commit(v -> {
 38 });
 39 }, err -> {
 40 // 回滾事務
 41 connection.rollback(v -> {
 42 });
 43 });
 44 }
 45 });
 46 }
 47 
 48 public Single<SQLConnection> rxOpenTx(SQLConnection connection) {
 49 return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> openTx(connection, fut)));
 50 }
 51 
 52 public Single<SQLConnection> rxExecuteUpdate1(SQLConnection connection) {
 53 return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> update1(connection, fut)));
 54 }
 55 
 56 public Single<SQLConnection> rxExecuteUpdate2(SQLConnection connection) {
 57 return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> update2(connection, fut)));
 58 }
 59 
 60 public void getConnection(JDBCClient jdbcClient, Handler<AsyncResult<SQLConnection>> resultHandler) {
 61 jdbcClient.getConnection(con -> {
 62 if (con.succeeded()) {
 63 resultHandler.handle(Future.succeededFuture(con.result()));
 64 } else {
 65 resultHandler.handle(Future.failedFuture(con.cause()));
 66 }
 67 });
 68 }
 69 
 70 public void openTx(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
 71 connection.setAutoCommit(false, o -> {
 72 if (o.succeeded()) {
 73 resultHandler.handle(Future.succeededFuture(connection));
 74 } else {
 75 resultHandler.handle(Future.failedFuture(o.cause()));
 76 }
 77 });
 78 }
 79 
 80 public void update1(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
 81 connection.updateWithParams("insert into t1 values (?,?)", new JsonArray().add(UUID.randomUUID().toString()).add(UUID.randomUUID().toString()), in -> {
 82 if (in.succeeded()) {
 83 resultHandler.handle(Future.succeededFuture(connection));
 84 } else {
 85 resultHandler.handle(Future.failedFuture(in.cause()));
 86 }
 87 });
 88 }
 89 
 90 public void update2(SQLConnection connection, Handler<AsyncResult<SQLConnection>> resultHandler) {
 91 connection.update("update t1 set name = '111' where passwd = '111'", in -> {
 92 if (in.succeeded()) {
 93 resultHandler.handle(Future.succeededFuture(connection));
 94 } else {
 95 resultHandler.handle(Future.failedFuture(in.cause()));
 96 }
 97 });
 98 }
 99 
100 public static void main(String[] args) {
101 Vertx.vertx().deployVerticle(new GetConnectionWithRxJava());
102 }
103 }

 

經過使用RxJava,沒有那麼深的嵌套層次,邏輯比較清晰。固然了,爲了一個簡單的操做,仍是須要寫不少的代碼。 

相關文章
相關標籤/搜索