小白學習Spark系列四:RDD踩坑總結(scala+spark2.1 sql經常使用方法)

  初次嘗試用 Spark+scala 完成項目的重構,因爲二者以前都沒接觸過,因此邊學邊用的過程大多艱難。首先面臨的是如何快速上手,而後是代碼調優、性能調優。本章主要記錄本身在項目中遇到的問題以及解決方式,下篇會嘗試調優方法。末尾會分享本身的學習資料,也供大多菜鳥第一次使用做爲參考。因爲本身項目中大量使用spark sql,因此下面的經驗大可能是和spark sql有關。一樣下面也列出做爲菜鳥在學習過程當中的困惑以及踩的坑,還請大牛勿笑 ~_~ 若是有更好的方式解決,歡迎留言,一塊兒學習。html

一、經常使用場景

(1)場景一:rdd讀取指定行分隔符的數據,不以每行爲單位

例1:配置文件中有n個sql語句,每一個sql以分號----分隔。你須要讀取sql,分別從hdfs中拉取數據。可能會採起:java

//conf_sql_map_file 是sql配置文件
val sql_rdd = sc.textFile(conf_sql_map_file)
var sqls = sql_rdd.collect().mkString(" ").split("----")

分析:因爲rdd以每行爲單位,自動去掉結尾的 換行符。但sql配置文件須要以指定分隔符分隔,而不是每行。因此使用 mkString(" ") 將讀取的每行數據以空格分隔,整合爲一個長字符串,最後以分隔符分隔。node

 

但若是 sql 語句中有使用 with 之類的關鍵詞時,上面那種方式讀取配置文件會由於格式問題會出錯,with語句須要和 select 語句空行分隔,爲保險起見,以 「\n」 分隔,還原配置文件的原始格式。python

var sqls = sql_rdd.collect().mkString("\n").split("----")

 

(2)場景二:讀取文件,以 key-value 形式存儲。

例2:文件file1內容以下es6

key1,value1sql

key2,value2shell

var file_rdd = sc.textFile(file1).map(e=> (e.split(',')(0),e.split(',')(1))).collectAsMap

或者 不從文件讀取,直接使用List類型數據演示apache

scala> var line_rdd = sc.parallelize(List[String]("k,v","key,value")).map(e=>(e.split(',')(0),e.split(',')(1))).collectAsMap
line_rdd: scala.collection.Map[String,String] = Map(k -> v, key -> value)

 分析:collectAsMap 是行動操做的一種,能夠將數據類型轉換爲Map類型,而collect是直接轉爲Array類型。json

 

(3)場景三:從hive表中讀取數據放到array數組中,其中每條數據轉換爲List類型。

scala> import org.apache.spark.{SparkConf, SparkContext}
scala> import org.apache.spark.sql.SparkSession

scala> val conf = new SparkConf().setAppName("graph_spark@zky")
//設置本程序名稱
scala> val hiveCtx: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
//使用rdd函數轉換格式 scala> var sql_file_result = hiveCtx.sql("select * from city limit 10").rdd
scala> sql_file_result
res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1187] at rdd at <console>:29

scala> sql_file_result.first
res11: org.apache.spark.sql.Row = [110000,北京市,110000,1,-911,2015-10-10 12:09:47,-911,2018-01-09 18:27:28,20181001000000]

分析:因爲spark2.0版本丟棄了SQLContext(HiveContext),取而代之的是SparkSession。hdfs拉取的數據格式爲 org.apache.spark.sql.Row,須要調用mkString("\t") 對其轉換爲String類型的rdd ,而後再轉換爲其餘類型。api

但當你的數據以製表符分隔,就像下面代碼裏同樣,末尾字段值若是存在字符串""空時,建議在首尾加上 [ ] 標識符,由於製表符和末尾的空值都會被rdd 自動過濾掉。另外,不建議分隔符使用製表符分隔,在選用分隔符時確保數據中不會出現你指定的分隔符。

scala> var lines = sql_file_result.map(line => "["+line.mkString("\t")+"]")
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1189] at map at <console>:33

scala> lines.collect
res14: Array[String] = Array([110000	北京市	110000	1	-911	2015-10-10 12:09:47	-911	2018-01-09 18:27:28	20180123000000], [120000	天津市	120000	1	-911	2015-10-10 12:09:47	-911	2018-01-09 18:27:28	20180123000000],。。。

 

解析帶[ ]的字符串轉成list格式,split()函數中的-1是爲確保空值不被過濾。

scala> var items = lines.map(line => line.substring(1,line.length-1).split("\t",-1).toList)
items: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[1190] at map at <console>:35

scala> items.collect
res15: Array[List[String]] = Array(List(370101, 濟南市, 370000, 1, -911, 1000-01-01 00:00:00, -911, 1000-01-01 00:00:00, 20180916000000), List(110000, 北京市, 110000, 1, -911, 2015-10-10 12:09:47, -911, 2018-01-09 18:27:28, 20180916000000),

  

  (4)場景四:從hive表中讀取的數據存儲爲Map映射。

scala> var mid_data_rdd = hiveCtx.sql("select city_code,city_name from city limit 10").rdd
scala> mid_data_rdd.collect
res16: Array[org.apache.spark.sql.Row] = Array([110000,北京市], [120000,天津市], [130100,石家莊市], [130200,唐山市], [130300,秦皇島市], [130400,邯鄲市], [130500,邢臺市], [130600,保定市], [130700,張家口市], [130800,承德市])

scala> var mid_data_map = mid_data_rdd.map(x => (x(0)->x(1).toString)).collectAsMap
mid_data_map: scala.collection.Map[Any,String] = Map(110000 -> 北京市, 130100 -> 石家莊市, 130300 -> 秦皇島市, 120000 -> 天津市, 130500 -> 邢臺市, 130700 -> 張家口市, 130200 -> 唐山市, 130400 -> 邯鄲市, 130600 -> 保定市, 130800 -> 承德市)

scala> var mid_data_map = mid_data_rdd.map(x => (x(0).toString->x(1).toString)).collectAsMap
mid_data_map: scala.collection.Map[String,String] = Map(130300 -> 秦皇島市, 130600 -> 保定市, 130500 -> 邢臺市, 130800 -> 承德市, 130200 -> 唐山市, 110000 -> 北京市, 130400 -> 邯鄲市, 130700 -> 張家口市, 130100 -> 石家莊市, 120000 -> 天津市)
//若是想轉換爲array數組,試一下collect~
scala> var mid_data_map = mid_data_rdd.map(x => (x(0).toString->x(1).toString)).collect
mid_data_map: Array[(String, String)] = Array((110000,北京市), (120000,天津市), (130100,石家莊市), (130200,唐山市), (130300,秦皇島市), (130400,邯鄲市), (130500,邢臺市), (130600,保定市), (130700,張家口市), (130800,承德市))

分析:能夠關注下 toString函數~

(5)場景五:將數據經過寫入臨時表以存儲到hive表

scala> val people = sc.parallelize(List(("1","mary"),("2","rose"),("3","jack")))
people: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> case class Person(id:String,name:String)
defined class Person
scala> var people_trans = people.map(item => Person(item._1,item._2))
people_trans: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[2] at map at <console>:28
scala> val people_frame = people_trans.toDF()
people_frame: org.apache.spark.sql.DataFrame = [id: string, name: string]

scala> people_frame.createOrReplaceTempView("person")
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala> val conf = new SparkConf().setAppName("graph_spark@zhengkaiyu")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@534df4b

scala> val hiveCtx: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
18/11/19 21:47:11 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
hiveCtx: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@18de437d

scala> import hiveCtx.sql
import hiveCtx.sql

scala> import hiveCtx.implicits._
import hiveCtx.implicits._

scala> sql("select * from person").collect
res6: Array[org.apache.spark.sql.Row] = Array([1,mary], [2,rose], [3,jack])   
scala> sql("insert into 庫名.hive表名 select * from person")

  分析:此例是基於case class來建立SchemaRDD,經過寫入臨時表,最後再插入到hive表中。除了這種方式還能夠基於json格式來建臨時表,見下例。其中spark2.1創建臨時表時,將registerTempTable() 改成createOrReplaceTempView(),注意版本,要不會引發沒必要要的麻煩。

scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf().setAppName("graph_spark@zhengkaiyu")
scala> val spark: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
scala> val df = spark.read.json("examples/src/main/resources/people.json")
scala> df.show()
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
scala> sqlDF.show()

二、注意事項

(1)當啓動交互環境 spark-shell 時,會出現較爲詭異的事情,剛定義好的變量會被以前的同名變量所覆蓋,猜測緣由多是內存不足致使。

(2)在scala代碼中,建議if-else語句格式規範書寫,不然會編譯不正確。

if(條件){

}

(3)啓動 spark-shell 時,注意指定的模式local、yarn。

 

三、常見錯誤的解決方法

不可序列化:org.apache.spark.SparkException: Task not serializable

解決方案1:繼承java可序列化類

object Process extends java.io.Serializable{
}

參考博客:https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou

經過從臨時表中讀取數據寫入hive表時,會遇到錯誤:org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict

        解決方案:執行下面語句後再執行insert語句。

//val spark: SparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
spark.sql("SET hive.exec.dynamic.partition = true") 
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")

  

 

四、學習資料

《Spark快速大數據分析》王道遠譯,推薦理由:快速上手,實例代碼有python、scala、java三種語言

《快學scala》

https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#datasets-and-dataframes

https://tech.meituan.com/spark_tuning_pro.html

http://dblab.xmu.edu.cn/blog/spark-quick-start-guide/#more-577

spark.sql數據類型:http://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.Row

相關文章
相關標籤/搜索