A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel. The RDD class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as groupByKey and join; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
All of these operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions.
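The enrichment pattern behind this can be sketched in plain Scala (a stand-in for Spark, with Seq playing the role of RDD): an implicit class adds key-value operations to any collection of pairs, the way PairRDDFunctions adds groupByKey and join to RDD[(K, V)]. The names PairOps and groupByKeyLike are illustrative, not Spark API.

```scala
object PairOpsDemo {
  // Implicit enrichment: any Seq of pairs automatically gains this method,
  // analogous to how Spark's implicits enrich RDD[(K, V)].
  implicit class PairOps[K, V](data: Seq[(K, V)]) {
    def groupByKeyLike: Map[K, Seq[V]] =
      data.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq((1, "a"), (2, "b"), (1, "c"))
    // groupByKeyLike appears on pairs only through the implicit conversion
    println(pairs.groupByKeyLike(1).sorted.mkString(","))
  }
}
```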
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
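A minimal plain-Scala stand-in for this contract (not the real Spark classes; MiniRdd and MiniPartition are made-up names): an "RDD" is fully described by its partitions plus a compute function per partition, and an action like collect just runs compute over every partition. Dependencies, the partitioner, and preferred locations are omitted for brevity.

```scala
object MiniRddDemo {
  case class MiniPartition(index: Int)

  // A toy RDD: a partition list and a per-partition compute function.
  class MiniRdd[T](parts: Array[MiniPartition], computeFn: MiniPartition => Iterator[T]) {
    def getPartitions: Array[MiniPartition] = parts
    def compute(split: MiniPartition): Iterator[T] = computeFn(split)
    // An "action": evaluate every partition and concatenate the results.
    def collect(): Seq[T] = parts.toSeq.flatMap(p => compute(p).toSeq)
  }

  def main(args: Array[String]): Unit = {
    val rdd = new MiniRdd[Int](
      Array(MiniPartition(0), MiniPartition(1)),
      p => Iterator(p.index * 10, p.index * 10 + 1))
    println(rdd.collect().mkString(","))
  }
}
```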
1. If you write a file to HDFS and the output path already exists, the job fails with an error.
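The same failure mode can be sketched against the local filesystem (a stand-in for HDFS, using java.nio.file rather than the Hadoop API): creating a target that already exists throws, so an existing output path must be removed first.

```scala
import java.nio.file.{Files, FileAlreadyExistsException}

object OverwriteDemo {
  // Returns true when the second create fails because the target exists,
  // mirroring how a save to an existing HDFS path errors out.
  def failsOnExisting(): Boolean = {
    val dir = Files.createTempDirectory("spark-out")
    try {
      Files.createDirectory(dir) // dir already exists -> throws
      false
    } catch {
      case _: FileAlreadyExistsException => true
    } finally {
      Files.deleteIfExists(dir)
    }
  }

  def main(args: Array[String]): Unit =
    println(failsOnExisting())
}
```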
2. A complete DAG is only formed when an Action is triggered; only then is the job submitted to the cluster for execution.
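Laziness of transformations can be shown with a plain-Scala stand-in (Iterator here plays the role of an RDD): map only records the plan, and nothing runs until an "action" forces it.

```scala
object LazyDemo {
  // Returns (evaluations before the action, evaluations after, result).
  def run(): (Int, Int, List[Int]) = {
    var evaluated = 0
    // Like an RDD transformation, Iterator.map is lazy: the function
    // below does not run yet, it is only recorded.
    val plan = Iterator(1, 2, 3).map { x => evaluated += 1; x * 2 }
    val before = evaluated   // still 0: no action has been triggered
    val result = plan.toList // the "action" forces the whole chain
    (before, evaluated, result)
  }

  def main(args: Array[String]): Unit = println(run())
}
```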
3. Before tasks are submitted to the cluster, some preparation work is needed, all of which happens on the Driver side. This preparation includes building the DAG, splitting it into stages at shuffle boundaries, and generating the tasks for each stage.
4. RDDs depend on one another, and there are two kinds of dependency: narrow dependencies and wide (shuffle) dependencies.
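What a wide dependency implies can be sketched in plain Scala: during a shuffle, each record is routed to an output partition purely by its key, so every output partition may need records from every input partition. The non-negative-mod routing below mirrors the logic of Spark's HashPartitioner (the demo itself is a stand-in, not Spark code).

```scala
object ShuffleDemo {
  // Key -> partition routing used during a shuffle; the fix-up keeps the
  // result non-negative even for keys with a negative hashCode.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }

  def main(args: Array[String]): Unit = {
    // Wide dependency: records grouped by destination partition, by key.
    val records = Seq(("a", 1), ("b", 2), ("a", 3))
    val byPartition = records.groupBy { case (k, _) => partitionOf(k, 4) }
    println(byPartition.map { case (p, rs) => (p, rs.size) }.toList.sortBy(_._1).mkString(","))
  }
}
```

The key property: the same key always lands in the same partition, which is what lets groupByKey and join bring matching records together.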
6. Broadcast variables: the Driver ships a read-only value to every Executor once, instead of once per task.
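A typical use is a map-side join against a small lookup table. The sketch below is plain Scala (in real Spark code the table would be wrapped with sc.broadcast(...) and read via .value inside the closure); the province codes and helper name are made up for illustration.

```scala
object BroadcastDemo {
  // Map-side join: every record is enriched from a small, read-only
  // lookup table shared by all tasks.
  def joinWithLookup(logs: Seq[(String, Int)], lookup: Map[String, String]): Seq[(String, Int)] =
    logs.map { case (code, n) => (lookup.getOrElse(code, "unknown"), n) }

  def main(args: Array[String]): Unit = {
    val provinceNames = Map("BJ" -> "Beijing", "SH" -> "Shanghai")
    println(joinWithLookup(Seq(("BJ", 10), ("SH", 7)), provinceNames))
  }
}
```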
7. Collecting the data to the Driver and then writing it to MySQL is not the best choice, for the following reasons:
01, it wastes network bandwidth;
02, it cannot write to the database concurrently.
8. result.foreach(data => {
// Do not create a JDBC connection for every record written: it consumes a lot of resources
// val conn: Connection = DriverManager.getConnection("……")
})
9.正確的操做應該是,一次拿出來一個分區,分區用迭代器引用。即每寫一個分區,纔會建立一個鏈接,這樣會大大減小資源的消耗。
result.foreachPartition(part=>{
dataToMysql(part)
})ide
def dataToMysql(part: Iterator[(String, Int)]): Unit = {
  // Pass in the URL, then the user name and password (URL elided in the original)
  val conn: Connection = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/……characterEncoding=utf8", "root", "")
  val prepareStatement = conn.prepareStatement(
    "INSERT INTO access_log (province, counts) VALUES (?, ?)")
  // Write the data: one prepared statement reused for the whole partition
  part.foreach(data => {
    prepareStatement.setString(1, data._1)
    prepareStatement.setInt(2, data._2) // counts is an Int
    prepareStatement.executeUpdate()
  })
  prepareStatement.close()
  conn.close()
}
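The saving can be made concrete without a database: the plain-Scala sketch below (FakeConn is a made-up stand-in for a JDBC connection, grouped plays the role of partitioning) counts how many connections each write pattern opens.

```scala
object ConnectionCountDemo {
  // A fake "connection" that bumps a shared counter when opened.
  class FakeConn(counter: Array[Int]) {
    counter(0) += 1
    def write(r: (String, Int)): Unit = ()
    def close(): Unit = ()
  }

  // Anti-pattern from note 8: one connection per record.
  def perRecord(data: Seq[(String, Int)]): Int = {
    val c = Array(0)
    data.foreach { r => val conn = new FakeConn(c); conn.write(r); conn.close() }
    c(0)
  }

  // Pattern from note 9: one connection per partition.
  def perPartition(data: Seq[(String, Int)], partitionSize: Int): Int = {
    val c = Array(0)
    data.grouped(partitionSize).foreach { part =>
      val conn = new FakeConn(c)
      part.foreach(conn.write)
      conn.close()
    }
    c(0)
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 100).map(i => (s"p$i", i))
    println(perRecord(data))        // 100 connections
    println(perPartition(data, 25)) // 4 connections
  }
}
```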