PY => PySpark-Spark SQL

Spark SQL

Spark SQL 分爲三類:python

  1. SQL
  2. DataFrame (參考pandas,但略有不一樣)
  3. Datasets (因爲python是動態的,因此不支持python)

初始環境:

import findspark
findspark.init()

from pyspark.sql import SparkSession        
spark = SparkSession.builder.appName('myspark').getOrCreate()    # 初始化session
# spark.sparkContext.parallelize([1,2,3,4]).collect()  # 裏面包含以前說過的sparkContext
...
中間這部分留給下面寫
...
spark.stop()         # 關閉 session

從json導入爲df:mysql

df = spark.read.json("file:///home/lin/data/user.json",multiLine=True)

打印DF字段信息:git

df.printSchema() 

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)

CRUD

sql

from pyspark.sql import functions as f

# schema就至關於 pandas 指定的 columns, 雙層序列,  [2x4] 的樣本
df1 = spark.createDataFrame([[1,2,3,4],[5,6,7,8]],schema=['1_c','2_c', '3_c', '4_c'])
+-----+-----+-----+-----+
|1\_col|2\_col|3\_col|4\_col|
+-----+-----+-----+-----+
|    1|    2|    3|    4|
|    5|    6|    7|    8|
+-----+-----+-----+-----+

# lit能夠在指定空列的時候,指定 null值, 或者 int型(裏面有不少類型,能夠發現)
# df2 = df1.withColumn('null_col', f.lit(None)).withColumn('digit_col', f.lit(2))
df2 = df1.withColumn('5_col', df1['4_col']+1)   # 在原來列字段基礎上。
df2.show()

數據庫

df2 = df1.drop('age')        # 刪除 age列
df2 = df1.dropna()           # 刪除空行
df2 = df1.drop_duplicates()  # 刪除重複-行

json

和"增",差很少,只不過字段,指定爲原有字段字符串便可。

segmentfault

下面的講的(投影、過濾、排序、分組),幾乎都是查。

投影

投影全部:session

df.show(n=20)        # 默認就是 n=20 只返回 前20條記錄

+---+------+--------+
|age|gender|    name|
+---+------+--------+
| 18|   man|zhangsan|
+---+------+--------+

選中某列投影:app

df.select('name','age').show()        # 若直接寫 '*', 和直接 df.show()是一個效果

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
+--------+---+

或者用另兩種方式投影(投影過程可計算):函數

df.select(df['name'],df['age']+20).show() # 同上,這是另外一種寫法,注意一下列名
df.select(df.name, df.age+20).show()      # 同上,這是另二種寫法,注意一下列名

+--------+----------+
|    name|(age + 20)|
+--------+----------+
|zhangsan|        38|
+--------+----------+

取出前N條DF,並轉化爲 [ {} , {} ] 格式

df_user.take(1)  # [Row(age=18, name='張三')]
df_user.head(1)  # [Row(age=18, name='張三')]
df_user.first()  # Row(age=18, name='張三')    # 注意,無列表

排序

df_user.head(1)
df_user.sort(df_user.name.desc()).show()

# 另外說明一點, df的每一個熟悉都有,一些操做符函數, desc()就是一種操做符函數

過濾:

df.filter( df['age'] > 15).show()

+---+------+----+
|age|gender|name|
+---+------+----+
+---+------+----+

分組:

df.groupBy('name').count().show()

+--------+-----+
|    name|count|
+--------+-----+
|zhangsan|    1|
+--------+-----+

Join

df_user.join(df_user, on=df_user.name==df_user.name, how='inner').show()

+----+---+----+---+
|name|age|name|age|
+----+---+----+---+
|李四| 20|李四| 20 |
|張三| 18|張三| 18 |
+----+---+----+---+
# 特別提醒, 此 Join, 只要都進來是 DF格式的任何數據庫,均可 Join
# 好比: MySQL 和 Hive  ,  Json 也可。

儲存爲臨時視圖(表), 並調用sql語句:

df.createOrReplaceTempView('user')                  # 建立爲 user臨時視圖
df_sql = spark.sql('select * from user').show()     # spark.sql返回的仍是df, 因此要show()

+---+------+--------+
|age|gender|    name|
+---+------+--------+
| 18|   man|zhangsan|
+---+------+--------+

RDD 與 DF互轉

RDD -> DF

### RDD -> DF 須要把RDD作成兩種格式(任選其一)
### 第一種 Row 格式 
    from pyspark import Row
    rdd_user = spark.sparkContext.parallelize( [('張三',18), ('李四',20)] )
    rdd_user_row = rdd_user.map(lambda x:Row(name=x[0], age=x[1]))
    print(rdd_user_row.collect()) # [Row(age=18, name='張三'), Row(age=20, name='李四')]
    df_user = spark.createDataFrame(rdd_user_row)   
    
### 第二種 [('張三', 18),('李四', 20)]
    rdd_user = spark.sparkContext.parallelize( [('張三',18), ('李四',20)] )
    df_user = rdd_user.toDF(['name', 'age'])        # 給定列名
df_user.show()

DF -> RDD

rdd_row = df_user.rdd.map(lambda x: x.asDict())  # 或者 x.name, x.age取值
rdd_row.collect()  # [{'age': 18, 'name': '張三'}, {'age': 20, 'name': '李四'}]

CSV讀寫

從HDFS中讀取(咱們先新建一個CSV並扔到HDFS中),

vi mydata.csv:
    name,age
    zhangsan,18
    lisi, 20

hadoop fs -mkdir /data                 # 在HDFS中新建一個目錄 /data
hadoop fs -put mydata.csv /data        # 並把本地 mydata.csv扔進去 (-get可拿出來)

在代碼中讀取 HDFS數據:

df = spark.read.csv("hdfs:///data/mydata.csv", header=True)  
df.show()
# header=True 表明, csv文件的第一行做爲csv的擡頭(列名)
# df.write.csv("hdfs:///data/mydata.csv", header=True)   # read改成write就變成了寫

Hive讀寫

Hive的配置與依賴以前講過了(最值得注意的是須要先啓動一個 metadata的服務)
先驗傳送門:http://www.javashuo.com/article/p-oqbazfpv-k.html

import findspark
findspark.init()

from pyspark.sql import SparkSession   

spark = SparkSession.builder\
    .appName("Spark Hive Example")\
    .master("local[*]")\
    .config("hive.metastore.uris", "thrift://localhost:9083")\
    .enableHiveSupport()\
    .getOrCreate()

spark.sql("use mydatabase")        # 執行Hive 的 SQL, 切換數據庫(前提你得有)

讀:

df = spark.table('person').show()  # 直接對錶操做 (注意,sql語句也可)

寫:

df = spark.table('person')
df2 = df.withColumn('nickname', df.name)  # 稍微變更一下,添一個字段
df2.write.saveAsTable("new_person")       # 寫入新表

MySQL讀寫

讀:

# 注意0:有好幾種方式,我只列舉一個 成對的讀寫配置。
# 注意1: url中 "hive"是數據庫名. 你也能夠起爲別的名
# 注意2:table的值--"TBLS",  它是 MySQL中"hive庫"中的一個表。
# 注意3:特別注意! TBLS不是咱們想要的表。他只是一個大表,管理了咱們hive的信息
#        TBLS中的 一個列屬性 "TBL_NAME" 纔是真正咱們須要的表!! 

df = spark.read.jdbc(
    url='jdbc:mysql://localhost:3306/hive?user=root&password=123',
    table="TBLS",
    properties={"driver":"com.mysql.jdbc.Driver"},
)

df.show()
df.select("TBL_NAME").show()

寫:

df.write.jdbc(
    url='jdbc:mysql://localhost:3306/hive?user=root&password=123',
    table="new_table",
    mode='append',
    properties={"driver": "com.mysql.jdbc.Driver"}
)

# 同是特別注意: 和讀同樣, 它寫入的新表,也是一個總體的表結構。
#     此表的一個列"TBL_NAME",它纔對應了咱們真正要操做的表
相關文章
相關標籤/搜索