SparkSQL Data Sources in Practice
Author: Yin Zhengjie
Copyright notice: this is an original work; reproduction without permission is prohibited and will be pursued under the law.
I. Generic load/save methods
1>. Test data shipped with the official Spark distribution
[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/    # This directory contains test files in various formats shipped with Spark
total 44
-rw-r--r-- 1 yinzhengjie yinzhengjie  130 May 30 08:02 employees.json
-rw-r--r-- 1 yinzhengjie yinzhengjie  240 May 30 08:02 full_user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie 5812 May 30 08:02 kv1.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie   49 May 30 08:02 people.csv
-rw-r--r-- 1 yinzhengjie yinzhengjie   73 May 30 08:02 people.json
-rw-r--r-- 1 yinzhengjie yinzhengjie   32 May 30 08:02 people.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie  185 May 30 08:02 user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie  334 May 30 08:02 users.avro
-rw-r--r-- 1 yinzhengjie yinzhengjie  547 May 30 08:02 users.orc
-rw-r--r-- 1 yinzhengjie yinzhengjie  615 May 30 08:02 users.parquet
[root@hadoop101.yinzhengjie.org.cn ~]#
2>. Manually specifying options
The Spark SQL DataFrame interface supports a variety of data sources. A DataFrame can be operated on like an RDD, and it can also be registered as a temporary view; once registered, SQL queries can be run directly against it.
Spark SQL's default data source format is Parquet. When the data source is a Parquet file, Spark SQL can perform all operations on it without further configuration. The default format can be changed via the configuration property spark.sql.sources.default. When the data source is not in Parquet format, the format must be specified explicitly.
The data source format is specified by its fully qualified name (for example org.apache.spark.sql.parquet); for built-in formats, the short name is sufficient: json, parquet, jdbc, orc, libsvm, csv, or text.
Data can be loaded generically with the read.load method provided by SparkSession, and saved with write and save. Beyond that, SQL can also be run directly on files (see the sketch after the load examples below).
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell
20/07/15 01:09:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594746601368).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet")    # No format needs to be specified for a Parquet file, because Parquet is the default
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

scala>
scala> val df = spark.read.format("json").load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Load a JSON file by specifying the format explicitly
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala>
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Shorthand for the above: the json method reads JSON files directly, no explicit format needed
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala>
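As mentioned above, SQL can also be run directly on a file without loading it into a DataFrame first. A minimal sketch using the users.parquet file from the Spark distribution (output omitted; the result matches df.show in the first example):

scala> val sqlDF = spark.sql("SELECT * FROM parquet.`file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet`")

scala> sqlDF.show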
3>. File save options
A SaveMode can be used when storing data; it defines how existing data at the target is handled. Note that these save modes do not use any locking and are not atomic.
In addition, when writing with Overwrite, the existing data is deleted before the new data is written. The save modes are summarized below:

Scala/Java                          Any Language        Meaning
SaveMode.ErrorIfExists (default)    "error" (default)   Throw an error if the target already exists
SaveMode.Append                     "append"            Append to the existing data
SaveMode.Overwrite                  "overwrite"         Overwrite the existing data
SaveMode.Ignore                     "ignore"            Skip the write if data already exists
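Besides the string form used in the transcripts below, the mode can also be set with the SaveMode enum. A minimal sketch, assuming df is any DataFrame and the output path is only illustrative:

scala> import org.apache.spark.sql.SaveMode

scala> df.write.mode(SaveMode.Overwrite).format("json").save("file:///tmp/output_enum")    # "file:///tmp/output_enum" is a hypothetical path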
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Read the JSON file via the spark variable
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.save("file:///tmp/output")    # Save the DataFrame to local disk; the default output format is still Parquet

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output/    # The file suffix on local disk is parquet, confirming that Parquet is the default save format
total 4
-rw-r--r-- 1 root root 687 Jul 15 01:34 part-00000-00ce4157-82e7-438b-a0b6-bdbaa29d0f4f-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Jul 15 01:34 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")    # The output format can be specified explicitly when saving, here "json"

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # The saved files are indeed in JSON format
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")    # The first save succeeds

scala> df.write.format("json").save("file:///tmp/output2")    # The second save to the same directory fails, complaining that the path already exists
org.apache.spark.sql.AnalysisException: path file:/tmp/output2 already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:114)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 49 elided

scala> df.write.format("json").mode("append").save("file:///tmp/output2")    # With the save mode set to "append", writing to the same path again no longer fails with "path already exists"

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # Files after the first save
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # Files after the second (appended) save
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")

scala> df.write.format("json").mode("append").save("file:///tmp/output2")

scala> df.write.format("json").mode("overwrite").save("file:///tmp/output2")    # Write in overwrite mode; any data previously stored at this path is removed first

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the first write
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the second, appending write
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/    # After the third, overwriting write
total 4
-rw-r--r-- 1 root root 71 Jul 15 02:01 part-00000-7ea74899-2f1d-43bd-8c63-4ba17032974b-c000.json
-rw-r--r-- 1 root root  0 Jul 15 02:01 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]#
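The "ignore" mode from the table above is not demonstrated in the transcript. A minimal sketch of its behavior with the same df and path: if the target already contains data, the write is silently skipped and the existing files are left untouched.

scala> df.write.format("json").mode("ignore").save("file:///tmp/output2")    # No error, but nothing new is written because /tmp/output2 already exists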
II. JSON files
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().
Tip:
This is not a conventional JSON file: every line must be a complete JSON document.
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
{"name":"yinzhengjie","passwd":"2020","age":18}
{"name":"Jason","passwd":"666666","age":27}
{"name":"Liming","passwd":"123","age":49}
{"name":"Jenny","passwd":"456","age":23}
{"name":"Danny","passwd":"789","age":56}
[root@hadoop101.yinzhengjie.org.cn ~]#
scala> val path = "/tmp/user.json"
path: String = /tmp/user.json

scala> val userDF = spark.read.json(path)    # Read the JSON file
userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> userDF.createOrReplaceTempView("users")    # Create a temporary view

scala> val teenagerNamesDF = spark.sql("SELECT name FROM users WHERE age BETWEEN 13 AND 19")    # Run SQL against the view
teenagerNamesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> teenagerNamesDF.show()    # Display the query result
+-----------+
|       name|
+-----------+
|yinzhengjie|
+-----------+

scala>
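In Spark 2.x, spark.read.json can also infer a schema from a Dataset[String] holding one JSON document per element, which is handy when the JSON strings are produced in code rather than read from disk. A minimal sketch with made-up records (spark-shell already imports spark.implicits._, so toDS is available):

scala> val jsonDS = Seq("""{"name":"Tom","age":20}""", """{"name":"Jerry","age":35}""").toDS()    # "Tom" and "Jerry" are hypothetical sample records

scala> val dsDF = spark.read.json(jsonDS)

scala> dsDF.show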
III. Parquet files
Parquet is a popular columnar storage format that can efficiently store records with nested fields.
Parquet is widely used in the Hadoop ecosystem, and it supports all of Spark SQL's data types.
Spark SQL provides methods for reading and writing Parquet files directly.
scala> val peopleDF = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Read the local JSON file
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    # Save the data to HDFS in Parquet format

scala> val parquetFileDF = spark.read.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    # Read the file back from HDFS
parquetFileDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> parquetFileDF.createOrReplaceTempView("parquetFile")    # Create a temporary view

scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")    # Run a SQL query against the view
namesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> namesDF.map(attributes => "Name: " + attributes(0)).show()    # Display the query result
+------------+
|       value|
+------------+
|Name: Justin|
+------------+

scala>

[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/people.json    # This file ships with Spark
-rw-r--r-- 1 yinzhengjie yinzhengjie 73 May 30 08:02 /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# hdfs dfs -ls /yinzhengjie/spark/resources/people.parquet    # The data is written to HDFS, so make sure your Hadoop cluster is running
Found 2 items
-rw-r--r--   3 root supergroup          0 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/_SUCCESS
-rw-r--r--   3 root supergroup        687 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/part-00000-3ead21bf-d453-4161-8d6f-08c069d4cb50-c000.snappy.parquet
[root@hadoop101.yinzhengjie.org.cn ~]#
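Parquet output can also be partitioned by one or more columns. A minimal sketch that writes the same peopleDF partitioned by age; the HDFS path is only illustrative, and note that rows whose age is null end up in a default partition directory:

scala> peopleDF.write.mode("overwrite").partitionBy("age").parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people_partitioned")    # "people_partitioned" is a hypothetical output path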
IV. JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can also be written back to the relational database.
Tip:
The relevant database driver must be placed on Spark's classpath.
1>. Create the MySQL database
Install the MariaDB database:
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb-server

Enable the database to start on boot and start it:
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb
Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
[root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb
[root@hadoop101.yinzhengjie.org.cn ~]#

Log in to the database, create the spark database, and grant a user access:
MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]>
[root@hadoop105.yinzhengjie.org.cn ~]# mysql -u jason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 7
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| spark              |
| test               |
+--------------------+
3 rows in set (0.01 sec)

MariaDB [(none)]> quit
Bye
[root@hadoop105.yinzhengjie.org.cn ~]#
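The users table read in the following steps is assumed to exist already in the spark database. A minimal sketch of how it could be created and populated, with the column layout inferred from the query results shown later:

MariaDB [spark]> CREATE TABLE users (id BIGINT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(64), age BIGINT, passwd VARCHAR(64));

MariaDB [spark]> INSERT INTO users (name, age, passwd) VALUES ('yinzhengjie', 18, '2020'), ('Jason', 27, '666666'), ('Liming', 49, '123'), ('Jenny', 23, '456'), ('Danny', 56, '789');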
2>. Place the database driver on Spark's classpath
[root@hadoop105.yinzhengjie.org.cn ~]# ll
total 188288
-rw-r--r-- 1 root root      8409 Dec 12  2018 jce_policy-8.zip
-rw-r--r-- 1 root root 191817140 Mar 25  2019 jdk-8u201-linux-x64.tar.gz
-rw-r--r-- 1 root root    972009 Mar  1 22:52 mysql-connector-java-5.1.36-bin.jar
drwxrwxr-x 2 root root        24 Jan 21 01:36 UnlimitedJCEPolicyJDK8
[root@hadoop105.yinzhengjie.org.cn ~]#
[root@hadoop105.yinzhengjie.org.cn ~]# cp mysql-connector-java-5.1.36-bin.jar /yinzhengjie/softwares/spark/jars/
[root@hadoop105.yinzhengjie.org.cn ~]#
[root@hadoop105.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/jars/ | grep mysql
-rw-r--r-- 1 root root 972009 Jul 15 03:33 mysql-connector-java-5.1.36-bin.jar
[root@hadoop105.yinzhengjie.org.cn ~]#
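Copying the driver jar into Spark's jars/ directory makes it available permanently. As an alternative (not shown in the original steps), the jar can be supplied only for the current session via spark-shell options; a sketch, assuming the jar is still in root's home directory:

[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell --jars ~/mysql-connector-java-5.1.36-bin.jar --driver-class-path ~/mysql-connector-java-5.1.36-bin.jar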
3>. Loading data from MySQL, approach 1
[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell
20/07/15 03:33:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755234548).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users").option("user", "jason").option("password", "yinzhengjie").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+

scala>
4>. Loading data from MySQL, approach 2
[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell
20/07/15 03:39:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755548606).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val connectionProperties = new java.util.Properties()
connectionProperties: java.util.Properties = {}

scala> connectionProperties.put("user", "jason")
res0: Object = null

scala> connectionProperties.put("password", "yinzhengjie")
res1: Object = null

scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", connectionProperties)
jdbcDF2: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF2.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+

scala>
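For larger tables, the JDBC read can be split across multiple partitions by adding partitioning options. A minimal sketch, where the partition column and the bound values are illustrative and assume the id column shown above:

scala> val jdbcPartitionedDF = spark.read.format("jdbc").
     |   option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").
     |   option("dbtable", "users").
     |   option("user", "jason").
     |   option("password", "yinzhengjie").
     |   option("partitionColumn", "id").option("lowerBound", "1").option("upperBound", "5").option("numPartitions", "2").
     |   load()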
5>. Writing data to MySQL, approach 1
Operate directly on the DataFrame obtained from the read:

scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users2").option("user", "jason").option("password", "yinzhengjie").save()

scala>
Observe the changes in the MySQL database (show tables is run before and after the write):

[root@hadoop105.yinzhengjie.org.cn ~]# mysql -ujason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 20
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> use spark
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
+-----------------+
1 row in set (0.00 sec)

MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]>
6>. Writing data to MySQL, approach 2
Operate directly on the DataFrame obtained from the read:

scala> jdbcDF2.write.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)

scala>
Observe the changes in the MySQL database (show tables is run before and after the write):

MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
| users3          |
+-----------------+
3 rows in set (0.00 sec)

MariaDB [spark]> select * from users3;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)
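As with file-based sources, a save mode can be combined with a JDBC write. A minimal sketch that appends the same rows to users3 instead of failing because the table already exists:

scala> jdbcDF2.write.mode("append").jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)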
V. SparkSQL data sources: Hive
Recommended reading: https://www.cnblogs.com/yinzhengjie2020/p/13216504.html