基本操做:html
運行時獲取spark版本號(以spark 2.0.0爲例):python
sparksn = SparkSession.builder.appName("PythonSQL").getOrCreate() print sparksn.versionmysql |
獲取spark配置狀況(crossJoin等等):sql
df = spark.sql("SET -v") df.show()數據庫 |
顯示每列的全部內容,不刪減內容顯示,show每列全部內容app
df.show(truncate=False) |
建立和轉換格式:函數
Pandas和Spark的DataFrame二者互相轉換:fetch
pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
與Spark RDD的相互轉換:ui
rdd_df = df.rdd | df = rdd_df.toDF() |
注:rdd轉df前提是每一個rdd的類型都是Row類型url
增:
新增列:
df.withColumn(「xx」, 0).show() 會報錯,由於原來沒有xx列 from pyspark.sql import functions df = df.withColumn(「xx」, functions.lit(0)).show() |
fillna函數:
df.na.fill() |
以原有列爲基礎添加列:
df = df.withColumn('count20', df["count"] - 20) # 新列爲原有列的數據減去20 |
增長序列標籤:
df.rdd.zipWithIndex()
刪:
刪除一列:
df.drop('age').collect() df.drop(df.age).collect() |
dropna函數:
df = df.na.drop() # 扔掉任何列包含na的行 df = df.dropna(subset=['col_name1', 'col_name2']) # 扔掉col1或col2中任一一列包含na的行 |
改:
修改原有df[「xx」]列的全部值:
df = df.withColumn(「xx」, 1) |
修改列的類型(類型投射):
df = df.withColumn("year2", df["year1"].cast("Int")) |
合併2個表的join方法:
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner") |
其中,方法能夠爲:`inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
groupBy方法整合:
GroupedData = df.groupBy(「age」) 應用單個函數(按照A列同名的進行分組,組內對B列進行均值計算來合併): df.groupBy(「A」).avg(「B」).show()
應用多個函數: from pyspark.sql import functions df.groupBy(「A」).agg(functions.avg(「B」), functions.min(「B」), functions.max(「B」)).show() |
整合後GroupedData類型可用的方法(均返回DataFrame類型):
avg(*cols) —— 計算每組中一列或多列的平均值
count() —— 計算每組中一共有多少行,返回DataFrame有2列,一列爲分組的組名,另外一列爲行總數
max(*cols) —— 計算每組中一列或多列的最大值
mean(*cols) —— 計算每組中一列或多列的平均值
min(*cols) —— 計算每組中一列或多列的最小值
sum(*cols) —— 計算每組中一列或多列的總和
【函數應用】將df的每一列應用函數f:
df.foreach(f) 或者 df.rdd.foreach(f) |
【函數應用】將df的每一塊應用函數f:
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) |
【Map和Reduce應用】返回類型seqRDDs
df.map(func) df.reduce(func) |
解決toDF()跑出First 100 rows類型沒法肯定的異常,能夠採用將Row內每一個元素都統一轉格式,或者判斷格式處理的方法,解決包含None類型時轉換成DataFrame出錯的問題:
@staticmethod def map_convert_none_to_str(row): dict_row = row.asDict()
for key in dict_row: if key != 'some_column_name': value = dict_row[key] if value is None: value_in = str("") else: value_in = str(value) dict_row[key] = value_in
columns = dict_row.keys() v = dict_row.values() row = Row(*columns) return row(*v) |
查:
解決中文亂碼問題(python 2.7方案)
import sys reload(sys) sys.setdefaultencoding('utf-8') |
行元素查詢操做:
像SQL那樣打印列表前20元素(show函數內可用int類型指定要打印的行數):
df.show() df.show(30) |
以樹的形式打印概要
df.printSchema() |
獲取頭幾行到本地:
list = df.head(3) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...] list = df.take(5) # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...] |
輸出list類型,list中每一個元素是Row類:
list = df.collect() |
注:此方法將全部數據所有導入到本地
查詢總行數:
int_num = df.count() |
查詢某列爲null的行:
from pyspark.sql.functions import isnull df = df.filter(isnull("col_a")) |
列元素操做:
獲取Row元素的全部列名:
r = Row(age=11, name='Alice') print r.__fields__ # ['age', 'name'] |
選擇一列或多列:
df.select(「name」) df.select(df[‘name’], df[‘age’]+1) df.select(df.a, df.b, df.c) # 選擇a、b、c三列 df.select(df["a"], df["b"], df["c"]) # 選擇a、b、c三列 |
排序:
df = df.sort("age", ascending=False) |
過濾數據(filter和where方法相同):
df = df.filter(df['age']>21) df = df.where(df['age']>21) # 對null或nan數據進行過濾: from pyspark.sql.functions import isnan, isnull df = df.filter(isnull("a")) # 把a列裏面數據爲null的篩選出來(表明python的None類型) df = df.filter(isnan("a")) # 把a列裏面數據爲nan的篩選出來(Not a Number,非數字數據) |
SQL操做:
DataFrame註冊成SQL的表:
df.createOrReplaceTempView("TBL1") |
進行SQL查詢(返回DataFrame):
conf = SparkConf() ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate() df = ss.sql(「SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″) |
對HIVE數據庫的操做:
首先,開啓對Hive數據庫支持的開關:
spark = SparkSession.builder.appName("app1").enableHiveSupport().getOrCreate() |
接着,用INSERT代替write.parquet命令操做HIVE數據庫
df = spark.read.parquet(some_path) df.createOrReplaceTempView("some_df_tmp_table") sql_content= " INSERT INTO `some_schema`.`some_hive_table` " \ " SELECT a,b,c from some_df_tmp_table " spark.sql(sql_content) |
時間序列操做:
先按某幾列分組,再按時間段分組:
from pyspark.sql.functions import window win_monday = window("col1", "1 week", startTime="4 day") GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday]) |
參考資料:
使用Apache Spark讓MySQL查詢速度提高10倍以上
傳統MySQL查詢(執行時間 19 min 16.58 sec):
mysql>
SELECT MIN(yearD), MAX(yearD) AS max_year, Carrier, COUNT(*) AS cnt, SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed, ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate FROM ontime_part WHERE DayOfWeek NOT IN (6 , 7) AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI') AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI') GROUP BY carrier HAVING cnt > 1000 AND max_year > '1990' ORDER BY rate DESC , cnt DESC LIMIT 10; |
使用Scala語言摘寫的Spark查詢(執行時間 2 min 19.628 sec):
scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql", "dbtable" -> "ontime.ontime_sm", "fetchSize" -> "10000", "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48")).load() jdbcDF.createOrReplaceTempView("ontime") val sqlDF = sql("SELECT MIN(yearD), MAX(yearD) AS max_year, Carrier, COUNT(*) AS cnt, SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed, ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate FROM ontime_part WHERE DayOfWeek NOT IN (6 , 7) AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI') AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI') GROUP BY carrier HAVING cnt > 1000 AND max_year > '1990' ORDER BY rate DESC , cnt DESC LIMIT 10; ") sqlDF.show() |
Spark RDD中的map、reduce等操做的概念詳解:
map將RDD中的每一個元素都通過map內函數處理後返回給原來的RDD,即對每一個RDD單獨處理且不影響其它和總量。屬於一對一的關係(這裏一指的是對1個RDD而言)。
flatMap將RDD中的每一個元素進行處理,返回一個list,list裏面能夠是1個或多個RDD,最終RDD總數會不變或變多。屬於一變多的關係(這裏一指的是對1個RDD而言)。
reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值爲止。屬於多變一的關係。
val c = sc.parallelize(1 to 10)
c.reduce((x, y) => x + y)//結果55
reduceByKey(binary_function)
reduceByKey就是對元素爲KV對的RDD中Key相同的元素的Value進行binary_function的reduce操做,所以,Key相同的多個元素的值被reduce爲一個值,而後與原RDD中的Key組成一個新的KV對。屬於多變少的關係。
val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))
a.reduceByKey((x,y) => x + y).collect