首先建立一個DF對象:java
scala> spark.version res0: String = 2.2.0.cloudera1 scala> val df = spark.createDataset(Seq(("key1", 23, 1.0), ("key1", 10, 2.0))).toDF("id", "rsrp", "rsrq") df: org.apache.spark.sql.DataFrame = [id: string, rsrp: int ... 1 more field] scala> df.show +----+----+----+ | id|rsrp|rsrq| +----+----+----+ |key1| 23| 1.0| |key1| 10| 2.0| +----+----+----+ scala> df.printSchema root |-- id: string (nullable = true) |-- rsrp: integer (nullable = false) |-- rsrq: double (nullable = false)
能夠是字符串類型,整型sql
scala> df.withColumn("sinurl", lit(12)).show +----+----+----+------+ | id|rsrp|rsrq|sinurl| +----+----+----+------+ |key1| 23| 1.0| 12| |key1| 10| 2.0| 12| +----+----+----+------+ scala> df.withColumn("type", lit("mr")).show +----+----+----+----+ | id|rsrp|rsrq|type| +----+----+----+----+ |key1| 23| 1.0| mr| |key1| 10| 2.0| mr| +----+----+----+----+
注意:shell
lit()是spark自帶的函數,須要import org.apache.spark.sql.functionsexpress
Since 1.3.0
def lit(literal: Any): Column Creates a Column of literal value. The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value. apache
scala> df.withColumn("rsrp2", $"rsrp"*2).show +----+----+----+-----+ | id|rsrp|rsrq|rsrp2| +----+----+----+-----+ |key1| 23| 1.0| 46| |key1| 10| 2.0| 20| +----+----+----+-----+
java方式:api
import static org.apache.spark.sql.functions.col; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.types.DataTypes; ... private final SimpleDateFormat srcSdf = new SimpleDateFormat("yyyy-MM-dd HH:00:00"); private final SimpleDateFormat destSdf = new SimpleDateFormat("yyyy-MM-dd 00:00:00"); public Dataset<Row> handler(Dataset<Row> esDataset){ UDF1 date_fomat = new UDF1<String, String>() { private static final long serialVersionUID = 1L; public String call(final String value) throws Exception { Date date = srcSdf.parse(value); return destSdf.format(date); } }; sparkSession.udf().register("date_fomat_func", date_fomat, DataTypes.StringType); UDF1 to_long = new UDF1<Long, Long>() { private static final long serialVersionUID = 1L; public Long call(final Long value) throws Exception { Date date = srcSdf.parse(String.valueOf(value)); return destSdf.parse(destSdf.format(date)).getTime(); } }; sparkSession.udf().register("to_long_func", to_long, DataTypes.LongType); esDataset=esDataset.withColumn("scan_start_time", functions.callUDF("date_fomat_func", col("scan_start_time"))); esDataset=esDataset.withColumn("scan_stop_time", functions.callUDF("date_fomat_func", col("scan_stop_time"))); esDataset=esDataset.withColumn("timestamp", functions.callUDF("to_long_func", col("timestamp"))); return esDataset; } ...
scalasession
scala> import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DataTypes scala> df.select(col("*"), | udf{ | (e:Int) => | if(e == "23") { | 1 | } else { | 2 | } | }.apply(df("rsrp")).cast(DataTypes.DoubleType).as("rsrp_udf") | ).show +----+----+----+--------+ | id|rsrp|rsrq|rsrp_udf| +----+----+----+--------+ |key1| 23| 1.0| 2.0| |key1| 10| 2.0| 2.0| +----+----+----+--------+
scala> df.select(col("*"), | when(df("rsrp") > 10, lit(">10")).when(df("rsrp") === 10, "=10").otherwise("<10").as("rsrp_compare10") | ).show +----+----+----+--------------+ | id|rsrp|rsrq|rsrp_compare10| +----+----+----+--------------+ |key1| 23| 1.0| >10| |key1| 10| 2.0| =10| +----+----+----+--------------+
df.withColumn("r", when($"rsrp".isNull, lit(null)) .otherwise(udf1($"rsrp")) .cast(DataTypes.IntegerType) )
scala> df.withColumn("rsrp4", expr("rsrp * 4")).show +----+----+----+-----+ | id|rsrp|rsrq|rsrp4| +----+----+----+-----+ |key1| 23| 1.0| 92| |key1| 10| 2.0| 40| +----+----+----+-----+
scala> df.drop("rsrp").show +----+----+ | id|rsrq| +----+----+ |key1| 1.0| |key1| 2.0| +----+----+ scala> df.drop("rsrp","rsrq").show +----+ | id| +----+ |key1| |key1| +----+
首先,在hadoop目錄/user/spark/test.csvapp
[spark@master ~]$ hadoop fs -text /user/spark/test.csv key1,key2,key3,key4,key5 aaa,1,2,t1,4 bbb,5,3,t2,8 ccc,2,2,,7 ,7,3,t1, bbb,1,5,t3,0 ,4,,t1,8
備註:若是想在根目錄下執行spark-shell.須要在/etc/profile中追加spark的安裝目錄:函數
export SPARK_HOME=/opt/spark-2.2.1-bin-hadoop2.7 export PATH=$PATH:$SPARK_HOME/bin
使用spark加載.user/spark/test.csv文件oop
[spark@master ~]$ spark-shell Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/10/29 21:50:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://192.168.0.120:4040 Spark context available as 'sc' (master = local[*], app id = local-1540821032565). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.1 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171) Type in expressions to have them evaluated. Type :help for more information. scala> val df = spark.read.option("header","true").csv("/user/spark/test.csv") 18/10/29 21:51:16 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 18/10/29 21:51:16 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException 18/10/29 21:51:37 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException df: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields] scala> df.show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2|null| 7| |null| 7| 3| t1|null| | bbb| 1| 5| t3| 0| |null| 4|null| t1| 8 | +----+----+----+----+----+ scala> df.schema res3: org.apache.spark.sql.types.StructType = StructType(StructField(key1,StringType,true), StructField(key2,StringType,true),
StructField(key3,StringType,true), StructField(key4,StringType,true), StructField(key5,StringType,true)) scala> df.printSchema root |-- key1: string (nullable = true) |-- key2: string (nullable = true) |-- key3: string (nullable = true) |-- key4: string (nullable = true) |-- key5: string (nullable = true)
一次修改相同類型的多個列的示例。 這裏是把key3,key5列中全部的null值替換成1024。 csv導入時默認是string,若是是整型,寫法是同樣的,有各個類型的重載。
scala> df.na.fill("1024",Seq("key3","key5")).show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2|null| 7| |null| 7| 3| t1|1024| | bbb| 1| 5| t3| 0| |null| 4|1024| t1| 8 | +----+----+----+----+----+
一次修改不一樣類型的多個列的示例。 csv導入時默認是string,若是是整型,寫法是同樣的,有各個類型的重載。
scala> df.na.fill(Map(("key1"->"yyy"),("key3","1024"),("key4","t88"),("key5","4096"))).show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2| t88| 7| | yyy| 7| 3| t1|4096| | bbb| 1| 5| t3| 0| | yyy| 4|1024| t1| 8 | +----+----+----+----+----+
不修改,只是過濾掉含有null值的行。 這裏是過濾掉key3,key5列中含有null的行
scala> df.na.drop(Seq("key3","key5")).show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2|null| 7| | bbb| 1| 5| t3| 0| +----+----+----+----+----+
過濾掉指定的若干列中,有效值少於n列的行 這裏是過濾掉key1,key2,key3這3列中有效值小於2列的行。最後一行中,這3列有2列都是null,因此被過濾掉了。
scala> df.na.drop(2,Seq("key1","key2","key3")).show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2|null| 7| |null| 7| 3| t1|null| | bbb| 1| 5| t3| 0| +----+----+----+----+----+
同上,若是不指定列名列表,則默認列名列表就是全部列
scala> df.na.drop(4).show +----+----+----+----+----+ |key1|key2|key3|key4|key5| +----+----+----+----+----+ | aaa| 1| 2| t1| 4| | bbb| 5| 3| t2| 8| | ccc| 2| 2|null| 7| | bbb| 1| 5| t3| 0| +----+----+----+----+----+
參考:
https://blog.csdn.net/coding_hello/article/details/75211995
https://blog.csdn.net/xuejianbest/article/details/81666065