SparkSQL /DataFrame /Spark RDD誰快?

如題所示,SparkSQL /DataFrame /Spark RDD誰快?python

按照官方宣傳以及大部分人的理解,SparkSQL和DataFrame雖然基於RDD,可是因爲對RDD作了優化,因此性能會優於RDD。sql

以前一直也是這麼理解和操做的,直到最近遇到了一個場景,打破了這種不太準確的認識。json

某些場景下,RDD要比DataFrame快,性能有天壤之別。app

 

需求以下性能

如下兩份數據求交集,結果輸出url。優化

數據一,json格式,地址咱們用path_json表示,大小10T,每一行數據格式:{"id":"md5字符串", "url":"https://www.thesaurus.com/","title":"sysnonyms and antonyms",xxx},大概20來個字段;ui

數據二,csv格式,地址咱們用path_csv表示,大小50G,每一行數據格式:name url,2個字段,用\t隔開。url

 

拿到需求後,迅速瞟了一眼數據,爽快答應需求方分分鐘搞定。spa

此時此刻,必須得祭出宇宙Top N的IDE,結合我30多年的人生閱歷和代碼經驗,瞬間雷光電閃,驚雷驟起,一頓操做猛如虎,天空飄過如下幾行代碼:3d

(老鐵們,請自行安裝python,pyspark,pycharm)

方案一

 
 
from pyspark.sql import SparkSession
def join_it():
    path_json = 'hdfs://i/love/you/'  # 數據大小10T, 5萬分區
    path_csv  = 'hdfs://you/love/me'  # 數據大小50G
    path_save = 'hdfs://we/are/together'

    df1 = spark.read.json(path_json).select('url')
    df2 = spark.read.option('sep', '\t').schema('name string, url string').csv(path_csv)
    df1.join(df2) \
        .select(df1.url)\
        .coalesce(10000) \
        .write \
        .mode('overwrite') \
        .option('sep', '\t') \
        .csv(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

spark-submit提交任務到spark集羣,參數根據本身的實際狀況自行修改。

spark-submit \
    --master yarn \
    --deploy-mode client \
    --name 'i-live-you' \
    --queue 'you-love-me' \
    --driver-cores 10 \
    --driver-memory 30g \
    --num-executors 3000 \
    --executor-memory 30g \
    --executor-cores 4 \
    --archives 'hdfs://your-python-path-on-hdfs#pkg'
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON='集羣裏面的python地址' \
    --conf spark.sql.shuffle.partitions=50000 \
    --conf spark.default.parallelism=50000 \
    --conf spark.task.maxFailures=20 \
    your-spark-script.py

若是須要在本機調試代碼,spark的生成須要替換成以下,而後直接運行。調試經過後,仍然須要按照上述方式spark-submit提交任務到集羣運行,因爲數據量很大,須要在集羣運行才能看出性能差別。

spark = SparkSession.builder.appName('pyspark').master('local[*]').getOrCreate()

又是一頓猛操做,提交任務後,嗡嗡叫的肚子提醒我,要去廁所燒一根香,拜拜佛,佛祖保佑無bug。

深圳的夏天,依舊不負衆望的燥熱。熱情似火的太陽,伴着她最愛的紫外線和電磁波,循着外太空固定的軌道,邁着30萬千米/秒的矯健步伐,到達這顆承載70億人的藍色星球,穿透層層藍天白雲,無私的照亮着廣袤的深圳大地。

酷暑讓人思想活躍,思緒萬千。扯得有點遠了,重來不重來了,接着寫bug。

 

做爲一個謹記廁所文化的人,蹲坑5分鐘,方便你我他;蹲坑半小時,痔瘡等着你。我選擇了後者。

半小時過去了,時間隨着大A股的大跌,瞬間來到了下午的收盤時間。果不其然,又一個下跌如期而至,就在這一刻,體內的混濁之氣伴着伴隨着收跌的股市排出體外。收拾乾淨後,我帶着滿身廁所的芬芳,回到了座位上。

再次打開電腦屏幕,spark任務還在慢悠悠的讀取json文件,半小時纔讀取300G左右,10T的json文件按照這個速度,所有讀完的好幾天。此方案不可用。

 

方案二

果斷改爲RDD,而後用intersection求交集,果真快不少,10T跟50G求交集,12000cores,5分鐘出結果。Spark任務提交同方案一,再也不贅述。此方案可行

屁顛屁顛的把結果交付給需求方,大佬甚是滿意地流出了開心的淚水

import json
from pyspark.sql import SparkSession

def join_it():
    path_json = 'hdfs://i/love/you/'  # 數據大小10T, 5萬分區
    path_csv  = 'hdfs://you/love/me'  # 數據大小50G
    path_save = 'hdfs://we/are/together'
    #
    rdd1 = sc.textFile(path_json).map(lambda v: json.loads(v).get('url', '')).coalesce(50000)
    rdd2 = sc.textFile(path_csv).map(lambda v: v.split('\t')[1])
    rdd1.intersection(rdd2).coalesce(20000).saveAsTextFile(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

 

方案三

離下班實際還有30分鐘,做爲一個對技術有追求的資深碼農,嘗試用SparkSQL實現該功能。祭出代碼,以資各位看官共享,新能跟RDD不相上下。此方案也可取。

import json
from pyspark.sql import SparkSession

def join_it():
    path_json = 'hdfs://i/love/you/'  # 數據大小10T, 5萬分區
    path_csv  = 'hdfs://you/love/me'  # 數據大小50G
    path_save = 'hdfs://we/are/together'
    #
    sc.textFile(path_json).map(lambda v: (json.loads(v).get('url', ''),)).toDF(['url']).createOrReplaceTempView('a')
    spark.read.option('sep', '\t').schema('name string, url string').select('url').csv(path_csv).createOrReplaceTempView('b')

    sql = '''
    SELECT
        a.url
    FROM
        a
    JOIN
        b
    ON
        a.url=b.url
    '''
    spark.sql(sql).coalesce(20000).write.mode('overwrite').option('sep', '\t').csv(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

 

總結:

當遇到源數據是體量比較大的json或其餘格式的時候,不要用spark.read的形式直接導入到DataFrame。

那要咋弄?能夠先用RDD把源數據加載進來,而後再轉化成DataFrame,後面用SparkSQL進行操做,如此可達到較好的性能效果。

相關文章
相關標籤/搜索