Introduction
A performance comparison of Spark RDDs, DataFrames, and SparkSQL.
The comparison covers two operations:
Random lookup of a single record
Aggregation followed by sorted output
The following three Spark approaches are used to solve the two problems above, and their performance is compared:
Using RDDs
Using DataFrames
Using SparkSQL
Data Source
9 million unique records stored across 3 files in HDFS
Total size: 1.4 GB
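The article does not spell out the record layout, but from the parsing code in the scripts below each line is a pipe-delimited order record with eleven fields. A minimal sketch of that assumed layout, with field names taken from the Row(...) constructors used later and sample values that are purely invented:

# Assumed layout of one customer_orders line, reconstructed from the
# Row(...) mapping in the DataFrame/SparkSQL scripts below.
# The sample values are hypothetical.
fields = ["cust_id", "order_id", "email_hash", "ssn_hash", "product_id",
          "product_desc", "country", "state", "shipping_carrier",
          "shipping_type", "shipping_class"]

sample_line = "1000001|96922894|ab12cd|ef34gh|501|Widget Deluxe|US|CA|UPS|Ground|Standard"

record = dict(zip(fields, sample_line.split("|")))
print record["order_id"]   # '96922894' -- the order ID used in the random-lookup tests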
Test Environment
HDP 2.4
Hadoop version 2.7
Spark 1.6
HDP Sandbox
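The article does not show how the scripts were launched. On an HDP 2.4 sandbox with Spark 1.6 on YARN, a PySpark script like the ones below would typically be submitted with spark-submit, for example (the file name here is hypothetical; all Spark settings are already set inside each script):

spark-submit --master yarn-client rdd_random_lookup.py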
Test Results
Raw RDDs outperform both DataFrames and SparkSQL
DataFrames and SparkSQL perform about the same
DataFrames and SparkSQL are more intuitive to work with than RDD operations
Each job was run on its own, with no interference from other jobs
The Two Operations
Random lookup against 1 order ID from 9 million unique order IDs
GROUP all the different products with their total COUNTS and SORT DESCENDING by total count
Code
RDD Random Lookup
#!/usr/bin/env python

from time import time
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("rdd_random_lookup")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## filter where the order_id, the second field, is equal to 96922894
print lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect()

tt = str(time() - t0)
print "RDD lookup performed in " + tt + " seconds"
DataFrame Random Lookup
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("data_frame_random_lookup")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
    .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                       product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                       shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))

## filter where the order_id, the second field, is equal to 96922894
orders_df.where(orders_df['order_id'] == 96922894).show()

tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds"
SparkSQL Random Lookup
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("spark_sql_random_lookup")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
    .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                       product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                       shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))

## register data frame as a temporary table
orders_df.registerTempTable("orders")

## filter where the order_id, the second field, is equal to 96922894
print sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect()

tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds"
RDD with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("rdd_aggregation_and_sort")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## count records per product_desc (field 6), swap to (count, product), and sort by count descending
counts = lines.map(lambda line: line.split('|')) \
    .map(lambda x: (x[5], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False)

for x in counts.collect():
    print x[1] + '\t' + str(x[0])

tt = str(time() - t0)
print "RDD GroupBy performed in " + tt + " seconds"
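Not from the original article: the swap-then-sortByKey step above is the usual RDD idiom for ordering by value. Since Spark 1.0 the same result can be written with RDD.sortBy, which avoids the manual key swap. A minimal sketch, assuming the same pipe-delimited input path as the script above:

#!/usr/bin/env python
# Alternative to the swap/sortByKey pattern above, using RDD.sortBy
# (hypothetical rewrite, not part of the original benchmark).
from pyspark import SparkContext

sc = SparkContext(appName="rdd_aggregation_sortby_sketch")
lines = sc.textFile("/data/customer_orders*")

counts = lines.map(lambda line: line.split('|')) \
    .map(lambda x: (x[5], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)   # sort (product, count) pairs by count, descending

for product, count in counts.collect():
    print product + '\t' + str(count)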
DataFrame with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("data_frame_aggregation_and_sort")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
    .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                       product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                       shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))

## group by product description, count, and sort by count descending
results = orders_df.groupBy(orders_df['product_desc']).count().sort("count", ascending=False)

for x in results.collect():
    print x

tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds"
SparkSQL with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
    .setAppName("spark_sql_aggregation_and_sort")
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(lines.map(lambda l: l.split("|"))
    .map(lambda r: Row(product=r[5])))

## register data frame as a temporary table
orders_df.registerTempTable("orders")

results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")

for x in results.collect():
    print x

tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds"
Original article: https://community.hortonworks.com/articles/42027/rdd-vs-dataframe-vs-sparksql.html