Spark處理字符串日期的max和min的方式
Spark處理數據存儲到Hive的方式
Spark處理新增列的方式map和udf、functions
Spark處理行轉列pivot的使用
Python 3.5.3
Spark1.6.2html
通常是字符串類型的日期在使用Spark的agg求max時,是不正確的,API顯示只支持數值型的max、min
hive的SQL查詢引擎是支持字符串日期的max和min的python
unix_timestampnginx
public static Column unix_timestamp(Column s) Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail. Parameters: s - (undocumented) Returns: (undocumented) Since: 1.5.0
from pyspark.sql import functions as F df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id
一般Spark任務處理後的結果數據會存儲到Hive表中,能夠先保存至HDFS目錄再load、最方便仍是直接使用臨時表和HiveContext插入數據sql
repartition根據實際文件大小進行調整,數據比較小時,保存成一個文件shell
df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)
先刪除分區,若是已經存在的話
再覆蓋原來的數據【方便從新重複跑或修復數據】
此處使用shell,也可以使用HiveContext的sqlapache
alter table app_table drop if exists partition(datestr='$day_01'); load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition(datestr='$day_01');
app_table1_df.registerTempTable("app_table1_tmp") app_table2_df.registerTempTable("app_table2_tmp") hivectx.sql("set spark.sql.shuffle.partitions=1") hivectx.sql("alter table app_table drop if exists partition(datestr='%s')" % daystr) hivectx.sql("insert overwrite table app_table partition(datestr='%s') select * from app_table1_tmp" % daystr) hivectx.sql("insert into app_table partition(datestr='%s') select * from app_table2_tmp" % daystr)
Spark在處理數據轉換時,一般須要使用map、flatmap等操做,其中使用map會產生新的列或修改某列字段的值
Spark一樣支持自定義函數UDF以及提供了相似Hive內置函數的各類各樣的處理函數api
須要定義函數和StructType
忽略數值判斷細節和精度等app
from pyspark.sql.types import * def a_func(_): return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt1']) a_schema = StructType([ StructField('id', StringType(), True), StructField('cnt1', IntegerType(), True), StructField('cnt2', IntegerType(), True), StructField('cnt1_rate', IntegerType(), True) ]) a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
須要定義函數和UDF
忽略數值判斷細節和精度等函數
def a_func(cnt1, cnt2): return cnt1 / (cnt1 + cnt2) a_udf = F.udf(a_func, IntegerType()) a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2'])
處理相似日期字符串的格式轉換、等等等
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
在使用SQL查詢數據時,不少狀況下須要將行轉爲列,以有利於數據的展現和不一樣維度需求的利用
通常可採用子查詢case when、連續join、字段補全union的形式
Spark的DataFrame中能夠經過GroupedData的pivot函數來實現
df.groupBy(['course_name']).pivot('daystr').sum('score') df.groupBy(['course_name']).pivot('daystr').count()
轉換前
daystr course_name score 2017-11-15 yuwen 1 2017-11-15 yuwen 1 2017-11-15 shuxue 1 2017-11-15 yingyu 2 2017-11-16 yuwen 1 2017-11-16 shuxue 1 2017-11-16 yingyu 2
轉換後
course_name 2017-11-15 2017-11-16 yuwen 2 1 shuxue 1 1 yingyu 2 2
course_name 2017-11-15 2017-11-16 yuwen 2 1 shuxue 1 1 yingyu 1 1
原文地址:https://blog.icocoro.me/2017/11/16/1711-zhishidian-spark%E5%B0%8F%E8%8A%8201/index.html