pyspark是Spark對Python的api接口,能夠在Python環境中經過調用pyspark模塊來操做spark,完成大數據框架下的數據分析與挖掘。其中,數據的讀寫是基礎操做,pyspark的子模塊pyspark.sql 能夠完成大部分類型的數據讀寫。文本介紹在pyspark中讀寫Mysql數據庫。
1 軟件版本
在Python中使用Spark,須要安裝配置Spark,這裏跳過配置的過程,給出運行環境和相關程序版本信息。java
- win10 64bit
- java 13.0.1
- spark 3.0
- python 3.8
- pyspark 3.0
- pycharm 2019.3.4
2 環境配置
pyspark鏈接Mysql是經過java實現的,因此須要下載鏈接Mysql的jar包。python
下載地址:https://dev.mysql.com/downloads/mysql


選擇下載Connector/J
,而後選擇操做系統爲Platform Independent
,下載壓縮包到本地。sql


而後解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar
放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars
。數據庫


環境配置完成!api
3 讀取Mysql
腳本以下:app
from pyspark.sql import SQLContext, SparkSession
if __name__ == '__main__':
# spark 初始化
spark = SparkSession. \
Builder(). \
appName('sql'). \
master('local'). \
getOrCreate()
# mysql 配置(須要修改)
prop = {'user': 'xxx',
'password': 'xxx',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(須要修改)
url = 'jdbc:mysql://host:port/database'
# 讀取表
data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
# 打印data數據類型
print(type(data))
# 展現數據
data.show()
# 關閉spark會話
spark.stop()
注意點:框架
prop
參數須要根據實際狀況修改,文中用戶名和密碼用xxx代替了,driver
參數也能夠不須要;url
參數須要根據實際狀況修改,格式爲jdbc:mysql://主機:端口/數據庫
;- 經過調用方法
read.jdbc
進行讀取,返回的數據類型爲spark DataFrame;
運行腳本,輸出以下:工具


4 寫入Mysql
腳本以下:oop
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(須要修改)
prop = {'user': 'xxx',
'password': 'xxx',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(須要修改)
url = 'jdbc:mysql://host:port/database'
# 建立spark DataFrame
# 方式1:list轉spark DataFrame
l = [(1, 12), (2, 22)]
# 建立並指定列名
list_df = spark.createDataFrame(l, schema=['id', 'value'])
# 方式2:rdd轉spark DataFrame
rdd = sc.parallelize(l) # rdd
col_names = Row('id', 'value') # 列名
tmp = rdd.map(lambda x: col_names(*x)) # 設置列名
rdd_df = spark.createDataFrame(tmp)
# 方式3:pandas dataFrame 轉spark DataFrame
df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
pd_df = spark.createDataFrame(df)
# 寫入數據庫
pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
# 關閉spark會話
sc.stop()
注意點:
prop
和url
參數一樣須要根據實際狀況修改;- 寫入數據庫要求的對象類型是spark DataFrame,提供了三種常見數據類型轉spark DataFrame的方法;
- 經過調用
write.jdbc
方法進行寫入,其中的model
參數控制寫入數據的行爲。


當數據庫無寫入的表時,這四種模式都會根據設定的表名稱自動建立表,無需在Mysql裏先建表。
5 常見報錯
Access denied for user ...


緣由:mysql配置參數出錯
解決辦法:檢查user,password拼寫,檢查帳號密碼是否正確,用其餘工具測試mysql是否能正常鏈接,作對比檢查。
No suitable driver


緣由:沒有配置運行環境
解決辦法:下載jar包進行配置,具體過程參考本文的2 環境配置。