Spark JDBC To MySQL

mysql jdbc driver下載地址
https://dev.mysql.com/downloads/connector/j/

在spark中使用jdbc
1.在 spark-env.sh 文件中加入:
export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.42.jar
2.任務提交時加入:
--jars /path/mysql-connector-java-5.1.42.jar


從Spark Shell鏈接到MySQL:
spark-shell --jars "/path/mysql-connector-java-5.1.42.jar

可使用Data Sources API未來自遠程數據庫的表做爲DataFrame或Spark SQL臨時視圖加載。用戶能夠在數據源選項中指定JDBC鏈接屬性。

可使用Data Sources API未來自遠程數據庫的表做爲DataFrame或Spark SQL臨時視圖加載。用戶能夠在數據源選項中指定JDBC鏈接屬性。 user而且password一般做爲用於登陸數據源的鏈接屬性提供。除了鏈接屬性外,Spark還支持如下不區分大小寫的選項:

JDBC connection properties
屬性名稱和含義
url:要鏈接的JDBC URL。列如:jdbc:mysql://ip:3306
dbtable:應該讀取的JDBC表。可使用括號中的子查詢代替完整表。
driver:用於鏈接到此URL的JDBC驅動程序的類名,列如:com.mysql.jdbc.Driver

partitionColumn, lowerBound, upperBound, numPartitions
這些options僅適用於read數據。這些options必須同時被指定。他們描述,如何從多個workers並行讀取數據時,分割表。
partitionColumn:必須是表中的數字列。
lowerBound和upperBound僅用於決定分區的大小,而不是用於過濾表中的行。
表中的全部行將被分割並返回。

fetchsize:僅適用於read數據。JDBC提取大小,用於肯定每次獲取的行數。這能夠幫助JDBC驅動程序調優性能,這些驅動程序默認具備較低的提取大小(例如,Oracle每次提取10行)。

batchsize:僅適用於write數據。JDBC批量大小,用於肯定每次insert的行數。
這能夠幫助JDBC驅動程序調優性能。默認爲1000。

isolationLevel:僅適用於write數據。事務隔離級別,適用於當前鏈接。它能夠是一個NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對應於由JDBC的鏈接對象定義,缺省值爲標準事務隔離級別READ_UNCOMMITTED。請參閱文檔java.sql.Connection。

truncate:僅適用於write數據。當SaveMode.Overwrite啓用時,此選項會truncate在MySQL中的表,而不是刪除,再重建其現有的表。這能夠更有效,而且防止表元數據(例如,索引)被去除。可是,在某些狀況下,例如當新數據具備不一樣的模式時,它將沒法工做。它默認爲false。

createTableOptions:僅適用於write數據。此選項容許在建立表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)時設置特定的數據庫表和分區選項。java



spark jdbc read MySQL mysql

val jdbcDF11 = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://ip:3306")
      .option("dbtable", "db.user_test")
      .option("user", "test")
      .option("password", "123456")
      .option("fetchsize", "3")
      .load()
jdbcDF11.show

val jdbcDF12 = spark.read.format("jdbc").options(
      Map(
        "driver" -> "com.mysql.jdbc.Driver",
        "url" -> "jdbc:mysql://ip:3306",
        "dbtable" -> "db.user_test",
        "user" -> "test",
        "password" -> "123456",
        "fetchsize" -> "3")).load()
jdbcDF12.show

 

 

jdbc(url: String, table: String, properties: Properties): DataFramesql

//-----------------------------------

import java.util.Properties

// jdbc(url: String, table: String, properties: Properties): DataFrame

val readConnProperties1 = new Properties()
readConnProperties1.put("driver", "com.mysql.jdbc.Driver")
readConnProperties1.put("user", "test")
readConnProperties1.put("password", "123456")
readConnProperties1.put("fetchsize", "3")

val jdbcDF1 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  readConnProperties1)

jdbcDF1.show
+---+------+---+
|uid|gender|age|
+---+------+---+
|  2|     2| 20|
|  3|     1| 30|
|  4|     2| 40|
|  5|     1| 50|
|  6|     2| 60|
|  7|     1| 25|
|  8|     2| 35|
|  9|     1| 70|
| 10|     2| 80|
|  1|     1| 18|
+---+------+---+


//默認並行度爲1
jdbcDF1.rdd.partitions.size
Int = 1

//-------------------------
    
// jdbc(url: String, table: String, properties: Properties): DataFrame

val readConnProperties4 = new Properties()
readConnProperties4.put("driver", "com.mysql.jdbc.Driver")
readConnProperties4.put("user", "test")
readConnProperties4.put("password", "123456")
readConnProperties4.put("fetchsize", "3")


val jdbcDF4 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "(select * from db.user_test where gender=1) t",  // 注意括號和表別名,必須得有,這裏能夠過濾數據
  readConnProperties4)
  
jdbcDF4.show
+---+------+---+
|uid|gender|age|
+---+------+---+
|  3|     1| 30|
|  5|     1| 50|
|  7|     1| 25|
|  9|     1| 70|
|  1|     1| 18|
+---+------+---+

 

 

 

jdbc(url: String, table: String,
     columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int,
     connectionProperties: Properties): DataFrame
shell

	 
import java.util.Properties

val readConnProperties2 = new Properties()
readConnProperties2.put("driver", "com.mysql.jdbc.Driver")
readConnProperties2.put("user", "test")
readConnProperties2.put("password", "123456")
readConnProperties2.put("fetchsize", "2")

val columnName = "uid"
val lowerBound = 1
val upperBound = 6
val numPartitions = 3

val jdbcDF2 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  columnName,
  lowerBound,
  upperBound,
  numPartitions,
  readConnProperties2)

jdbcDF2.show
+---+------+---+
|uid|gender|age|
+---+------+---+
|  2|     2| 20|
|  1|     1| 18|
|  3|     1| 30|
|  4|     2| 40|
|  5|     1| 50|
|  6|     2| 60|
|  7|     1| 25|
|  8|     2| 35|
|  9|     1| 70|
| 10|     2| 80|
+---+------+---+

// 並行度爲3,對應於numPartitions
jdbcDF2.rdd.partitions.size
Int = 3

 

 

 

jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
predicates: Condition in the WHERE clause for each partition.數據庫

import java.util.Properties

val readConnProperties3 = new Properties()
readConnProperties3.put("driver", "com.mysql.jdbc.Driver")
readConnProperties3.put("user", "test")
readConnProperties3.put("password", "123456")
readConnProperties3.put("fetchsize", "2")

val arr = Array(
  (1, 50),
  (2, 60))

// 此處的條件,既能夠分割數據用做並行度,也能夠過濾數據
val predicates = arr.map {
  case (gender, age) =>
    s" gender = $gender " + s" AND age < $age "
}

val predicates1 =
  Array(
    "2017-05-01" -> "2017-05-20",
    "2017-06-01" -> "2017-06-05").map {
      case (start, end) =>
        s"cast(create_time as date) >= date '$start' " + s"AND cast(create_time as date) <= date '$end'"
    }

val jdbcDF3 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  predicates,
  readConnProperties3)



jdbcDF3.show
+---+------+---+
|uid|gender|age|
+---+------+---+
|  3|     1| 30|
|  7|     1| 25|
|  1|     1| 18|
|  2|     2| 20|
|  4|     2| 40|
|  8|     2| 35|
+---+------+---+

// 並行度爲2,對應arr數組中元素的個數
jdbcDF3.rdd.partitions.size
Int = 2

 

 

spark jdbc write MySQL數組

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
  (0, "male", 37, 10, "no", 3, 18, 7, 4),
  (0, "female", 27, 4, "no", 4, 14, 6, 4),
  (0, "female", 32, 15, "yes", 1, 12, 1, 4),
  (0, "male", 57, 15, "yes", 5, 18, 6, 5),
  (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
  (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
  (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
  (0, "male", 57, 15, "yes", 2, 14, 4, 4),
  (0, "female", 32, 15, "yes", 4, 16, 1, 2))

val colArray: Array[String] = Array("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")

val df = dataList.toDF(colArray: _*)

df.write.mode("overwrite").format("jdbc").options(
  Map(
    "driver" -> "com.mysql.jdbc.Driver",
    "url" -> "jdbc:mysql://ip:3306",
    "dbtable" -> "db.affairs",
    "user" -> "test",
    "password" -> "123456",
    "batchsize" -> "1000",
    "truncate" -> "true")).save()
相關文章
相關標籤/搜索