Software versions used:
Spark 2.3.0
IDEA 2019.1
kafka_2.11-0.10.2.2
spark-streaming-kafka-0-10_2.11-2.3.0
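For completeness, a minimal sbt build matching these versions might look like the sketch below (this assumes the project is built with sbt on Scala 2.11; the Scala patch version and the MySQL connector version are assumptions, not taken from the original setup):

// build.sbt (sketch; versions follow the list above)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0",
  "org.apache.spark" %% "spark-sql"                  % "2.3.0",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",
  // assumed JDBC driver version providing com.mysql.jdbc.Driver
  "mysql"            %  "mysql-connector-java"       % "5.1.47"
)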
First, the code:
package com.bd.spark

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

object kafkaSparkStreaming_10version {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))   // 5-second micro-batches

    // Kafka consumer configuration: read from the earliest offset, no auto-commit
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,salve1:9092,slave2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streamingkafka1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("streaming_kafka")
    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )

    // Strip the extra quoting/escaping added upstream so each record is plain JSON
    val readyData = stream.map(record => record.value())
      .map(str => str.replace("\"{", "{"))
      .map(str => str.replace("}\"", "}"))
      .map(str => str.replace("\\\"", "\""))
    readyData.print()

    readyData.foreachRDD(rdd =>
      if (!rdd.isEmpty()) {
        val session = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()

        // MySQL connection settings
        val url = "jdbc:mysql://192.168.0.107:3306/kuaishou?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true"
        val driver = "com.mysql.jdbc.Driver"
        val user = "root"
        val password = "password"
        var table = "highViewVideo"
        val prop = new Properties()
        prop.setProperty("user", user)
        prop.setProperty("password", password)
        prop.setProperty("driver", driver)
        prop.setProperty("url", url)

        import session.implicits._
        // Parse each JSON record into a DataFrame and append the high-view rows to MySQL
        val DF = session.read.json(session.createDataset(rdd))
        DF.filter("view_count > 500000").show(5)
        DF.filter("view_count > 500000").write.mode(SaveMode.Append).jdbc(url, "highViewVideo", prop)
      } else {
        println("=============================Wait A Moment!!!=========================================")
      })

    ssc.start()
    ssc.awaitTermination()
  }
}
The raw data collected by the crawler is shown in the screenshot below:
The raw data is a series of JSON records separated by newline characters ("\n"), i.e. multi-line JSON. Kafka's built-in connect-file-source listens to this file, the Spark Streaming program above consumes it, and the resulting data is loaded into the "highViewVideo" table of the MySQL database "kuaishou". The table layout is shown below:
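For reference, the file source that ships with Kafka is driven by a small properties file passed to connect-standalone.sh; a minimal sketch is below. The connector name and file path are assumptions; only the topic has to match the one the Spark Streaming job subscribes to.

# connect-file-source.properties (sketch)
# run with: bin/connect-standalone.sh config/connect-standalone.properties connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# path of the crawler output being tailed (assumed; replace with the real file)
file=/data/kuaishou_raw.json
# must match the topic the Spark Streaming job subscribes to
topic=streaming_kafka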
However, DF.filter("view_count > 500000").write.mode(SaveMode.Append).jdbc(url, "highViewVideo", prop) keeps failing in Append mode with the error: Column "_corrupt_record" not found in schema Some(StructType...). According to what I found online, the _corrupt_record error is usually raised when Spark reads a JSON file whose format is invalid.
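To see where _corrupt_record comes from, here is a self-contained sketch (the two records are made up, not the crawler data): when one valid and one truncated JSON string are fed to read.json, Spark's default PERMISSIVE parser keeps the bad row and adds a _corrupt_record column to the inferred schema.

import org.apache.spark.sql.SparkSession

object CorruptRecordDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("corrupt-record-demo").getOrCreate()
    import spark.implicits._

    val good = """{"photo_id": 1, "view_count": 600000}"""
    val bad  = """{"photo_id": 2, "view_count": """   // truncated, i.e. malformed JSON

    val df = spark.read.json(Seq(good, bad).toDS())
    df.printSchema()  // schema now contains _corrupt_record alongside photo_id and view_count
    df.show(false)    // the malformed line lands in _corrupt_record, its other fields are null

    spark.stop()
  }
}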
Following http://www.javashuo.com/article/p-rapgtdjl-cm.html and https://blog.csdn.net/qq0719/article/details/86003435, I tried the following two approaches:
1. When reading the JSON file, enable multi-line parsing: option("multiLine", true).
2. "Flatten" the file content into a single line with map(str => str.replace("\n", " ")), turning it into the following format (a code sketch of both attempts follows the example below):
{"staffList" : {"total" : 3,"result" : [ { "toco" : 41, "id" : 1, "name" : "張三", "typeJoin" : 22, "type" : 2}, { "toco" : 46, "id" : 2, "name" : "李四", "typeJoin" : 22, "type" : 2}, { "toco" : 42, "id" : 3, "name" : "王五", "typeJoin" : 22 ], "type" : 2} ]}}
I tried both approaches, but the problem remained: exactly the same error!
However, if Append is changed to Overwrite, the data is written to MySQL smoothly and the error above never appears! I googled the whole web and still found no solution. As a last resort, I added a "_corrupt_record" column to the "highViewVideo" table and the error went away; it then complained Column "name" not found in schema Some(StructType...), so I added a "name" column to "highViewVideo" as well, and the problem was solved!
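For reference, a commonly suggested alternative to widening the MySQL table (not the route I took) is to drop the columns you never intend to persist before the Append write; a minimal sketch reusing DF, url and prop from the job above:

// Sketch (inside foreachRDD, after DF has been built): discard the parser's
// corrupt-record column before appending; the same drop could be applied to
// any other unexpected field such as "name".
val toWrite = DF.drop("_corrupt_record").filter("view_count > 500000")
toWrite.write.mode(SaveMode.Append).jdbc(url, "highViewVideo", prop)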
The problem is solved, but I am still baffled: why does Append mode raise this error while Overwrite does not? And where do the "_corrupt_record" and "name" columns come from, given that I never used either field anywhere?