spark生成大寬表的parquet性能優化

1.  背景介紹

  將一份數據量很大的用戶屬性文件解析成結構化的數據供查詢框架查詢剖析,其中用戶屬性包含用戶標識,平臺類型,性別,年齡,學歷,興趣愛好,購物傾向等等,大概共有七百個左右的標籤屬性。爲了查詢框架可以快速查詢出有特定標籤的人羣,將最終的存儲結果定義爲了將七百個左右的標籤屬性展平存儲爲parquet文件,這樣每一個標籤屬性對於用戶而言只有存在和不存在兩種狀況。

2. 初版實現過程

   第一步,將用戶全部標籤標識做爲一個資源文件保存到spark中,並讀取該資源文件的標籤標識爲一個標籤集合(定義爲listAll),並經過sparkContext來進行廣播;
     第二步,使用spark core讀取hdfs上的用戶屬性文件(其中每行是一個用戶所擁有的標籤),將單個用戶所擁有的標籤解析成一個標籤集合(定義爲listUser),也就是說listUser是listAll的一個子集;
   第三步,對於單個用戶而言,遍歷步驟一的結果集listAll,對於每個標籤判斷該用戶是否存在,若是存在則將標籤設置爲1(表示存在),不然設置爲0(表示不存在),並將全部標籤及相應的值保存爲一個Map(定義爲map)
   第四步,根據第三步的map構形成spark sql中的Row
   第五步,依據第一步的集合listAll構造出spark sql的Schema
   第六步,將第四步和第五步的結果經過spark sql的createDataFrame構形成DataFrame。
   第七步,經過DataFrame.write.parquet(output)將結果保存到hdfs上
     經過上述的七步,認爲已經很easy的處理完了這個需求,可是真正測試時發現性能比想象的要慢的多,嚴重的達不到性能要求。對於性能影響究竟出如今什麼地方?初步猜想,問題出如今第四步,第六步,第七步的可能性比較大。 通過實際的測試,發現性能主要消耗在第七步,其餘步驟的執行都特別快。這樣也就定位到了問題
   並且經過測試知道,生成parquet消耗的性能最高,生成json的話很快就能完成,若是不生成任何對象,而是直接foreach執行的話,性能會更高。並且相同數據量下,若是列數在七百多個時,json寫入時間是parquet寫入時間的三分之一,若是列數在四百個時,json寫入時間是parquet寫入時間的二分之一,若是列數在五十個,json寫入時間是parquet寫入時間的三分之二。也就是列數越少,json和parquet的寫入速度越接近。至於爲何生成parquet性能不好,待後續分析spark sql的save方法。
  測試的例子
  private def CTRL_A = '\001'

  private def CTRL_B = '\002'

  private def CTRL_C = '\003'

  def main(args: Array[String]): Unit = {

    val resourcePath = this.getClass.getResource("/resource.txt").getFile
    val sourcePath = this.getClass.getResource("/*.gz").getFile
    val output = "/home/dev/output"

    val conf = new SparkConf().setAppName("user test").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    val map: Map[String, String] = buildResource(resourcePath)
    val schema = buildSchema(map)
    val bd = sc.broadcast(map)
    val bdSchema = sc.broadcast(schema)

    val start=System.currentTimeMillis()
    val rdd = sc.textFile(sourcePath)
      .map(line => {
        val map = buildUser(line, bd.value)
        buildRow(map._3, map._1, map._2)
      })
//    rdd.foreach(_=>())
//    sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).json(output)
    sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).parquet(output)
    val end = System.currentTimeMillis()
    System.out.print(end - start)
  }

  /**
    * 讀取資源文件
    * @param file
    * @return
    */
  def buildResource(file: String): Map[String, String] = {
    val reader = Source.fromFile(file)
    val map = new mutable.HashMap[String, String]()
    for (line <- reader.getLines() if !Strings.isNullOrEmpty(line)) {
        val arr = StringUtils.splitPreserveAllTokens(line, '\t')
        map.+=((arr(0), "0"))
    }

    map.toMap
  }

  /**
    * 生成用戶屬性
    * @param line
    * @param map
    * @return
    */
  def buildUser(line: String, map: Map[String, String]): (String, Int, Map[String, String]) = {
    if (Strings.isNullOrEmpty(line)) {
      return ("", 0, Map.empty)
    }
    val array = StringUtils.splitPreserveAllTokens(line, CTRL_A)
    val cookie = if (Strings.isNullOrEmpty(array(0))) "-" else array(0)
    val platform = array(1).toInt
    val base = buildFeature(array(2))
    val interest = buildFeature(array(3))
    val buy = buildFeature(array(4))
    val features = base ++ interest ++ buy
    val result = new mutable.HashMap[String, String]()
    for (pair <- map) {
      val value = if (features.contains(pair._1)) "1" else "0"
      result.+=((pair._1, value))
    }
    (cookie, platform, result.toMap)
  }

  /**
    * 抽取用戶標籤
    * @param expr
    * @return
    */
  def buildFeature(expr: String): Array[String] = {
    if (Strings.isNullOrEmpty(expr)) {
      return Array.empty
    }
    val arr = StringUtils.splitPreserveAllTokens(expr, CTRL_B)
    val buffer = new ArrayBuffer[String]()
    for (key <- arr) {
      val pair = StringUtils.splitPreserveAllTokens(key, CTRL_C)
      buffer += (s"_${pair(0)}")
    }
    buffer.toArray
  }

  /**
    * 動態生成DataFrame的Schema
    * @param map
    * @return
    */
  def buildSchema(map: Map[String, String]): StructType = {
    val buffer = new ArrayBuffer[StructField]()
    buffer += (StructField("user", StringType, false))
    buffer += (StructField("platform", IntegerType, false))
    for (pair <- map) {
      buffer += (StructField(s"_${pair._1}", IntegerType, true))
    }
    return StructType(List(buffer: _*))
  }

  /**
    * 將用戶屬性構形成Spark SQL的Row
    * @param map
    * @param user
    * @param platform
    * @return
    */
  def buildRow(map: Map[String, String], user: String, platform: Int): Row = {
    val buffer = new ArrayBuffer[Any]()
    buffer += (user)
    buffer += (platform)
    for (pair <- map) {
      buffer += (pair._2.toInt)
    }
    return Row(buffer: _*)
  }

3. 第二版實現過程

  在初版中初步懷疑是DataFrame在生成parquet時進行了一些特殊邏輯的處理,因此決定本身實現ParquetWriter方法來測試下性能,採用了avro來向parquet中寫入數據。方法大概包含定義好avro資源文件,而後使用AvroParquetWriter類來向parquet中寫入內容,具體的寫入方法相似於https://blog.csdn.net/gg584741/article/details/51614752。經過這種方式來寫入parquet,相同數據量的狀況下,性能提高了一倍多。至於爲何性能有這麼大的提高,有待後續研究。到此優化就告一段落了。html

   在此優化期間,遇到了下列問題:
  1.  avro 的資源文件在生成java類時,屬性限制必須255個一下。該限制在https://issues.apache.org/jira/browse/AVRO-1642 提到。
      2.  java 類屬性和方法參數也須要小於255個,詳見https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.11,https://stackoverflow.com/questions/30581531/maximum-number-of-parameters-in-java-method-declaration
 
     對於上述顯示的解決方案是在maven配置文件中不適用avro-maven-plugin插件來自動生成java類,而是在程序運行時經過
val Schema = (new Schema.Parser()).parse(new File(file))

來動態生成Schema來供後續AvroParquetWriter使用。java

相關文章
相關標籤/搜索