如題所示,SparkSQL /DataFrame /Spark RDD誰快?python
按照官方宣傳以及大部分人的理解,SparkSQL和DataFrame雖然基於RDD,可是因爲對RDD作了優化,因此性能會優於RDD。sql
以前一直也是這麼理解和操做的,直到最近遇到了一個場景,打破了這種不太準確的認識。json
某些場景下,RDD要比DataFrame快,性能有天壤之別。app
需求以下:性能
如下兩份數據求交集,結果輸出url。優化
數據一,json格式,地址咱們用path_json表示,大小10T,每一行數據格式:{"id":"md5字符串", "url":"https://www.thesaurus.com/","title":"sysnonyms and antonyms",xxx},大概20來個字段;ui
數據二,csv格式,地址咱們用path_csv表示,大小50G,每一行數據格式:name url,2個字段,用\t隔開。url
拿到需求後,迅速瞟了一眼數據,爽快答應需求方分分鐘搞定。spa
此時此刻,必須得祭出宇宙Top N的IDE,結合我30多年的人生閱歷和代碼經驗,瞬間雷光電閃,驚雷驟起,一頓操做猛如虎,天空飄過如下幾行代碼:3d
(老鐵們,請自行安裝python,pyspark,pycharm)
方案一
from pyspark.sql import SparkSession
def join_it(): path_json = 'hdfs://i/love/you/' # 數據大小10T, 5萬分區 path_csv = 'hdfs://you/love/me' # 數據大小50G path_save = 'hdfs://we/are/together' df1 = spark.read.json(path_json).select('url') df2 = spark.read.option('sep', '\t').schema('name string, url string').csv(path_csv) df1.join(df2) \ .select(df1.url)\ .coalesce(10000) \ .write \ .mode('overwrite') \ .option('sep', '\t') \ .csv(path_save) if __name__ == '__main__': spark = SparkSession.builder.appName('pyspark').getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR") # join_it() # spark.stop()
spark-submit提交任務到spark集羣,參數根據本身的實際狀況自行修改。
spark-submit \ --master yarn \ --deploy-mode client \ --name 'i-live-you' \ --queue 'you-love-me' \ --driver-cores 10 \ --driver-memory 30g \ --num-executors 3000 \ --executor-memory 30g \ --executor-cores 4 \ --archives 'hdfs://your-python-path-on-hdfs#pkg' --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON='集羣裏面的python地址' \ --conf spark.sql.shuffle.partitions=50000 \ --conf spark.default.parallelism=50000 \ --conf spark.task.maxFailures=20 \ your-spark-script.py
若是須要在本機調試代碼,spark的生成須要替換成以下,而後直接運行。調試經過後,仍然須要按照上述方式spark-submit提交任務到集羣運行,因爲數據量很大,須要在集羣運行才能看出性能差別。
spark = SparkSession.builder.appName('pyspark').master('local[*]').getOrCreate()
又是一頓猛操做,提交任務後,嗡嗡叫的肚子提醒我,要去廁所燒一根香,拜拜佛,佛祖保佑無bug。
深圳的夏天,依舊不負衆望的燥熱。熱情似火的太陽,伴着她最愛的紫外線和電磁波,循着外太空固定的軌道,邁着30萬千米/秒的矯健步伐,到達這顆承載70億人的藍色星球,穿透層層藍天白雲,無私的照亮着廣袤的深圳大地。
酷暑讓人思想活躍,思緒萬千。扯得有點遠了,重來不重來了,接着寫bug。
做爲一個謹記廁所文化的人,蹲坑5分鐘,方便你我他;蹲坑半小時,痔瘡等着你。我選擇了後者。
半小時過去了,時間隨着大A股的大跌,瞬間來到了下午的收盤時間。果不其然,又一個下跌如期而至,就在這一刻,體內的混濁之氣伴着伴隨着收跌的股市排出體外。收拾乾淨後,我帶着滿身廁所的芬芳,回到了座位上。
再次打開電腦屏幕,spark任務還在慢悠悠的讀取json文件,半小時纔讀取300G左右,10T的json文件按照這個速度,所有讀完的好幾天。此方案不可用。
方案二
果斷改爲RDD,而後用intersection求交集,果真快不少,10T跟50G求交集,12000cores,5分鐘出結果。Spark任務提交同方案一,再也不贅述。此方案可行
屁顛屁顛的把結果交付給需求方,大佬甚是滿意地流出了開心的淚水
import json from pyspark.sql import SparkSession def join_it(): path_json = 'hdfs://i/love/you/' # 數據大小10T, 5萬分區 path_csv = 'hdfs://you/love/me' # 數據大小50G path_save = 'hdfs://we/are/together' # rdd1 = sc.textFile(path_json).map(lambda v: json.loads(v).get('url', '')).coalesce(50000) rdd2 = sc.textFile(path_csv).map(lambda v: v.split('\t')[1]) rdd1.intersection(rdd2).coalesce(20000).saveAsTextFile(path_save) if __name__ == '__main__': spark = SparkSession.builder.appName('pyspark').getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR") # join_it() # spark.stop()
方案三
離下班實際還有30分鐘,做爲一個對技術有追求的資深碼農,嘗試用SparkSQL實現該功能。祭出代碼,以資各位看官共享,新能跟RDD不相上下。此方案也可取。
import json from pyspark.sql import SparkSession def join_it(): path_json = 'hdfs://i/love/you/' # 數據大小10T, 5萬分區 path_csv = 'hdfs://you/love/me' # 數據大小50G path_save = 'hdfs://we/are/together' # sc.textFile(path_json).map(lambda v: (json.loads(v).get('url', ''),)).toDF(['url']).createOrReplaceTempView('a') spark.read.option('sep', '\t').schema('name string, url string').select('url').csv(path_csv).createOrReplaceTempView('b') sql = ''' SELECT a.url FROM a JOIN b ON a.url=b.url ''' spark.sql(sql).coalesce(20000).write.mode('overwrite').option('sep', '\t').csv(path_save) if __name__ == '__main__': spark = SparkSession.builder.appName('pyspark').getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR") # join_it() # spark.stop()
總結:
當遇到源數據是體量比較大的json或其餘格式的時候,不要用spark.read的形式直接導入到DataFrame。
那要咋弄?能夠先用RDD把源數據加載進來,而後再轉化成DataFrame,後面用SparkSQL進行操做,如此可達到較好的性能效果。