Solving the _corrupt_record and name errors when Spark Streaming consumes JSON data from Kafka and saves it to MySQL

Software versions used:

Spark 2.3.0

IntelliJ IDEA 2019.1

kafka_2.11-01.0.2.2

spark-streaming-kafka-0-10_2.11-2.3.0

Here is the code first:

package com.bd.spark

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.InputDStream



object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[4]")
    // 5-second micro-batches
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,salve1:9092,slave2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streamingkafka1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false:java.lang.Boolean)
    )

    val topics = Array("streaming_kafka")
    val stream: InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Each value arrives as the JSON line wrapped in outer quotes with the inner
    // quotes escaped; strip the wrapper and unescape before parsing.
    val readyData = stream.map(record => record.value())
      .map(str => str.replace("\"{", "{"))
      .map(str => str.replace("}\"", "}"))
      .map(str => str.replace("\\\"", "\""))
    readyData.print()

    readyData.foreachRDD(rdd => if (!rdd.isEmpty()) {
      // getOrCreate() reuses the application's SparkSession across batches
      val session = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      val url = "jdbc:mysql://192.168.0.107:3306/kuaishou?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true"
      val driver = "com.mysql.jdbc.Driver"
      val user = "root"
      val password = "password"
      val table = "highViewVideo"
      val prop = new Properties()
      prop.setProperty("user", user)
      prop.setProperty("password", password)
      prop.setProperty("driver", driver)
      prop.setProperty("url",url)
      import session.implicits._
      // Parse this batch of JSON strings into a DataFrame (schema is inferred per batch)
      val DF = session.read.json(session.createDataset(rdd))
      DF.filter("view_count > 500000").show(5)

      DF.filter("view_count > 500000").write.mode(SaveMode.Append).jdbc(url,"highViewVideo",prop)
    } else {
      println("=============================Wait A Moment!!!=========================================")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The raw data obtained by the crawler looks like the screenshot below:

The raw data consists of one JSON record after another, with consecutive records separated by newline characters ("\n"), i.e. multi-line JSON. Kafka's built-in connect-file-source connector watches this file, the Spark Streaming program above consumes the topic, and the resulting data is loaded into the "highViewVideo" table of the MySQL database "kuaishou". The table structure is shown below:
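
A side note on the three replace calls in the streaming job above: the value that arrives on the topic looks like the original JSON line wrapped in an extra pair of quotes with the inner quotes escaped (my reading of what connect-file-source produced in this setup, not something taken from its documentation), and the replaces simply unwrap that. A tiny standalone check, with made-up field names except view_count:

object UnwrapCheck {
  def main(args: Array[String]): Unit = {
    // Mimics what the topic carried in this setup; the field names are made up.
    val wrapped = "\"" + """{\"photo_id\": 123, \"view_count\": 600000}""" + "\""
    val unwrapped = wrapped
      .replace("\"{", "{")   // drop the quote in front of the opening brace
      .replace("}\"", "}")   // drop the quote behind the closing brace
      .replace("\\\"", "\"") // turn the escaped quotes back into plain quotes
    println(unwrapped)       // {"photo_id": 123, "view_count": 600000}
  }
}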

 

However, writing the data in Append mode with DF.filter("view_count > 500000").write.mode(SaveMode.Append).jdbc(url,"highViewVideo",prop) kept failing with the error Column "_corrupt_record" not found in schema Some(StructType...). According to what I found online, the _corrupt_record error generally appears when Spark reads a JSON file whose format is invalid.
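
For background on the column itself: in its default PERMISSIVE mode, spark.read.json keeps every record it fails to parse in a special string column, named _corrupt_record unless spark.sql.columnNameOfCorruptRecord says otherwise. A minimal local sketch, separate from the streaming job and with illustrative field names:

import org.apache.spark.sql.SparkSession

object CorruptRecordDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("corrupt-record-demo").master("local[*]").getOrCreate()
    import spark.implicits._

    // One well-formed record and one deliberately truncated record.
    val batch = Seq(
      """{"name": "a", "view_count": 600000}""",
      """{"name": "b", "view_count": """
    ).toDS()

    val df = spark.read.json(batch)
    df.printSchema()   // the schema now contains _corrupt_record next to name and view_count
    df.show(false)     // the broken line survives only in the _corrupt_record column

    spark.stop()
  }
}

So as soon as a batch contains even one line the parser cannot handle, the inferred schema of that batch grows an extra _corrupt_record column.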

Following the two articles http://www.javashuo.com/article/p-rapgtdjl-cm.html and https://blog.csdn.net/qq0719/article/details/86003435, I tried the following two approaches (a code sketch of both follows the example under point 2):

1: When reading the JSON, tell the reader that a single record may span multiple lines: option("multiLine", true).

2: "Flatten" the content into single-line records with map(str => str.replace("\n", " ")), turning the data into the following form:

{"staffList" : {"total" : 3,"result" : [ {  "toco" : 41,  "id" : 1,  "name" : "張三",  "typeJoin" : 22,  "type" : 2}, {  "toco" : 46,  "id" : 2,  "name" : "李四",  "typeJoin" : 22,  "type" : 2}, {  "toco" : 42,  "id" : 3,  "name" : "王五",  "typeJoin" : 22 ],  "type" : 2} ]}}

I tried both approaches, but the problem remained: still the same error!

However, if Append is changed to Overwrite, the data is written to MySQL without any of the above errors! I searched all over Google and still could not solve it. With no other option, I added a "_corrupt_record" column to the "highViewVideo" table and that error stopped appearing; then another error, Column "name" not found in schema Some(StructType...), showed up, so I also added a "name" column to "highViewVideo", and the problem was solved!

The problem is solved, but I am still baffled: why does Append mode fail while Overwrite does not? And where do the "_corrupt_record" and "name" columns come from, given that I never use either of them?
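
My best guess after the fact, not verified against the Spark source: in Append mode Spark builds the INSERT statement against the existing MySQL table and rejects any DataFrame column the table does not have, while Overwrite drops and recreates the table from the DataFrame's own schema, so nothing can be missing. The extra columns would then come from spark.read.json itself: _corrupt_record whenever a batch contains at least one unparseable line, and name whenever some record in that batch happens to carry a name field. If that is right, keeping only the table's own columns before the write would be an alternative to adding dummy columns to the table; a hedged sketch, with the table's column list assumed:

import org.apache.spark.sql.SparkSession

object AppendOnlyKnownColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("append-known-columns").master("local[*]").getOrCreate()
    import spark.implicits._

    // A toy batch: one record with a stray "name" field and one unparseable record.
    val df = spark.read.json(Seq(
      """{"photo_id": 1, "view_count": 600000, "name": "stray"}""",
      """{"photo_id": 2, "view_count": """
    ).toDS())

    // Columns that actually exist in the MySQL highViewVideo table (assumed here).
    val tableColumns = Seq("photo_id", "view_count")
    // Keep only those of them that this batch actually produced.
    val keep = df.columns.filter(tableColumns.contains(_))
    val cleanDF = df.selectExpr(keep: _*)

    cleanDF.printSchema()   // no _corrupt_record and no name left in the schema
    // cleanDF.write.mode(SaveMode.Append).jdbc(url, "highViewVideo", prop)   // as in the job above

    spark.stop()
  }
}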

 

Writing this down for future reference!!!