在進行Spark Streaming的開發時,咱們經常須要將DStream轉爲DataFrame來進行進一步的處理,
共有兩種方式,方式一:app
val spark = SparkSession.builder() .appName("Test") .getOrCreate() import spark.implicits._ dStream.foreachRDD{ rdd => val df = rdd.map(_.split(" ")) .map(t => (t(1),t(2),t(3))) .toDF("col1","col2","col3") // 業務邏輯 }
利用map算子和tuple來完成,通常的場景下采用這種方式便可。ui
可是有的時候咱們會遇到列數大於22的狀況,這個時候會受到scala的tuple數不能超過22的影響。這時能夠採用方式二:spa
val spark = SparkSession.builder() .appName("Test") .getOrCreate() dStream.foreachRDD{ rdd => val res:RDD[Row] = rdd.map{ row => val buffer = ArrayBuffer.empty[Any] val fields: Array[String] = row.split("\\|~\\|") buffer.append(fields(0)) buffer.append(fields(1)) buffer.append(fields(2)) // 省略 buffer.append(fields(25)) Row.fromSeq(buffer) } val schema = StructType(Seq( StructField("col1", StringType, false), StructField("col2", StringType, false), StructField("col3", StringType, false), // 省略 StructField("col26", StringType, false) )) val df: DataFrame = spark.createDataFrame(result, schema) // 業務邏輯 }