// Imports reconstructed from the identifiers used below (Guava Strings, commons-lang3 StringUtils, Spark 1.x SQL API).
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

import com.google.common.base.Strings
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

private val CTRL_A = '\001'
private val CTRL_B = '\002'
private val CTRL_C = '\003'

def main(args: Array[String]): Unit = {
  val resourcePath = this.getClass.getResource("/resource.txt").getFile
  val sourcePath = this.getClass.getResource("/*.gz").getFile
  val output = "/home/dev/output"

  val conf = new SparkConf().setAppName("user test").setMaster("local")
  val sc = new SparkContext(conf)

  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
  sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

  val map: Map[String, String] = buildResource(resourcePath)
  val schema = buildSchema(map)
  val bd = sc.broadcast(map)
  val bdSchema = sc.broadcast(schema)

  val start = System.currentTimeMillis()
  val rdd = sc.textFile(sourcePath)
    .map(line => {
      val user = buildUser(line, bd.value)
      buildRow(user._3, user._1, user._2)
    })
  // rdd.foreach(_ => ())
  // sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).json(output)
  sqlContext.createDataFrame(rdd, bdSchema.value).write.mode(SaveMode.Overwrite).parquet(output)
  val end = System.currentTimeMillis()
  System.out.print(end - start)
}

/**
 * Read the resource (feature dictionary) file.
 * @param file path to the resource file
 * @return map from feature key to the default value "0"
 */
def buildResource(file: String): Map[String, String] = {
  val reader = Source.fromFile(file)
  val map = new mutable.HashMap[String, String]()

  for (line <- reader.getLines() if !Strings.isNullOrEmpty(line)) {
    val arr = StringUtils.splitPreserveAllTokens(line, '\t')
    map += (arr(0) -> "0")
  }

  map.toMap
}

/**
 * Build the user attributes.
 * @param line one raw input line
 * @param map  the broadcast feature dictionary
 * @return (cookie, platform, feature map with "1"/"0" values)
 */
def buildUser(line: String, map: Map[String, String]): (String, Int, Map[String, String]) = {
  if (Strings.isNullOrEmpty(line)) {
    return ("", 0, Map.empty)
  }

  val array = StringUtils.splitPreserveAllTokens(line, CTRL_A)
  val cookie = if (Strings.isNullOrEmpty(array(0))) "-" else array(0)
  val platform = array(1).toInt

  val base = buildFeature(array(2))
  val interest = buildFeature(array(3))
  val buy = buildFeature(array(4))
  val features = base ++ interest ++ buy

  val result = new mutable.HashMap[String, String]()
  for (pair <- map) {
    val value = if (features.contains(pair._1)) "1" else "0"
    result += (pair._1 -> value)
  }

  (cookie, platform, result.toMap)
}

/**
 * Extract the user tags.
 * @param expr raw feature expression
 * @return feature keys, prefixed with "_"
 */
def buildFeature(expr: String): Array[String] = {
  if (Strings.isNullOrEmpty(expr)) {
    return Array.empty
  }

  val arr = StringUtils.splitPreserveAllTokens(expr, CTRL_B)
  val buffer = new ArrayBuffer[String]()

  for (key <- arr) {
    val pair = StringUtils.splitPreserveAllTokens(key, CTRL_C)
    buffer += s"_${pair(0)}"
  }

  buffer.toArray
}

/**
 * Dynamically build the DataFrame schema.
 * @param map the feature dictionary
 * @return the StructType passed to createDataFrame
 */
def buildSchema(map: Map[String, String]): StructType = {
  val buffer = new ArrayBuffer[StructField]()
  buffer += StructField("user", StringType, false)
  buffer += StructField("platform", IntegerType, false)

  for (pair <- map) {
    buffer += StructField(s"_${pair._1}", IntegerType, true)
  }

  StructType(List(buffer: _*))
}

/**
 * Build a Spark SQL Row from the user attributes.
 * @param map      the user's feature map
 * @param user     the cookie
 * @param platform the platform id
 * @return a Row matching the schema built above
 */
def buildRow(map: Map[String, String], user: String, platform: Int): Row = {
  val buffer = new ArrayBuffer[Any]()
  buffer += user
  buffer += platform

  for (pair <- map) {
    buffer += pair._2.toInt
  }

  Row(buffer: _*)
}
In the first version I suspected that DataFrame was doing some special processing while generating Parquet, so I decided to implement the Parquet writing myself to test the performance, using Avro to write the data into Parquet. The approach roughly consists of defining the Avro schema as a resource file and then using the AvroParquetWriter class to write the records into Parquet; the concrete writing code is similar to https://blog.csdn.net/gg584741/article/details/51614752. Writing Parquet this way, with the same amount of data, more than doubled the performance. Why the gain is this large remains to be investigated; with that, this round of optimization comes to a close. For the Schema, the line
val schema = new Schema.Parser().parse(new File(file))
is used to dynamically generate the Schema that the subsequent AvroParquetWriter consumes.
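For reference, a minimal sketch of such an AvroParquetWriter-based writer might look like the following. It is not the original post's code: the schema file name user.avsc, the output path, and the sample field values are placeholders, and the parquet-avro builder API (1.8+) with snappy compression is assumed to mirror the Spark SQL settings above.

// Hypothetical standalone sketch: write GenericRecords to Parquet via AvroParquetWriter.
import java.io.File

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

object AvroParquetSketch {
  def main(args: Array[String]): Unit = {
    // Parse the Avro definition at runtime, as described above (user.avsc is a placeholder name).
    val schema: Schema = new Schema.Parser().parse(new File("user.avsc"))

    // Build a writer that compresses with snappy, matching the Spark SQL codec setting.
    val writer: ParquetWriter[GenericRecord] =
      AvroParquetWriter.builder[GenericRecord](new Path("/home/dev/output/users.parquet"))
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()

    try {
      // One GenericRecord per user row; in the real job these values would come
      // from the parsed source lines rather than being hard-coded.
      val record = new GenericData.Record(schema)
      record.put("user", "some-cookie")
      record.put("platform", 1)
      writer.write(record)
    } finally {
      writer.close()
    }
  }
}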