PySpark操做

基本操做:html

 

運行時獲取spark版本號(以spark 2.0.0爲例):python

sparksn = SparkSession.builder.appName("PythonSQL").getOrCreate()

print sparksn.versionmysql

 

獲取spark配置狀況(crossJoin等等):sql

df = spark.sql("SET -v")

df.show()數據庫

 

顯示每列的全部內容,不刪減內容顯示,show每列全部內容app

df.show(truncate=False)

 

建立和轉換格式:函數

 

Pandas和Spark的DataFrame二者互相轉換:fetch

pandas_df = spark_df.toPandas() spark_df = sqlContext.createDataFrame(pandas_df)

 

與Spark RDD的相互轉換:ui

rdd_df = df.rdd df = rdd_df.toDF()

注:rdd轉df前提是每一個rdd的類型都是Row類型url

 

 

增:

 

新增列:

df.withColumn(「xx」, 0).show() 會報錯,由於原來沒有xx列
 

from pyspark.sql import functions

df = df.withColumn(「xx」, functions.lit(0)).show()

 

fillna函數:

df.na.fill()

 

以原有列爲基礎添加列:

df = df.withColumn('count20', df["count"] - 20)  # 新列爲原有列的數據減去20

 

增長序列標籤:

df.rdd.zipWithIndex()

參考資料:stackoverflow

 

刪:

 

刪除一列:

df.drop('age').collect()

df.drop(df.age).collect()

 

dropna函數:

df = df.na.drop()  # 扔掉任何列包含na的行
df = df.dropna(subset=['col_name1', 'col_name2'])  # 扔掉col1或col2中任一一列包含na的行

 

 

改:

 

修改原有df[「xx」]列的全部值:

df = df.withColumn(「xx」, 1)

 

修改列的類型(類型投射):

df = df.withColumn("year2", df["year1"].cast("Int"))

 

合併2個表的join方法:

 df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")

其中,方法能夠爲:`inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.

 

groupBy方法整合:

GroupedData = df.groupBy(「age」)

應用單個函數(按照A列同名的進行分組,組內對B列進行均值計算來合併):

df.groupBy(「A」).avg(「B」).show()

 

應用多個函數:

from pyspark.sql import functions

df.groupBy(「A」).agg(functions.avg(「B」), functions.min(「B」), functions.max(「B」)).show()

整合後GroupedData類型可用的方法(均返回DataFrame類型):

avg(*cols)     ——   計算每組中一列或多列的平均值

count()          ——   計算每組中一共有多少行,返回DataFrame有2列,一列爲分組的組名,另外一列爲行總數

max(*cols)    ——   計算每組中一列或多列的最大值

mean(*cols)  ——  計算每組中一列或多列的平均值

min(*cols)     ——  計算每組中一列或多列的最小值

sum(*cols)    ——   計算每組中一列或多列的總和

 

【函數應用】將df的每一列應用函數f:

df.foreach(f) 或者 df.rdd.foreach(f)

 

【函數應用】將df的每一塊應用函數f:

df.foreachPartition(f) 或者 df.rdd.foreachPartition(f)

 

【Map和Reduce應用】返回類型seqRDDs

df.map(func)
df.reduce(func)

 

解決toDF()跑出First 100 rows類型沒法肯定的異常,能夠採用將Row內每一個元素都統一轉格式,或者判斷格式處理的方法,解決包含None類型時轉換成DataFrame出錯的問題:

    @staticmethod

    def map_convert_none_to_str(row):

        dict_row = row.asDict()

 

        for key in dict_row:

            if key != 'some_column_name':

                value = dict_row[key]

                if value is None:

                    value_in = str("")

                else:

                    value_in = str(value)

                dict_row[key] = value_in

 

        columns = dict_row.keys()

        v = dict_row.values()

        row = Row(*columns)

        return row(*v)

 

 

查:

解決中文亂碼問題(python 2.7方案)

import sys

reload(sys)

sys.setdefaultencoding('utf-8')

 

行元素查詢操做:

像SQL那樣打印列表前20元素(show函數內可用int類型指定要打印的行數):

df.show()
df.show(30)

 

以樹的形式打印概要

df.printSchema()

 

獲取頭幾行到本地:

list = df.head(3)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.take(5)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]

 

輸出list類型,list中每一個元素是Row類:

list = df.collect()

注:此方法將全部數據所有導入到本地

 

查詢總行數:

 int_num = df.count()

 

查詢某列爲null的行:

from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a"))

 

 

列元素操做:

獲取Row元素的全部列名:

r = Row(age=11, name='Alice')
print r.__fields__    #  ['age', 'name']

 

選擇一列或多列:

df.select(「name」)

df.select(df[‘name’], df[‘age’]+1)

df.select(df.a, df.b, df.c)    # 選擇a、b、c三列

df.select(df["a"], df["b"], df["c"])    # 選擇a、b、c三列

 

排序:

df = df.sort("age", ascending=False)

 

過濾數據(filter和where方法相同):

df = df.filter(df['age']>21)
df = df.where(df['age']>21)

# 對null或nan數據進行過濾:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # 把a列裏面數據爲null的篩選出來(表明python的None類型)
df = df.filter(isnan("a"))  # 把a列裏面數據爲nan的篩選出來(Not a Number,非數字數據)

 

 

SQL操做:

DataFrame註冊成SQL的表:

df.createOrReplaceTempView("TBL1")

 

進行SQL查詢(返回DataFrame):

conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()

df = ss.sql(「SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″)

 

對HIVE數據庫的操做:

首先,開啓對Hive數據庫支持的開關:

spark = SparkSession.builder.appName("app1").enableHiveSupport().getOrCreate() 

接着,用INSERT代替write.parquet命令操做HIVE數據庫

df = spark.read.parquet(some_path)

df.createOrReplaceTempView("some_df_tmp_table")

sql_content= "    INSERT INTO `some_schema`.`some_hive_table`    " \

                      "    SELECT a,b,c from some_df_tmp_table                   "

spark.sql(sql_content)

 

 

 

時間序列操做:

 

先按某幾列分組,再按時間段分組:

from pyspark.sql.functions import window
 

win_monday = window("col1", "1 week", startTime="4 day")

GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday])

 

 

 

 

參考資料:

Spark與Pandas中DataFrame對比(詳細)

使用Apache Spark讓MySQL查詢速度提高10倍以上

傳統MySQL查詢(執行時間 19 min 16.58 sec):

mysql> 

 

SELECT

    MIN(yearD),

    MAX(yearD) AS max_year,

    Carrier,

    COUNT(*) AS cnt,

    SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,

    ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate

FROM

    ontime_part

WHERE

    DayOfWeek NOT IN (6 , 7)

        AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI')

        AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI')

GROUP BY carrier

HAVING cnt > 1000 AND max_year > '1990'

ORDER BY rate DESC , cnt DESC

LIMIT 10;

 

使用Scala語言摘寫的Spark查詢(執行時間 2 min 19.628 sec):

scala>

val jdbcDF = spark.read.format("jdbc").options(Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",

                                                   "dbtable" -> "ontime.ontime_sm",     

                                                   "fetchSize" -> "10000",

                                                   "partitionColumn" -> "yeard",

                                                   "lowerBound" -> "1988",

                                                   "upperBound" -> "2015",

                                                   "numPartitions" -> "48")).load()

jdbcDF.createOrReplaceTempView("ontime")

val sqlDF = sql("SELECT

                     MIN(yearD),

                     MAX(yearD) AS max_year,

                     Carrier,

                     COUNT(*) AS cnt,

                     SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,

                     ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate

                 FROM

                     ontime_part

                 WHERE

                     DayOfWeek NOT IN (6 , 7)

                         AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI')

                         AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI')

                 GROUP BY carrier

                 HAVING cnt > 1000 AND max_year > '1990'

                 ORDER BY rate DESC , cnt DESC

                 LIMIT 10;

")

sqlDF.show()

 

 

Spark RDD中的map、reduce等操做的概念詳解:

 

map將RDD中的每一個元素都通過map內函數處理後返回給原來的RDD,即對每一個RDD單獨處理且不影響其它和總量。屬於一對一的關係(這裏一指的是對1個RDD而言)。

 

flatMap將RDD中的每一個元素進行處理,返回一個list,list裏面能夠是1個或多個RDD,最終RDD總數會不變或變多。屬於一變多的關係(這裏一指的是對1個RDD而言)。

 

reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值爲止。屬於多變一的關係

val c = sc.parallelize(1 to 10)

c.reduce((x, y) => x + y)//結果55

 

reduceByKey(binary_function) 

reduceByKey就是對元素爲KV對的RDD中Key相同的元素的Value進行binary_function的reduce操做,所以,Key相同的多個元素的值被reduce爲一個值,而後與原RDD中的Key組成一個新的KV對。屬於多變少的關係

val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))

a.reduceByKey((x,y) => x + y).collect

相關文章
相關標籤/搜索