https://spark-packages.org/ hosts many third-party data source packages; once a package is loaded, Spark can use it directly.
The CSV format is built in since Spark 2.0; before 2.0 it was a third-party data source.
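For example, before 2.0 CSV support usually came from the Databricks spark-csv package, pulled in with --packages. A minimal sketch, assuming the commonly used package coordinates and format name (the path is just a placeholder, not from this walkthrough):

$ spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

// With the package on the classpath, CSV is read through the third-party format name
val df = spark.read
  .format("com.databricks.spark.csv")   // third-party data source identifier
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")   // placeholder path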
1. Reading local external data sources
1. Reading a JSON file directly
[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar

scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
This fails at runtime:
Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
  ... 32 more
Looking at the source of the load method:
/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

---------------------------------------------------------

/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    loadV1Source(paths: _*)
  }
}

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}

------------------------------------------------------

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName

-------------------------------------------------------

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

--------------------------------------------------------

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")
As the source shows, if no format is specified, load reads Parquet files by default.
scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

scala> users.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
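Since spark.sql.sources.default is an ordinary SQL conf (see the DEFAULT_DATA_SOURCE_NAME definition above), the default can presumably also be switched at runtime so that a bare load() reads JSON instead of Parquet. A sketch, assuming the conf is settable on the session:

// Assumption: the session-level default data source can be changed via spark.conf.set
spark.conf.set("spark.sql.sources.default", "json")

// Now load() without format() is interpreted as JSON rather than Parquet
val people = spark.read
  .load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
people.show()

In practice, explicitly specifying the format, as below, is the usual approach.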
To read files in other formats, the format must be specified via format(), as follows:
// In a Windows IDEA environment
val df1 = spark.read.format("json")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")必須帶上,否則報錯windows
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: X
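DataFrameReader also exposes a json() shortcut that is equivalent to format("json").load(); a sketch with the same path and option as above:

// Equivalent to format("json").load(...); options set before json() still apply
val df1b = spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .json("hdfs://192.168.137.141:9000/data/people.json")
df1b.show()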
2. Reading CSV files
// Source file contents:
[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer
// In a Windows IDEA environment
val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")        // use first line of all files as header
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()

// Output:
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

-----------------------------------------------------------

// Without option("sep", ";"):
+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

// Without option("header", "true"):
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
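The CSV source also takes a mode option for handling malformed rows; a hedged sketch using the same file and option values as documented for the built-in CSV reader:

// PERMISSIVE (default) keeps bad rows with nulls, DROPMALFORMED drops them, FAILFAST throws
val df2b = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .load("hdfs://192.168.137.141:9000/data/people.csv")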
When reading CSV files you can also define a custom schema.
val peopleschema = StructType(Array(
  StructField("hlwname", StringType, true),
  StructField("hlwage", IntegerType, true),
  StructField("hlwjob", StringType, true)))

val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")
  .schema(peopleschema)
  .load("hdfs://192.168.137.141:9000/data/people.csv")

// Print for verification
df2.show()
df2.printSchema()

// Output:
+-------+------+---------+
|hlwname|hlwage|   hlwjob|
+-------+------+---------+
|  Jorge|    30|Developer|
|    Bob|    32|Developer|
+-------+------+---------+

root
 |-- hlwname: string (nullable = true)
 |-- hlwage: integer (nullable = true)
 |-- hlwjob: string (nullable = true)
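Since Spark 2.3 the schema can reportedly also be passed as a DDL string instead of hand-building a StructType; a sketch equivalent to the definition above:

// DDL-string form of the same schema (DataFrameReader.schema(String), Spark 2.3+)
val df2c = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")
  .schema("hlwname STRING, hlwage INT, hlwjob STRING")
  .load("hdfs://192.168.137.141:9000/data/people.csv")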
2. Writing the loaded files back out in other formats
// Write the users.parquet loaded above out as JSON
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")

[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop  0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
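When a single output file is wanted instead of one part file per partition, the DataFrame can be coalesced before writing. A sketch with a placeholder output path (coalesce(1) is fine for tiny data like this, but funnels everything through one task on large data):

// Coalesce to one partition so the JSON output is a single part file
users.select("name", "favorite_color")
  .coalesce(1)
  .write.format("json")
  .save("file:///home/hadoop/tmp/parquet2json_single/")   // placeholder output path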
// Write the people.json loaded above out as CSV
df1.write.format("csv")
  .mode("overwrite")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .save("hdfs://192.168.137.141:9000/data/formatconverttest/")

------------------------------------------

[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin

// Note: without .option("header","true"), the written CSV loses the age,name header row
// Without .option("sep",";"), the default separator is a comma
The point of this exercise is to learn format conversion. In production, incoming data is mostly row-oriented files such as text or JSON; it is usually converted to columnar formats such as ORC or Parquet with compression, which can shrink files to roughly 10% of their original size, greatly reducing IO and the amount of data processed, and improving performance.
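As a sketch of that conversion, the JSON DataFrame read above can be written back out as compressed Parquet (the output path below is just a placeholder; compression is the standard writer option for the Parquet source):

// Convert row-oriented JSON to columnar Parquet with snappy compression
df1.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")   // none / snappy / gzip ...
  .save("hdfs://192.168.137.141:9000/data/peopleparquet/")   // placeholder output path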
If the save from the Parquet-to-JSON example above is executed again with the same path, it throws an error:
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/") org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) .........................................................
This can be solved by setting the save mode.
The default is errorIfExists.
scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("fil
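Besides the string form ("error", "overwrite", "append", "ignore"), the mode can also be given through the SaveMode enum; a minimal sketch:

import org.apache.spark.sql.SaveMode

// ErrorIfExists (default): fail if the output path already exists
// Overwrite: replace any existing output
// Append:    add new files next to the existing output
// Ignore:    do nothing if the path already exists
users.select("name", "favorite_color").write
  .format("json")
  .mode(SaveMode.Overwrite)
  .save("file:///home/hadoop/tmp/parquet2json/")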