Reactive MySQL Client

Reactive MySQL Client是MySQL的客戶端,具備直觀的API,側重於可伸縮性和低開銷。javascript

特徵html

  • 事件驅動java

  • 輕量級mysql

  • 內置鏈接池sql

  • 準備好的查詢緩存api

  • 遊標支持緩存

  • 行流服務器

  • RxJava 1和RxJava 2架構

  • 將內存直接寫入對象而不須要沒必要要app

  • Java 8日期和時間

  • MySQL實用程序命令支持

  • 兼容MySQL 5.6和5.7

用法

要使用Reactive MySQL Client,請將如下依賴項添加到構建描述符dependencies部分:

  • Maven(在你的pom.xml):

<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-mysql-client</artifactId> <version>3.8.0</version> </dependency>
  • Gradle(在您的build.gradle文件中):

dependencies {
 compile 'io.vertx:vertx-mysql-client:3.8.0' }

入門

這是鏈接,查詢和斷開鏈接的最簡單方法

MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the client pool MySQLPool client = MySQLPool.pool(connectOptions, poolOptions); // A simple query client.query("SELECT * FROM users WHERE id='julien'", ar -> { if (ar.succeeded()) { RowSet result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } // Now close the pool client.close(); });

鏈接到MySQL

大多數狀況下,您將使用池鏈接到MySQL:

MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

池化客戶端使用鏈接池,任何操做都將從池中借用鏈接以執行操做並將其釋放到池中。

若是您使用Vert.x運行,能夠將Vertx實例傳遞給它:

MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

您須要在再也不須要時釋放池:

pool.close();

當您須要在同一鏈接上執行多個操做時,您須要使用客戶端 connection

您能夠從游泳池輕鬆得到一個:

MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5); // Create the pooled client MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions); // Get a connection from the pool client.getConnection(ar1 -> { if (ar1.succeeded()) { System.out.println("Connected"); // Obtain our connection SqlConnection conn = ar1.result(); // All operations execute on the same connection conn.query("SELECT * FROM users WHERE id='julien'", ar2 -> { if (ar2.succeeded()) { conn.query("SELECT * FROM users WHERE id='emad'", ar3 -> { // Release the connection to the pool conn.close(); }); } else { // Release the connection to the pool conn.close(); } }); } else { System.out.println("Could not connect: " + ar1.cause().getMessage()); } });

完成鏈接後,必須將其關閉以將其釋放到池中,以即可以重複使用。

組態

您可使用多種方法來配置客戶端。

數據對象

配置客戶端的一種簡單方法是指定MySQLConnectOptions數據對象。

MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(3306) .setHost("the-host") .setDatabase("the-db") .setUser("user") .setPassword("secret"); // Pool Options PoolOptions poolOptions = new PoolOptions().setMaxSize(5); // Create the pool from the data object MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions); pool.getConnection(ar -> { // Handling your connection });

您還可使用setPropertiesaddProperty方法配置鏈接屬性注意setProperties將覆蓋默認的客戶端屬性。

MySQLConnectOptions connectOptions = new MySQLConnectOptions(); // Add a connection attribute connectOptions.addProperty("_java_version", "1.8.0_212"); // Override the attributes Map<String, String> attributes = new HashMap<>(); attributes.put("_client_name", "myapp"); attributes.put("_client_version", "1.0.0"); connectOptions.setProperties(attributes);

有關客戶端鏈接屬性的更多信息,請參閱MySQL參考手冊

鏈接uri

除了使用MySQLConnectOptions數據對象進行配置以外,當您要使用鏈接URI進行配置時,咱們還爲您提供了另外一種鏈接方式:

String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb"; // Create the pool from the connection URI MySQLPool pool = MySQLPool.pool(connectionUri); // Create the connection from the connection URI MySQLConnection.connect(vertx, connectionUri, res -> { // Handling your connection });

有關鏈接字符串格式的更多信息,請參閱MySQL參考手冊

目前,客戶端支持鏈接uri中的如下參數關鍵字

  • 主辦

  • 港口

  • 用戶

  • 密碼

  • 模式

  • 插座

運行查詢

當您不須要事務或運行單個查詢時,您能夠直接在池上運行查詢; 池將使用其中一個鏈接來運行查詢並將結果返回給您。

如下是如何運行簡單查詢:

client.query("SELECT * FROM users WHERE id='julien'", ar -> { if (ar.succeeded()) { RowSet result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

您可使用準備好的查詢執行相同操做。

SQL字符串能夠經過位置參考參數,使用$1$2等...

client.preparedQuery("SELECT * FROM users WHERE id=?", Tuple.of("julien"), ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); System.out.println("Got " + rows.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

查詢方法提供了一個RowSet適用於SELECT查詢的異步實例

client.preparedQuery("SELECT first_name, last_name FROM users", ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); for (Row row : rows) { System.out.println("User " + row.getString(0) + " " + row.getString(1)); } } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

UPDATE / INSERT查詢:

client.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)", Tuple.of("Julien", "Viet"), ar -> { if (ar.succeeded()) { RowSet rows = ar.result(); System.out.println(rows.rowCount()); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

Row讓你經過索引訪問您的數據

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或按名稱

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

您能夠訪問各類類型

String firstName = row.getString("first_name"); Boolean male = row.getBoolean("male"); Integer age = row.getInteger("age");

您能夠執行準備批處理

 

您能夠緩存準備好的查詢:

 

您能夠在查詢中使用「RETURNING」子句獲取生成的鍵:

 

使用鏈接

當您須要執行順序查詢(沒有事務)時,您能夠建立新鏈接或從池中借用一個鏈接:

pool.getConnection(ar1 -> {
  if (ar1.succeeded()) { SqlConnection connection = ar1.result(); connection.query("SELECT * FROM users WHERE id='julien'", ar2 -> { if (ar1.succeeded()) { connection.query("SELECT * FROM users WHERE id='paulo'", ar3 -> { // Do something with rows and return the connection to the pool connection.close(); }); } else { // Return the connection to the pool connection.close(); } }); } });

能夠建立準備好的查詢:

connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); pq.execute(Tuple.of("julien"), ar2 -> { if (ar2.succeeded()) { // All rows RowSet rows = ar2.result(); } }); } });
注意
準備好的查詢緩存取決於setCachePreparedStatements而且不依賴於您是建立準備好的查詢仍是使用direct prepared queries

PreparedQuery 能夠執行有效的批處理:

 

遊標和流媒體

默認狀況下,準備好的查詢執行會獲取全部行,您可使用a Cursor來控制要讀取的行數:

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); // Create a cursor Cursor cursor = pq.cursor(Tuple.of(18)); // Read 50 rows cursor.read(50, ar2 -> { if (ar2.succeeded()) { RowSet rows = ar2.result(); // Check for more ? if (cursor.hasMore()) { // Repeat the process... } else { // No more rows - close the cursor cursor.close(); } } }); } });

PostreSQL在事務結束時銷燬遊標,所以遊標API將在事務中使用,不然您可能會收到34000PostgreSQL錯誤。

遊標過早釋放時應關閉:

cursor.read(50, ar2 -> { if (ar2.succeeded()) { // Close the cursor cursor.close(); } });

流API也可用於遊標,這能夠更方便,特別是使用Rxified版本。

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> { if (ar1.succeeded()) { PreparedQuery pq = ar1.result(); // Fetch 50 rows at a time RowStream<Row> stream = pq.createStream(50, Tuple.of(18)); // Use the stream stream.exceptionHandler(err -> { System.out.println("Error: " + err.getMessage()); }); stream.endHandler(v -> { System.out.println("End of stream"); }); stream.handler(row -> { System.out.println("User: " + row.getString("last_name")); }); } });

流按批次讀取行50並流式傳輸,當行已傳遞給處理程序時,將50讀取新批次,依此類推。

流能夠恢復或暫停,加載的行將保留在內存中,直到它們被傳遞而且光標將中止迭代。

MySQL類型映射

目前,客戶端支持如下MySQL類型

  • BOOL,BOOLEAN(java.lang.Byte

  • TINYINT(java.lang.Byte

  • SMALLINT(java.lang.Short

  • MEDIUMINT(java.lang.Integer

  • INT,INTEGER(java.lang.Integer

  • BIGINT(java.lang.Long

  • FLOAT(java.lang.Float

  • DOUBLE(java.lang.Double

  • NUMERIC(io.vertx.sqlclient.data.Numeric

  • 日期(java.time.LocalDate

  • DATETIME(java.time.LocalDateTime

  • 時間(java.time.Duration

  • TIMESTAMP(java.time.LocalDateTime

  • 年(java.lang.Short

  • CHAR(java.lang.String

  • VARCHAR(java.lang.String

  • BINARY(io.vertx.core.buffer.Buffer

  • VARBINARY(io.vertx.core.buffer.Buffer

  • TINYBLOB(io.vertx.core.buffer.Buffer

  • TINYTEXT(java.lang.String

  • BLOB(io.vertx.core.buffer.Buffer

  • 文字(java.lang.String

  • MEDIUMBLOB(io.vertx.core.buffer.Buffer

  • MEDIUMTEXT(java.lang.String

  • LONGBLOB(io.vertx.core.buffer.Buffer

  • LONGTEXT(java.lang.String

元組解碼在存儲值時使用上述類型

處理BOOLEAN

在MySQL中BOOLEANBOOL數據類型是同義詞TINYINT(1)零值被視爲假,非零值被視爲真。一個BOOLEAN數據類型值存儲在RowTuple做爲java.lang.Byte類型,你能夠調用Row#getValue來檢索它的java.lang.Byte值,或者能夠稱之爲Row#getBoolean檢索它java.lang.Boolean的價值。

client.query("SELECT graduated FROM students WHERE id = 0", ar -> { if (ar.succeeded()) { RowSet rowSet = ar.result(); for (Row row : rowSet) { int pos = row.getColumnIndex("graduated"); Byte value = row.get(Byte.class, pos); Boolean graduated = row.getBoolean("graduated"); } } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

若是要使用BOOLEAN的參數執行預準備語句,只需將該java.lang.Boolean添加到參數列表便可。

client.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0", Tuple.of(true), ar -> { if (ar.succeeded()) { System.out.println("Updated with the boolean value"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

處理NUMERIC

NumericJava類型用於表示MySQL的NUMERIC類型。

Numeric numeric = row.get(Numeric.class, 0); if (numeric.isNaN()) { // Handle NaN } else { BigDecimal value = numeric.bigDecimalValue(); }

收集器查詢

您能夠將Java收集器與查詢API一塊兒使用:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap( row -> row.getLong("id"), row -> row.getString("last_name")); // Run the query with the collector client.query("SELECT * FROM users", collector, ar -> { if (ar.succeeded()) { SqlResult<Map<Long, String>> result = ar.result(); // Get the map created by the collector Map<Long, String> map = result.value(); System.out.println("Got " + map); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

收集器處理不能保留對它的引用,Row由於有一行用於處理整個集合。

Java Collectors提供了許多有趣的預約義收集器,例如,您能夠直接從行集建立一個字符串:

Collector<Row, ?, String> collector = Collectors.mapping( row -> row.getString("last_name"), Collectors.joining(",", "(", ")") ); // Run the query with the collector client.query("SELECT * FROM users", collector, ar -> { if (ar.succeeded()) { SqlResult<String> result = ar.result(); // Get the string created by the collector String list = result.value(); System.out.println("Got " + list); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

MySQL實用程序命令

有時您想使用MySQL實用程序命令,咱們爲此提供支持。能夠在MySQL實用程序命令中找到更多信息

COM_PING

您可使用COM_PING命令檢查服務器是否處於活動狀態。若是服務器響應PING,將通知處理程序,不然將永遠不會調用處理程序。

connection.ping(ar -> {
  System.out.println("The server has responded to the PING"); });

COM_RESET_CONNECTION

您可使用COM_RESET_CONNECTION命令重置會話狀態,這將重置鏈接狀態,如: - 用戶變量 - 臨時表 - 預準備語句

connection.resetConnection(ar -> {
  if (ar.succeeded()) { System.out.println("Connection has been reset now"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

COM_CHANGE_USER

您能夠更改當前鏈接的用戶,這將執行從新身份驗證並重置鏈接狀態COM_RESET_CONNECTION

MySQLConnectOptions authenticationOptions = new MySQLConnectOptions() .setUser("newuser") .setPassword("newpassword") .setDatabase("newdatabase"); connection.changeUser(authenticationOptions, ar -> { if (ar.succeeded()) { System.out.println("User of current connection has been changed."); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

COM_INIT_DB

您可使用COM_INIT_DB命令更改鏈接的默認架構。

connection.specifySchema("newschema", ar -> { if (ar.succeeded()) { System.out.println("Default schema changed to newschema"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

COM_STATISTICS

您可使用COM_STATISTICS命令在MySQL服務器中獲取一些人類可讀的內部狀態變量字符串。

connection.getInternalStatistics(ar -> {
  if (ar.succeeded()) { System.out.println("Statistics: " + ar.result()); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

COM_DEBUG

您可使用COM_DEBUG命令將調試信息轉儲到MySQL服務器的STDOUT。

connection.debug(ar -> {
  if (ar.succeeded()) { System.out.println("Debug info dumped to server's STDOUT"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });

COM_SET_OPTION

您可使用COM_SET_OPTION命令爲當前鏈接設置選項。目前只能CLIENT_MULTI_STATEMENTS設置。

例如,您能夠CLIENT_MULTI_STATEMENTS使用此命令禁用

connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> { if (ar.succeeded()) { System.out.println("CLIENT_MULTI_STATEMENTS is off now"); } else { System.out.println("Failure: " + ar.cause().getMessage()); } });
相關文章
相關標籤/搜索