Spark Custom External Data Sources

Background: sometimes we need to define an external data source and then process it with Spark SQL. This approach has two benefits:

(1) Once the external data source is defined, it is very concise to use, keeps the software architecture clean, and lets you work with the data directly through SQL.

(2) It is easy to split the system into layers and modules, building them up one layer at a time while hiding the implementation details.

This is where defining a custom external data source comes in, and it is quite simple to do in Spark; as the saying goes, nothing is hard once you know how.

First, pick a package name and put all the classes under that one package, for example com.example.hou.

Then define two things: a DefaultSource, and a subclass of BaseRelation with TableScan.

The code for DefaultSource is simple enough to speak for itself:

package com.example.hou

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType


class DefaultSource extends CreatableRelationProvider with SchemaRelationProvider {

  // Read path: called when the caller supplies a schema
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.get("path")

    path match {
      case Some(x) => new TextDataSourceRelation(sqlContext, x, schema)
      case _ => throw new IllegalArgumentException("path is required...")
    }
  }

  // Write path: this source does not persist anything; it simply hands back
  // a relation over the configured path, ignoring mode and data
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

}
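One caveat: because DefaultSource implements only SchemaRelationProvider on the read side, load() fails unless the caller supplies a schema. If you also want schema-less reads to fall back to the built-in schema of TextDataSourceRelation, you can mix in RelationProvider as well. A minimal sketch of that variant (an assumption, not part of the original code; the write-side overload is omitted for brevity):

package com.example.hou

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

// Sketch: mixing in RelationProvider lets spark.read.format("com.example.hou").load()
// work without an explicit schema; TextDataSourceRelation falls back to its
// built-in schema because null is passed through.
class DefaultSource extends RelationProvider with SchemaRelationProvider {

  // Called when no schema is supplied by the caller
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters, null)

  // Called when the caller supplies a schema
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    parameters.get("path") match {
      case Some(x) => new TextDataSourceRelation(sqlContext, x, schema)
      case _ => throw new IllegalArgumentException("path is required...")
    }
  }
}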

The source code of TextDataSourceRelation:

package com.example.hou

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

class TextDataSourceRelation(override val sqlContext: SQLContext, path: String, userSchema: StructType) extends BaseRelation with TableScan with Logging {
  // If the schema passed in is not null, use it; otherwise fall back to the schema defined here
  override def schema: StructType = {
    if(userSchema != null){
      userSchema
    }else{
      StructType(
        StructField("id",LongType,false) ::
          StructField("name",StringType,false) ::
          StructField("gender",StringType,false) ::
          StructField("salary",LongType,false) ::
          StructField("comm",LongType,false) :: Nil
      )
    }
  }

  // Read the data in and convert it into an RDD[Row]
  override def buildScan(): RDD[Row] = {
    logWarning("this is ruozedata buildScan....")
    // Read the file into an RDD.
    // wholeTextFiles returns (fileName, content) pairs; map(_._2) drops the file name and keeps only the content
    val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
    // Get the schema fields
    val schemaField = schema.fields

    //rdd.collect().foreach(println)

    // Combine the rdd with schemaField: parse the content and line it up with the schema
    val rows = rdd.map(fileContent => {
      // Split the file content into individual lines
      val lines = fileContent.split("\n")
      // Skip comment lines (those containing //), split each line on commas, trim every field, and convert to a Seq
      val data = lines.filter(line => !line.trim().contains("//")).map(_.split(",").map(_.trim)).toSeq

      // zipWithIndex pairs each field value with its column index, so the matching schema field can be looked up
      val result = data.map(x => x.zipWithIndex.map {
        case (value, index) => {

          val columnName = schemaField(index).name
          // castTo takes two arguments: the value and the target data type.
          // For the gender column, first map the raw codes (0/1) to man/woman; any other column is passed through as-is
          Utils.castTo(if (columnName.equalsIgnoreCase("gender")) {
            if (value == "0") {
              "man"
            } else if (value == "1") {
              "woman"
            } else {
              "unknown"
            }
          } else {
            value
          }, schemaField(index).dataType)

        }
      })

      result.map(x => Row.fromSeq(x))
    })

    rows.flatMap(x => x)

  }

}
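buildScan above delegates the type conversion to Utils.castTo, whose source is not shown in this article. A minimal sketch that covers only the two types this data source's schema uses (an assumed implementation, not the original one):

package com.example.hou

import org.apache.spark.sql.types.{DataType, LongType, StringType}

object Utils {
  // Cast a raw string field to the target Catalyst type.
  // Only the types appearing in this data source's schema are handled;
  // anything else is passed through unchanged.
  def castTo(value: String, dataType: DataType): Any = dataType match {
    case LongType   => value.toLong
    case StringType => value
    case _          => value
  }
}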

The last piece is using it in the main method:

package com.example.hou

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object TestApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextApp")
      .master("local[2]")
      .getOrCreate()

    // Define the schema
    val schema = StructType(
      StructField("id", LongType, false) ::
        StructField("name", StringType, false) ::
        StructField("gender", StringType, false) ::
        StructField("salary", LongType, false) ::
        StructField("comm", LongType, false) :: Nil)

    // Only the package name is needed: format("com.example.hou"); there is no need to write the full class name com.example.hou.DefaultSource
    val df = spark.read.format("com.example.hou")
      .option("path", "C://code//data.txt").schema(schema).load()

    df.show()
    df.createOrReplaceTempView("test")
    spark.sql("select name,salary from test").show()

    println("Application Ended...")
    spark.stop()
  }

}
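Since buildScan splits each line on commas, trims the fields, maps the gender codes 0/1 to man/woman, and skips lines containing //, a data.txt for the schema above might look like this (hypothetical sample data):

// id,name,gender,salary,comm
1,zhangsan,0,10000,2000
2,lisi,1,15000,2500
3,wangwu,2,12000,1800

And because the source is resolved by package name, you can also create a view over it directly with SQL DDL, which is exactly the "use it through SQL" benefit mentioned at the top. A sketch that could go inside the same main method (the view name emp is a placeholder, and the column list is spelled out because only SchemaRelationProvider is implemented):

    spark.sql(
      """CREATE TEMPORARY VIEW emp (id LONG, name STRING, gender STRING, salary LONG, comm LONG)
        |USING com.example.hou
        |OPTIONS (path "C://code//data.txt")
        |""".stripMargin)

    spark.sql("select name, salary from emp").show()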