DStream轉爲DF的兩種方式(突破map時元組22的限制)

在進行Spark Streaming的開發時,咱們經常須要將DStream轉爲DataFrame來進行進一步的處理,
共有兩種方式,方式一:app

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
import spark.implicits._
dStream.foreachRDD{ rdd =>
  val df = rdd.map(_.split(" "))
    .map(t => (t(1),t(2),t(3)))
    .toDF("col1","col2","col3")
  // 業務邏輯
}

利用map算子和tuple來完成,通常的場景下采用這種方式便可。ui

可是有的時候咱們會遇到列數大於22的狀況,這個時候會受到scala的tuple數不能超過22的影響。這時能夠採用方式二:spa

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
dStream.foreachRDD{ rdd =>
  val res:RDD[Row] = rdd.map{ row =>
    val buffer = ArrayBuffer.empty[Any]
    val fields: Array[String] = row.split("\\|~\\|")
    buffer.append(fields(0))
    buffer.append(fields(1))
    buffer.append(fields(2))
    // 省略
    buffer.append(fields(25))
    Row.fromSeq(buffer)
  } 
  val schema = StructType(Seq(
    StructField("col1", StringType, false),
    StructField("col2", StringType, false),
    StructField("col3", StringType, false),
    // 省略
    StructField("col26", StringType, false)
  ))
  val df: DataFrame = spark.createDataFrame(result, schema)
  // 業務邏輯
}
相關文章
相關標籤/搜索