pyspark對Mysql數據庫進行讀寫 - 知乎

pyspark是Spark對Python的api接口,能夠在Python環境中經過調用pyspark模塊來操做spark,完成大數據框架下的數據分析與挖掘。其中,數據的讀寫是基礎操做,pyspark的子模塊pyspark.sql 能夠完成大部分類型的數據讀寫。文本介紹在pyspark中讀寫Mysql數據庫。

1 軟件版本

在Python中使用Spark,須要安裝配置Spark,這裏跳過配置的過程,給出運行環境和相關程序版本信息。java

  • win10 64bit
  • java 13.0.1
  • spark 3.0
  • python 3.8
  • pyspark 3.0
  • pycharm 2019.3.4

2 環境配置

pyspark鏈接Mysql是經過java實現的,因此須要下載鏈接Mysql的jar包。python

下載地址:https://dev.mysql.com/downloads/mysql

選擇下載Connector/J,而後選擇操做系統爲Platform Independent,下載壓縮包到本地。sql

而後解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars數據庫

環境配置完成!api

3 讀取Mysql

腳本以下:app

from pyspark.sql import SQLContext, SparkSession

if __name__ == '__main__':
    # spark 初始化
    spark = SparkSession. \
        Builder(). \
        appName('sql'). \
        master('local'). \
        getOrCreate()
    # mysql 配置(須要修改)
    prop = {'user': 'xxx', 
            'password': 'xxx', 
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(須要修改)
    url = 'jdbc:mysql://host:port/database'
    # 讀取表
    data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
    # 打印data數據類型
    print(type(data))
    # 展現數據
    data.show()
    # 關閉spark會話
    spark.stop()

注意點:框架

  1. prop參數須要根據實際狀況修改,文中用戶名和密碼用xxx代替了,driver參數也能夠不須要;
  2. url參數須要根據實際狀況修改,格式爲jdbc:mysql://主機:端口/數據庫
  3. 經過調用方法read.jdbc進行讀取,返回的數據類型爲spark DataFrame;

運行腳本,輸出以下:工具

4 寫入Mysql

腳本以下:oop

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(須要修改)
    prop = {'user': 'xxx',
            'password': 'xxx',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(須要修改)
    url = 'jdbc:mysql://host:port/database'

    # 建立spark DataFrame
    # 方式1:list轉spark DataFrame
    l = [(1, 12), (2, 22)]
    # 建立並指定列名
    list_df = spark.createDataFrame(l, schema=['id', 'value']) 
    
    # 方式2:rdd轉spark DataFrame
    rdd = sc.parallelize(l)  # rdd
    col_names = Row('id', 'value')  # 列名
    tmp = rdd.map(lambda x: col_names(*x))  # 設置列名
    rdd_df = spark.createDataFrame(tmp)  
    
    # 方式3:pandas dataFrame 轉spark DataFrame
    df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
    pd_df = spark.createDataFrame(df)

    # 寫入數據庫
    pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
    # 關閉spark會話
    sc.stop()

注意點:

  1. propurl參數一樣須要根據實際狀況修改;
  2. 寫入數據庫要求的對象類型是spark DataFrame,提供了三種常見數據類型轉spark DataFrame的方法;
  3. 經過調用write.jdbc方法進行寫入,其中的model參數控制寫入數據的行爲。

當數據庫無寫入的表時,這四種模式都會根據設定的表名稱自動建立表,無需在Mysql裏先建表。

5 常見報錯

Access denied for user ...

緣由:mysql配置參數出錯

解決辦法:檢查user,password拼寫,檢查帳號密碼是否正確,用其餘工具測試mysql是否能正常鏈接,作對比檢查。


No suitable driver

緣由:沒有配置運行環境

解決辦法:下載jar包進行配置,具體過程參考本文的2 環境配置


http://weixin.qq.com/r/mShLU4rECNd3rc4w932L (二維碼自動識別)

相關文章
相關標籤/搜索