Spark SQL: Reading and Writing Local External Data Sources

https://spark-packages.org/ lists many third-party data source packages. Once Spark has loaded a package, the data source is ready to use.

CSV support has been built in since Spark 2.0; before 2.0 it was a third-party data source.
Part 1: Reading local external data sources
1. Reading a JSON file directly

 

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar 
scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

Running this fails with an error:

Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
  ... 32 more
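
The two byte arrays in the message are easier to read as text. The sketch below is plain Scala with no Spark dependency; the helper name `show` is made up for illustration. It decodes both arrays as ASCII: the expected tail is the Parquet magic number, and the bytes actually found look like the end of the last JSON record in people.json.

```scala
import java.nio.charset.StandardCharsets

// Byte values copied from the exception message above.
val expectedTail = Array[Byte](80, 65, 82, 49)
val foundTail    = Array[Byte](49, 57, 125, 10)

// Decode bytes as ASCII and make the trailing newline visible.
def show(bytes: Array[Byte]): String =
  new String(bytes, StandardCharsets.US_ASCII).replace("\n", "\\n")

println(show(expectedTail)) // PAR1
println(show(foundTail))    // 19}\n
```

In other words, load() tried to read the file's footer as Parquet and found JSON text instead.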

Looking at the source code of the load method:

/**
   * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
   * a local or distributed file system).
   *
   * @since 1.4.0
   */
  def load(path: String): DataFrame = {
    option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
  }
---------------------------------------------------------
/**
   * Loads input in as a `DataFrame`, for data sources that support multiple paths.
   * Only works if the source is a HadoopFsRelationProvider.
   *
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
      val ds = cls.newInstance()
      val options = new DataSourceOptions((extraOptions ++
        DataSourceV2Utils.extractSessionConfigs(
          ds = ds.asInstanceOf[DataSourceV2],
          conf = sparkSession.sessionState.conf)).asJava)

      // Streaming also uses the data source V2 API. So it may be that the data source implements
      // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
      // the dataframe as a v1 source.
      val reader = (ds, userSpecifiedSchema) match {
        case (ds: ReadSupportWithSchema, Some(schema)) =>
          ds.createReader(schema, options)

        case (ds: ReadSupport, None) =>
          ds.createReader(options)

        case (ds: ReadSupportWithSchema, None) =>
          throw new AnalysisException(s"A schema needs to be specified when using $ds.")

        case (ds: ReadSupport, Some(schema)) =>
          val reader = ds.createReader(options)
          if (reader.readSchema() != schema) {
            throw new AnalysisException(s"$ds does not allow user-specified schemas.")
          }
          reader

        case _ => null // fall back to v1
      }

      if (reader == null) {
        loadV1Source(paths: _*)
      } else {
        Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
      }
    } else {
      loadV1Source(paths: _*)
    }
  }

  private def loadV1Source(paths: String*) = {
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }
------------------------------------------------------
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
-------------------------------------------------------
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
--------------------------------------------------------
// This is used to set the default data source
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")

The source shows that when no format is specified, load reads Parquet files by default, because spark.sql.sources.default defaults to "parquet":
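
Given that default, there are two ways to make load() read something else: name the format for a single read, or change spark.sql.sources.default for the session. The following is a minimal sketch, assuming a local Spark installation; the temporary file, its one-record contents, and the app name are placeholders invented for the example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell a session already exists; getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("default-source-sketch").getOrCreate()

// Hypothetical one-record JSON file in a temporary directory.
val jsonFile = Files.createTempDirectory("default-source-sketch").resolve("people.json")
Files.write(jsonFile, """{"name":"Andy","age":30}""".getBytes(StandardCharsets.UTF_8))
val path = jsonFile.toUri.toString

// Option 1: name the format for this read only.
val byFormat = spark.read.format("json").load(path)

// Option 2: change the session-wide default, then call load() without format().
val previous = spark.conf.get("spark.sql.sources.default")
spark.conf.set("spark.sql.sources.default", "json")
val byDefault = spark.read.load(path)
spark.conf.set("spark.sql.sources.default", previous) // restore the old default

byFormat.show()
byDefault.show()
```

Option 1 is usually the safer choice, since changing the default affects every later read and write in the same session.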

scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
scala> users.show()
+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

To read files in other formats, the format must be specified with format(), as follows:

// Running in IntelliJ IDEA on Windows
val df1 = spark.read.format("json").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

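In this environment, option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") had to be set. Without it, the read failed with the error below, which indicates that the date formatter on the classpath rejected the X component of the default timestamp pattern: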

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: X

2. Reading a CSV file

// Contents of the source file:
[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer
// Running in IntelliJ IDEA on Windows
val df2 = spark.read.format("csv")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("sep",";")
      .option("header","true")     //use first line of all files as header
      .option("inferSchema","true")
      .load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()
// Output:
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
-----------------------------------------------------------
// Without option("sep",";"), each line is read as a single column:
+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+
// Without option("header","true"), the header line is treated as data and columns get default names:
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

A custom schema can also be supplied when reading a CSV file:

import org.apache.spark.sql.types._

// Custom schema: these column names and types replace the ones from the header line
val peopleschema = StructType(Array(
  StructField("hlwname", StringType, true),
  StructField("hlwage", IntegerType, true),
  StructField("hlwjob", StringType, true)))
val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")
  .schema(peopleschema)
  .load("hdfs://192.168.137.141:9000/data/people.csv")
// Print to verify
df2.show()
df2.printSchema()
Output:
+-------+------+---------+
|hlwname|hlwage|   hlwjob|
+-------+------+---------+
|  Jorge|    30|Developer|
|    Bob|    32|Developer|
+-------+------+---------+

root
 |-- hlwname: string (nullable = true)
 |-- hlwage: integer (nullable = true)
 |-- hlwjob: string (nullable = true)
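
The same schema can also be written as a DDL-style string, which DataFrameReader.schema accepts from Spark 2.3 onward. This is a sketch under assumptions: it reads a temporary local copy of the people.csv contents shown earlier instead of the HDFS path, and the app name is arbitrary.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("ddl-schema-sketch").getOrCreate()

// Temporary local copy of the people.csv contents shown earlier (hypothetical location).
val csvFile = Files.createTempDirectory("ddl-schema-sketch").resolve("people.csv")
Files.write(csvFile,
  "name;age;job\nJorge;30;Developer\nBob;32;Developer\n".getBytes(StandardCharsets.UTF_8))

val df = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")                            // skip the header line
  .schema("hlwname STRING, hlwage INT, hlwjob STRING") // DDL string instead of StructType
  .load(csvFile.toUri.toString)

df.printSchema()
df.show()
```

The string form is shorter for flat schemas; StructType remains the more explicit choice when nullability or nested types matter.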

Part 2: Writing the loaded data out in another format

// Write the users.parquet data loaded above as JSON
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")
[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop  0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json 
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
// Write the people.json data loaded above as CSV
df1.write.format("csv")
     .mode("overwrite")
     .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
     .save("hdfs://192.168.137.141:9000/data/formatconverttest/")
------------------------------------------
[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin
// Without .option("header","true"), the written CSV has no header line, so the age and name column names are lost
// Without .option("sep",";"), a comma is used as the default separator
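
Both observations can be checked by writing with the two options set and reading the raw lines back. The sketch below assumes a local Spark installation and uses a small in-memory stand-in for df1 with the same columns; the output directory is a temporary, made-up location.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("csv-header-sketch").getOrCreate()
import spark.implicits._

// Stand-in for df1: same columns as people.json, with one missing age.
val people = Seq[(Option[Int], String)]((None, "Michael"), (Some(30), "Andy"), (Some(19), "Justin"))
  .toDF("age", "name")

val out = Files.createTempDirectory("csv-header-sketch").resolve("out").toUri.toString

people.coalesce(1)            // a single part file keeps the read-back simple
  .write.format("csv")
  .mode("overwrite")
  .option("header", "true")   // write the column names as the first line
  .option("sep", ";")         // use ';' instead of the default ','
  .save(out)

// Read the raw lines back to inspect what was written.
val lines = spark.read.textFile(out).collect()
lines.foreach(println)
```

With the header option set, the first line of the part file carries the column names, so a later read with header=true recovers them.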

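The point of this exercise is to learn how to convert between formats. In production, incoming data is mostly row-oriented text or JSON, and it is usually converted to a columnar format such as ORC or Parquet with compression. This can shrink files to roughly 10% of their original size, which greatly reduces I/O and the amount of data to process, and so improves performance.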
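
A conversion of that kind might look like the following sketch. It assumes a local Spark installation and uses a tiny in-memory DataFrame as a stand-in for row-oriented input; the snappy codec and the output location are choices made for the example, and the space actually saved depends on the data.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("to-parquet-sketch").getOrCreate()
import spark.implicits._

// Stand-in for row-oriented input that has already been loaded (e.g. from JSON or CSV).
val rows = Seq(("Jorge", 30, "Developer"), ("Bob", 32, "Developer")).toDF("name", "age", "job")

val out = Files.createTempDirectory("to-parquet-sketch").resolve("people_parquet").toUri.toString

rows.write
  .format("parquet")
  .option("compression", "snappy") // codec is an assumption; gzip is another supported value
  .mode("overwrite")
  .save(out)

// Parquet is the default source, so load() needs no format() here.
val back = spark.read.load(out)
back.show()
```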
If the users.parquet-to-JSON save above is run a second time with the same path, it fails:

scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")
org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
.........................................................

 

 

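Setting the save mode resolves this problem. The default mode is errorifexists, which raises the error above when the path already exists.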

scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("fil
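
The save modes can be compared side by side. The sketch below assumes a local Spark installation; it uses the SaveMode enum, which is equivalent to passing the strings "ignore", "append" and "overwrite" to mode(). The DataFrame and the output path are placeholders invented for the example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("savemode-sketch").getOrCreate()

// Output path that does not exist yet (temporary, hypothetical location).
val out = Files.createTempDirectory("savemode-sketch").resolve("out").toUri.toString
val df = spark.range(3).toDF("id")

df.write.format("json").save(out)                          // path is new, so the default mode succeeds
df.write.format("json").mode(SaveMode.Ignore).save(out)    // path exists: silently does nothing
val afterIgnore = spark.read.json(out).count()

df.write.format("json").mode(SaveMode.Append).save(out)    // adds new part files next to the old ones
val afterAppend = spark.read.json(out).count()

df.write.format("json").mode(SaveMode.Overwrite).save(out) // replaces the existing output
val afterOverwrite = spark.read.json(out).count()

println(s"ignore=$afterIgnore append=$afterAppend overwrite=$afterOverwrite")
```

Overwrite is convenient for reruns, but it deletes whatever was at the path, so Append or a fresh path is the safer choice when earlier output must be kept.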