Spark SQL 分爲三類:python
import findspark findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.appName('myspark').getOrCreate() # 初始化session # spark.sparkContext.parallelize([1,2,3,4]).collect() # 裏面包含以前說過的sparkContext ... 中間這部分留給下面寫 ... spark.stop() # 關閉 session
從json導入爲df:mysql
df = spark.read.json("file:///home/lin/data/user.json",multiLine=True)
打印DF字段信息:git
df.printSchema() root |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- name: string (nullable = true)
增sql
from pyspark.sql import functions as f # schema就至關於 pandas 指定的 columns, 雙層序列, [2x4] 的樣本 df1 = spark.createDataFrame([[1,2,3,4],[5,6,7,8]],schema=['1_c','2_c', '3_c', '4_c']) +-----+-----+-----+-----+ |1\_col|2\_col|3\_col|4\_col| +-----+-----+-----+-----+ | 1| 2| 3| 4| | 5| 6| 7| 8| +-----+-----+-----+-----+ # lit能夠在指定空列的時候,指定 null值, 或者 int型(裏面有不少類型,能夠發現) # df2 = df1.withColumn('null_col', f.lit(None)).withColumn('digit_col', f.lit(2)) df2 = df1.withColumn('5_col', df1['4_col']+1) # 在原來列字段基礎上。 df2.show()
刪數據庫
df2 = df1.drop('age') # 刪除 age列 df2 = df1.dropna() # 刪除空行 df2 = df1.drop_duplicates() # 刪除重複-行
改json
和"增",差很少,只不過字段,指定爲原有字段字符串便可。
查segmentfault
下面的講的(投影、過濾、排序、分組),幾乎都是查。
投影全部:session
df.show(n=20) # 默認就是 n=20 只返回 前20條記錄 +---+------+--------+ |age|gender| name| +---+------+--------+ | 18| man|zhangsan| +---+------+--------+
選中某列投影:app
df.select('name','age').show() # 若直接寫 '*', 和直接 df.show()是一個效果 +--------+---+ | name|age| +--------+---+ |zhangsan| 18| +--------+---+
或者用另兩種方式投影(投影過程可計算):函數
df.select(df['name'],df['age']+20).show() # 同上,這是另外一種寫法,注意一下列名 df.select(df.name, df.age+20).show() # 同上,這是另二種寫法,注意一下列名 +--------+----------+ | name|(age + 20)| +--------+----------+ |zhangsan| 38| +--------+----------+
取出前N條DF,並轉化爲 [ {} , {} ] 格式
df_user.take(1) # [Row(age=18, name='張三')] df_user.head(1) # [Row(age=18, name='張三')] df_user.first() # Row(age=18, name='張三') # 注意,無列表
df_user.head(1) df_user.sort(df_user.name.desc()).show() # 另外說明一點, df的每一個熟悉都有,一些操做符函數, desc()就是一種操做符函數
df.filter( df['age'] > 15).show() +---+------+----+ |age|gender|name| +---+------+----+ +---+------+----+
df.groupBy('name').count().show() +--------+-----+ | name|count| +--------+-----+ |zhangsan| 1| +--------+-----+
df_user.join(df_user, on=df_user.name==df_user.name, how='inner').show() +----+---+----+---+ |name|age|name|age| +----+---+----+---+ |李四| 20|李四| 20 | |張三| 18|張三| 18 | +----+---+----+---+ # 特別提醒, 此 Join, 只要都進來是 DF格式的任何數據庫,均可 Join # 好比: MySQL 和 Hive , Json 也可。
df.createOrReplaceTempView('user') # 建立爲 user臨時視圖 df_sql = spark.sql('select * from user').show() # spark.sql返回的仍是df, 因此要show() +---+------+--------+ |age|gender| name| +---+------+--------+ | 18| man|zhangsan| +---+------+--------+
RDD -> DF
### RDD -> DF 須要把RDD作成兩種格式(任選其一) ### 第一種 Row 格式 from pyspark import Row rdd_user = spark.sparkContext.parallelize( [('張三',18), ('李四',20)] ) rdd_user_row = rdd_user.map(lambda x:Row(name=x[0], age=x[1])) print(rdd_user_row.collect()) # [Row(age=18, name='張三'), Row(age=20, name='李四')] df_user = spark.createDataFrame(rdd_user_row) ### 第二種 [('張三', 18),('李四', 20)] rdd_user = spark.sparkContext.parallelize( [('張三',18), ('李四',20)] ) df_user = rdd_user.toDF(['name', 'age']) # 給定列名 df_user.show()
DF -> RDD
rdd_row = df_user.rdd.map(lambda x: x.asDict()) # 或者 x.name, x.age取值 rdd_row.collect() # [{'age': 18, 'name': '張三'}, {'age': 20, 'name': '李四'}]
從HDFS中讀取(咱們先新建一個CSV並扔到HDFS中),
vi mydata.csv: name,age zhangsan,18 lisi, 20 hadoop fs -mkdir /data # 在HDFS中新建一個目錄 /data hadoop fs -put mydata.csv /data # 並把本地 mydata.csv扔進去 (-get可拿出來)
在代碼中讀取 HDFS數據:
df = spark.read.csv("hdfs:///data/mydata.csv", header=True) df.show() # header=True 表明, csv文件的第一行做爲csv的擡頭(列名) # df.write.csv("hdfs:///data/mydata.csv", header=True) # read改成write就變成了寫
Hive的配置與依賴以前講過了(最值得注意的是須要先啓動一個 metadata的服務)
先驗傳送門:http://www.javashuo.com/article/p-oqbazfpv-k.html
import findspark findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder\ .appName("Spark Hive Example")\ .master("local[*]")\ .config("hive.metastore.uris", "thrift://localhost:9083")\ .enableHiveSupport()\ .getOrCreate() spark.sql("use mydatabase") # 執行Hive 的 SQL, 切換數據庫(前提你得有)
讀:
df = spark.table('person').show() # 直接對錶操做 (注意,sql語句也可)
寫:
df = spark.table('person') df2 = df.withColumn('nickname', df.name) # 稍微變更一下,添一個字段 df2.write.saveAsTable("new_person") # 寫入新表
讀:
# 注意0:有好幾種方式,我只列舉一個 成對的讀寫配置。 # 注意1: url中 "hive"是數據庫名. 你也能夠起爲別的名 # 注意2:table的值--"TBLS", 它是 MySQL中"hive庫"中的一個表。 # 注意3:特別注意! TBLS不是咱們想要的表。他只是一個大表,管理了咱們hive的信息 # TBLS中的 一個列屬性 "TBL_NAME" 纔是真正咱們須要的表!! df = spark.read.jdbc( url='jdbc:mysql://localhost:3306/hive?user=root&password=123', table="TBLS", properties={"driver":"com.mysql.jdbc.Driver"}, ) df.show() df.select("TBL_NAME").show()
寫:
df.write.jdbc( url='jdbc:mysql://localhost:3306/hive?user=root&password=123', table="new_table", mode='append', properties={"driver": "com.mysql.jdbc.Driver"} ) # 同是特別注意: 和讀同樣, 它寫入的新表,也是一個總體的表結構。 # 此表的一個列"TBL_NAME",它纔對應了咱們真正要操做的表