Background: sometimes we need to define a custom external data source and then process it with Spark SQL. This has two benefits:
(1) Once the external data source is defined, it is very clean to use: the software architecture stays clear, and the data can be queried directly through SQL.
(2) It is easy to split the system into layers and modules, build them up layer by layer, and hide the implementation details.
This is where defining an external data source comes in, and it is quite simple to do in Spark; as the saying goes, it is easy once you know how.
First, pick a package name and put all the classes under that one package, for example com.example.hou.
Then define two things: a DefaultSource class, and a subclass of BaseRelation with TableScan. A possible project layout is sketched below.
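Assuming a standard sbt/Maven source tree (an assumption, since the post does not show the project structure), the pieces fit together roughly like this; note that Utils is only referenced by the scan code further down and is not shown in the post:

src/main/scala/com/example/hou/
  DefaultSource.scala           // the relation provider
  TextDataSourceRelation.scala  // BaseRelation with TableScan
  Utils.scala                   // casting helper used by buildScan (not shown in the post)
  TestApp.scala                 // the main method that exercises the data source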
The DefaultSource code is very simple; it speaks for itself:
package com.example.hou

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends CreatableRelationProvider with SchemaRelationProvider {

  // Read path (SchemaRelationProvider): build the relation from the path option
  // and the user-supplied schema.
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.get("path")
    path match {
      case Some(x) => new TextDataSourceRelation(sqlContext, x, schema)
      case _       => throw new IllegalArgumentException("path is required...")
    }
  }

  // Write path (CreatableRelationProvider): this example simply delegates to the
  // read-path createRelation and ignores `data`.
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }
}
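One side note, not part of the original post: with only SchemaRelationProvider, callers must always pass a schema via .schema(...). If you wanted the default schema built into TextDataSourceRelation (shown next) to kick in when no schema is given, a sketch like the following, which also mixes in RelationProvider, would do it. The class name is hypothetical to avoid clashing with the DefaultSource above, and .format(...) would have to point at its fully qualified name.

package com.example.hou

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical variant: RelationProvider makes .schema(...) optional on the read path.
// When no schema is supplied, TextDataSourceRelation falls back to its built-in default.
class DefaultSourceWithOptionalSchema extends RelationProvider with SchemaRelationProvider {

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters, null)

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    parameters.get("path") match {
      case Some(p) => new TextDataSourceRelation(sqlContext, p, schema)
      case _       => throw new IllegalArgumentException("path is required...")
    }
  }
}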
The source of TextDataSourceRelation:
package com.example.hou

import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.TableScan
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row

class TextDataSourceRelation(
    override val sqlContext: SQLContext,
    path: String,
    userSchema: StructType)
  extends BaseRelation with TableScan with Logging {

  // If a schema was passed in, use it; otherwise fall back to the default schema defined here.
  override def schema: StructType = {
    if (userSchema != null) {
      userSchema
    } else {
      StructType(
        StructField("id", LongType, false) ::
        StructField("name", StringType, false) ::
        StructField("gender", StringType, false) ::
        StructField("salary", LongType, false) ::
        StructField("comm", LongType, false) :: Nil
      )
    }
  }

  // Read the data in and turn it into an RDD[Row].
  override def buildScan(): RDD[Row] = {
    logWarning("this is ruozedata buildScan....")

    // Read the file(s) into an RDD. wholeTextFiles returns (fileName, content) pairs,
    // so map(_._2) drops the file name (first element) and keeps only the content (second element).
    val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)

    // Grab the schema fields.
    val schemaField = schema.fields
    //rdd.collect().foreach(println)

    // Combine the RDD with schemaField: parse each file and stitch the values to the schema.
    val rows = rdd.map(fileContent => {
      // Split the content into individual lines.
      val lines = fileContent.split("\n")
      // Split each non-comment line on commas, trim the values, and collect them into a Seq.
      val data = lines.filter(line => !line.trim().contains("//")).map(_.split(",").map(_.trim)).toSeq
      // zipWithIndex pairs each value with its column position.
      val result = data.map(x => x.zipWithIndex.map {
        case (value, index) => {
          val columnName = schemaField(index).name
          // castTo takes two parameters; the first needs a check: if the column is gender,
          // map the raw code to a label first, otherwise pass the value through unchanged.
          Utils.castTo(if (columnName.equalsIgnoreCase("gender")) {
            if (value == "0") {
              "man"
            } else if (value == "1") {
              "woman"
            } else {
              "unknown"
            }
          } else {
            value
          }, schemaField(index).dataType)
        }
      })
      result.map(x => Row.fromSeq(x))
    })

    rows.flatMap(x => x)
  }
}
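The code above calls Utils.castTo, which the post does not show. A minimal sketch of what such a helper might look like, assuming it simply converts the raw string value to the declared Spark SQL data type:

package com.example.hou

import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType}

// Hypothetical helper: the original post references Utils.castTo but does not include it.
// It is assumed to cast a raw string value to the target Spark SQL data type.
object Utils {
  def castTo(value: String, dataType: DataType): Any = dataType match {
    case IntegerType => value.toInt
    case LongType    => value.toLong
    case DoubleType  => value.toDouble
    case StringType  => value
    case _           => value // fall back to the raw string for unhandled types
  }
}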
The last step is to use it in the main method:
package com.example.hou

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object TestApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextApp")
      .master("local[2]")
      .getOrCreate()

    // Define the schema.
    val schema = StructType(
      StructField("id", LongType, false) ::
      StructField("name", StringType, false) ::
      StructField("gender", StringType, false) ::
      StructField("salary", LongType, false) ::
      StructField("comm", LongType, false) :: Nil)

    // Specifying the package name is enough (...example.hou);
    // there is no need to write ...example.hou.DefaultSource.
    val df = spark.read.format("com.example.hou")
      .option("path", "C://code//data.txt").schema(schema).load()

    df.show()

    df.createOrReplaceTempView("test")
    spark.sql("select name,salary from test").show()

    println("Application Ended...")
    spark.stop()
  }
}
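For completeness, a hypothetical data.txt matching the schema above (id, name, gender, salary, comm, comma-separated, gender encoded as 0/1). The values are purely illustrative; note that the line containing "//" is dropped by the filter in buildScan:

// id,name,gender,salary,comm
1,zhangsan,0,20000,3000
2,lisi,1,25000,4000
3,wangwu,0,30000,5000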