Sparkhtml
Author: Lijbjava
Email: lijb1121@163.comnode
Apache Spark 是專爲大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯 克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架。,Spark擁有Hadoop MapReduce所具 有的優勢;但不一樣於MapReduce的是Job中間輸出結果能夠保存在內存中,從而再也不須要讀寫HDFS,所以Spark能 更好地適用於數據挖掘與機器學習等須要迭代的MapReduce的算法。 Spark 啓用了內存分佈數據集(Resilient distribute data set),除了可以提供交互式查詢外,它還能夠優化迭代 工做負載。Spark 是在 Scala 語言中實現的,它將 Scala 用做其應用程序框架。與 Hadoop 不一樣,Spark 和 Scala 可以緊密集成,其中的 Scala 能夠像操做本地集合對象同樣輕鬆地操做分佈式數據集。Apache Spark是專爲大規 模數據處理而設計的快速通用的計算引擎 。如今造成一個高速發展應用普遍的生態系統 。mysql
Spark&Hadoop的關係linux
Spark計算是對Hadoop傳統MapReduce計算的升級和優化,能夠理解若是沒有hadoop大數據的實戰和演變就沒 有spark計算,通俗的理解爲Hadoop的MapReduce相似於解決了大數據的物質文明需求,還處於初始階段人們對 大數據計算的性能的要求,也就是說能保證在合理的時間範圍內達到對數據的初步計算;而Spark的誕生Spark是 在考慮速度和效率,所以在在這個層面上Spark算是解決精神文明層面的問題。算法
hadoop:基於磁盤迭代計算,在作n次迭代過程當中,由於全部的結果都是存儲在磁盤,就致使在屢次迭代計算中帶來更多延遲。基於進程。sql
Spark:基於內存迭代計算,能夠將數據緩存在內存中,這就爲後續的迭代計算提供了便捷。基於線程。shell
Spark的內存計算並不意味着Spark的內存大小必須和數據大小進行匹配(內存不足,可使用磁盤緩存),spark能夠計算任意大小的數據。數據庫
Spark發展歷史express
....
Spark環境搭建
Spark on Yarn
確保HDFS和YARN正常運行
安裝配置Spark
[root@CentOS ~]# tar -zxf spark-2.3.0-bin-hadoop2.6.tgz -C /usr/ [root@CentOS ~]# mv /usr/spark-2.3.0-bin-hadoop2.6/ /usr/spark-2.3.0 [root@CentOS ~]# vi /root/.bashrc SPARK_HOME=/usr/spark-2.3.0 HBASE_MANAGES_ZK=false HBASE_HOME=/usr/hbase-1.2.4 HADOOP_CLASSPATH=/usr/hbase-1.2.4/lib/* HADOOP_HOME=/usr/hadoop-2.6.0 JAVA_HOME=/usr/java/latest PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$SPARK_HOME/bin CLASSPATH=. export JAVA_HOME export PATH export CLASSPATH export HADOOP_HOME export HADOOP_CLASSPATH export HBASE_MANAGES_ZK export HBASE_HOME export SPARK_HOME [root@CentOS ~]# source .bashrc [root@CentOS ~]# cd /usr/spark-2.3.0/ [root@CentOS spark-2.3.0]# mv conf/spark-env.sh.template conf/spark-env.sh [root@CentOS spark-2.3.0]# mv conf/slaves.template conf/slaves [root@CentOS spark-2.3.0]# mv conf/spark-defaults.conf.template conf/spark-defaults.conf [root@CentOS spark-2.3.0]# vi conf/slaves CentOS [root@CentOS spark-2.3.0]# vi conf/spark-env.sh HADOOP_CONF_DIR=/usr/hadoop-2.6.0/etc/hadoop YARN_CONF_DIR=/usr/hadoop-2.6.0/etc/hadoop SPARK_EXECUTOR_CORES=2 SPARK_EXECUTOR_MEMORY=1G SPARK_DRIVER_MEMORY=1G LD_LIBRARY_PATH=/usr/hadoop-2.6.0/lib/native export HADOOP_CONF_DIR export YARN_CONF_DIR export SPARK_EXECUTOR_CORES export SPARK_DRIVER_MEMORY export SPARK_EXECUTOR_MEMORY export LD_LIBRARY_PATH
鏈接Spark
[root@CentOS ~]# spark-shell --master yarn --deploy-mode client
出現如下錯誤
2018-10-29 18:38:28 ERROR YarnClientSchedulerBackend:70 - Yarn application has already exited with state FINISHED! 2018-10-29 18:38:28 ERROR TransportClient:233 - Failed to send RPC 4830215201639506599 to /192.168.29.128:48563: java.nio.channels.ClosedChannelException java.nio.channels.ClosedChannelException at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) 2018-10-29 18:38:28 ERROR YarnSchedulerBackend$YarnSchedulerEndpoint:91 - Sending RequestExecutors(0,0,Map(),Set()) to AM was unsuccessful java.io.IOException: Failed to send RPC 4830215201639506599 to /192.168.29.128:48563: java.nio.channels.ClosedChannelException at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
解決方案
<property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
關閉yarn集羣,而後再yarn-site.xml添加以上配置,啓動Yarn
若是出現Unable to load native
2018-10-29 18:35:29 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
解決方案
LD_LIBRARY_PATH=/usr/hadoop-2.6.0/lib/native export LD_LIBRARY_PATH
在spark-env.sh中添加以上變量。
正常啓動,你能夠看到以下:
[root@CentOS ~]# spark-shell --master yarn --deploy-mode client Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2018-10-29 18:45:44 WARN Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. Spark context Web UI available at http://CentOS:4040 Spark context available as 'sc' (master = yarn, app id = application_1540809754248_0001). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.0 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171) Type in expressions to have them evaluated. Type :help for more information. scala >
或者你能夠訪問:http://centos:8088(Hadoop) http://centos:8080(Spark)
Spark Standalone
安裝配置Spark
[root@CentOS ~]# tar -zxf spark-2.3.0-bin-hadoop2.6.tgz -C /usr/ [root@CentOS ~]# mv /usr/spark-2.3.0-bin-hadoop2.6/ /usr/spark-2.3.0 [root@CentOS ~]# vi /root/.bashrc SPARK_HOME=/usr/spark-2.3.0 HADOOP_HOME=/usr/hadoop-2.6.0 JAVA_HOME=/usr/java/latest PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin CLASSPATH=. export JAVA_HOME export PATH export CLASSPATH export HADOOP_HOME export SPARK_HOME [root@CentOS ~]# source .bashrc [root@CentOS ~]# cd /usr/spark-2.3.0/ [root@CentOS spark-2.3.0]# mv conf/spark-env.sh.template conf/spark-env.sh [root@CentOS spark-2.3.0]# mv conf/slaves.template conf/slaves [root@CentOS spark-2.3.0]# mv conf/spark-defaults.conf.template conf/spark-defaults.conf [root@CentOS spark-2.3.0]# vi conf/slaves CentOS [root@CentOS spark-2.3.0]# vi conf/spark-env.sh SPARK_MASTER_HOST=CentOS SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=2g SPARK_MASTER_PORT=7077 export SPARK_MASTER_HOST export SPARK_WORKER_CORES export SPARK_MASTER_PORT export SPARK_WORKER_MEMORY
啓動Spark
[root@CentOS ~]# cd /usr/spark-2.3.0/ [root@CentOS spark-2.3.0]# ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /usr/spark-2.3.0/logs/spark-root-org.apache.spark.deploy.master.Master-1-CentOS.out [root@CentOS spark-2.3.0]# ./sbin/start-slave.sh spark://CentOS:7077 starting org.apache.spark.deploy.worker.Worker, logging to /usr/spark-2.3.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-CentOS.out
連接Spark服務計算
鏈接Spark集羣
[root@CentOS spark-2.3.0]# ./bin/spark-shell --master spark://CentOS:7077 --total-executor-cores 5 2018-11-02 19:33:51 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://CentOS:4040 Spark context available as 'sc' (master = spark://CentOS:7077, app id = app-20181102193402-0000). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.0 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171) Type in expressions to have them evaluated. Type :help for more information. scala>sc.textFile("file:///root/worlds.log").flatMap(_.split(" ")).map((_,1)).groupByKey().map(x=>(x._1,x._2.sum)).collect().foreach(println)
本地仿真
[root@CentOS spark-2.3.0]# ./bin/spark-shell --master local[5] 2018-11-02 19:51:43 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://CentOS:4040 Spark context available as 'sc' (master = local[5], app id = local-1541159513505). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.0 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171) Type in expressions to have them evaluated. Type :help for more information. scala>
Spark 架構
spark on standalone
名詞解釋:
Standalone模式下存在的角色。
Client:客戶端進程,負責提交做業到Master。
Master:Standalone模式中主控節點,負責接收Client提交的做業,管理Worker,並命令Worker啓動Driver和Executor。
Worker:Standalone模式中slave節點上的守護進程,負責管理本節點的資源,按期向Master彙報心跳,接收Master的命令,啓動Driver和Executor。
Driver: 一個Spark做業運行時包括一個Driver進程,也是做業的主進程,負責做業的解析、生成Stage並調度Task到Executor上。包括DAGScheduler,TaskScheduler。
Executor:即真正執行做業的地方,一個集羣通常包含多個Executor,每一個Executor接收Driver的命令Launch Task,一個Executor能夠執行一到多個Task。
做業相關的名詞解釋
Stage:一個Spark做業通常包含一到多個Stage。 Task:一個Stage包含一到多個Task,經過多個Task實現並行運行的功能。 DAGScheduler: 實現將Spark做業分解成一到多個Stage,每一個Stage根據RDD的Partition個數決定Task的個數,而後生成相應的Task set放到TaskScheduler中。 TaskScheduler:實現Task分配到Executor上執行。 提交做業有兩種方式,分別是Driver(做業的master,負責做業的解析、生成stage並調度task到,包含DAGScheduler)運行在Worker上,Driver運行在客戶端。接下來分別介紹兩種方式的做業運行原理。 SparkContext:整個應用的上下文,控制應用的生命週期。 Client提交應用,Master找到一個Worker啓動Driver,Driver向Master或者資源管理器申請資源,以後將應用轉化爲RDD有向無環圖,再由DAGScheduler將RDD轉化爲Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Excutor進行執行,任務執行的過程當中其它組件協同工做確保整個應用順利執行。
關於這種架構有幾點有用的注意事項:
1.每一個應用程序都有本身的執行程序進程,這些進程在整個應用程序的持續時間內保持不變並在多個線程中運行任務。這樣能夠在調度方(每一個驅動程序調度本身的任務)和執行方(在不一樣JVM中運行的不一樣應用程序中的任務)之間隔離應用程序。可是,這也意味着沒法在不將Spark應用程序(SparkContext實例)寫入外部存儲系統的狀況下共享數據。
2.Spark與底層集羣管理器無關。只要它能夠獲取執行程序進程,而且這些進程相互通訊,即便在也支持其餘應用程序的集羣管理器(例如Mesos / YARN)上運行它也相對容易。
3.驅動程序必須在其生命週期內監聽並接受來自其執行程序的傳入鏈接(例如,請參閱網絡配置部分中的spark.driver.port)。所以,驅動程序必須是來自工做節點的網絡可尋址的。
4.由於驅動程序在集羣上調度任務,因此它應該靠近工做節點運行,最好是在同一局域網上。若是您想遠程向集羣發送請求,最好向驅動程序打開RPC並讓它從附近提交操做,而不是遠離工做節點運行驅動程序。
Driver運行在Worker上
做業執行流程描述:
1.客戶端提交做業給Master 2.Master讓一個Worker啓動Driver,即SchedulerBackend。Worker建立一個DriverRunner線程,DriverRunner啓動SchedulerBackend進程。 3.另外Master還會讓其他Worker啓動Exeuctor,即ExecutorBackend。Worker建立一個ExecutorRunner線程,ExecutorRunner會啓動ExecutorBackend進程。 4.ExecutorBackend啓動後會向Driver的SchedulerBackend註冊。SchedulerBackend進程中包含DAGScheduler,它會根據用戶程序,生成執行計劃,並調度執行。對於每一個stage的task,都會被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend彙報的時候把TaskScheduler中的task調度到ExecutorBackend執行。 5.全部stage都完成後做業結束。
Driver運行在Client
做業流程以下
1.客戶端啓動後直接運行用戶程序,啓動Driver相關的工做:DAGScheduler和BlockManagerMaster等。 2.客戶端的Driver向Master註冊。 3.Master還會讓Worker啓動Exeuctor。Worker建立一個ExecutorRunner線程,ExecutorRunner會啓動ExecutorBackend進程。 4.ExecutorBackend啓動後會向Driver的SchedulerBackend註冊。Driver的DAGScheduler解析做業並生成相應的Stage,每一個Stage包含的Task經過TaskScheduler分配給Executor執行。 5.全部stage都完成後做業結束。
spark on yarn
這裏Spark AppMaster至關於Standalone模式下的SchedulerBackend,Executor至關於standalone的ExecutorBackend,spark AppMaster中包括DAGScheduler和YarnClusterScheduler。 Spark on Yarn的執行流程能夠參考http://www.csdn.net/article/2013-12-04/2817706--YARN spark on Yarn部分。
做業流程以下:
Spark RDD 編程
resilient distributed dataset (彈性的分佈式數據集),是Spark計算的核心,全部的RDD相關的計算都是並行。
Spark建立RDD
經過Sacla集合建立RDD
scala> var list=Array("hello world","ni hao") list: Array[String] = Array(hello world, ni hao)
scala> var rdd1=sc.parallelize(list) //並行化 rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:27 scala> rdd1.partitions.length //partitions的數是前面本地仿真指定的 res5: Int = 5
注意:用戶能夠手動指定切片|分區數目var rdd1=sc.parallelize(list,3)
scala> sc.parallelize(List(1,2,4,5),3).partitions.length res13: Int = 3
讀取外部數據建立RDD
scala> sc.textFile("hdfs://CentOS:9000/demo/src") res16: org.apache.spark.rdd.RDD[String] = hdfs://CentOS:9000/demo/src MapPartitionsRDD[22] at textFile at <console>:25
scala> sc.textFile("hdfs://CentOS:9000/demo/src").map(.split(" ").length).reduce(+_) res19: Int = 13
一、若是讀取的是本地文件,須要將分析的文件拷貝到全部work節點
二、textFile支持讀取文件、目錄、gz文件textFile("/my/directory"), textFile("/my/directory/.txt"), textFile("/my/directory/.gz")
三、sc.textFile("hdfs://CentOS:9000/xxx/xx",分區數),要求指定的分區必須>=hdfs的block的個數
wholeTextFiles
scala> sc.wholeTextFiles("/root/src/").collect().foreach(t=>println(t._1+"=>"+t._2))
該方法的返回值類型是RDD[(filename,content)]
sequenceFile
scala> sc.sequenceFile[String,String]("/root/part-r-00000").collect().foreach(println) (192.168.0.1,總數:5) (192.168.0.3,總數:10) (192.168.0.4,總數:5) (192.168.0.5,總數:5)
注意,已知sequenceFile的key是Text,值Text,Spark能夠指定作自動的類型的兼容
newAPIHadoopRDD
略
搭建Spark開發環境
構建空的maven工程
導入如下依賴
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.12</scala.version> <spark.version>2.3.0</spark.version> <hadoop.version>2.6.0</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies>
添加如下插件
<build> <plugins> <plugin> <!-- 這是個編譯scala代碼的 --> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.1</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> </executions> </plugin> <plugin> <!-- 這是個編譯java代碼的 --> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build>
本地仿真
import org.apache.spark.SparkContext import org.apache.spark.SparkConf object TestRDD01 { def main(args: Array[String]): Unit = { //建立SparkContext val conf = new SparkConf().setAppName("my spark").setMaster("local[5]") val sc = new SparkContext(conf) var arr= Array("Hello world","good good study","day day up") sc.parallelize(arr,2) .flatMap(_.split(" ")) .map((_,1)) .groupByKey() .map(tuple => (tuple._1,tuple._2.sum)) .sortBy(_._2,false) .collect() .foreach(println) sc.stop() } }
遠程部署
[root@CentOS spark-2.3.0]# ./bin/spark-submit --class com.baizhi.rdd01.TestRDD01 --master spark://CentOS:7077 --deploy-mode cluster --supervise --executor-memory 1g --total-executor-cores 2 /root/rdd-1.0-SNAPSHOT.jar
具體更多模式下的發佈,能夠參考http://spark.apache.org/docs/latest/submitting-applications.html
Spark讀取第三方數據
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.LongWritable import org.apache.hadoop.mapreduce.lib.db.{DBConfiguration, DBInputFormat} import org.apache.spark.SparkContext import org.apache.spark.SparkConf //建立SparkContext val conf = new SparkConf().setAppName("mysql rdd").setMaster("local[5]") val sc = new SparkContext(conf) var jobConf=new Configuration() //配置數據庫信息 DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://CentOS:3306/test", "root","root") jobConf.set(DBConfiguration.INPUT_CLASS_PROPERTY,"com.baizhi.rdd01.UserDBWritable") jobConf.set(DBConfiguration.INPUT_QUERY,"select * from t_user") jobConf.set(DBConfiguration.INPUT_COUNT_QUERY,"select count(*) from t_user") val userRDD = sc.newAPIHadoopRDD(jobConf,classOf[DBInputFormat[UserDBWritable]],classOf[LongWritable],classOf[UserDBWritable]) userRDD.map(tuple=>(tuple._2.id,tuple._2.salary)) .groupByKey() .map(tuple=>(tuple._1,tuple._2.sum)) .saveAsTextFile("file:///D:/spark_result") sc.stop()
遠程部署
方案1
將須要的jars文件拷貝到spark安裝目錄的jars目錄下
方案2( spark-2.3.2-bin-without-hadoop.tgz)
能夠在spark-env.sh配置
SPARK_DIST_CLASSPATH=$(/usr/hadoop-2.6.0/bin/hadoop classpath) export SPARK_DIST_CLASSPATH
同時在當前hadoop的classpath下配置用戶所需jar路徑
HADOOP_CLASSPATH=/xxx/xxx.jar export HADOOP_CLASSPATH ./bin/spark-submit --master spark://CentOS:7077 --class com.baizhi.rdd01.TestRDD01 --jars /root/mysql-connector-java-5.1.6.jar --packages 'mysql:mysql-connector-java:5.1.38' --driver-memory 1g --driver-library-path /root/mysql-connector-java-5.1.6.jar --executor-memory 1g --total-executor-cores 2 /root/rdd-1.0-SNAPSHOT.jar
依賴
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency>
Spark RDD 算子
只是將一個RDD轉換爲一個新的RDD,並不會對原有的RDD形成影響。
觸發全部的轉換算子進行對RDD作實際數據計算,而且將計算的數據返回給Driver
轉換算子
Return a new distributed dataset formed by passing each element of the source through a function func.
經過函數func傳遞源的每一個元素,返回一個新的分佈式數據集。
scala> val list=Array("a","b","c","a") scala> val rdd=sc.parallelize(list) scala> rdd.map(x=>(x,1)).collect().foreach(println) (a,1) (b,1) (c,1) (a,1)
Return a new dataset formed by selecting those elements of the source on which funcreturns true.
經過選擇源代碼上的元素來建立,返回一個布爾值。
scala> val rdd=sc.parallelize(Array(1,2,3,4,5,6)) scala> rdd.filter(x=> x%2==0).collect() res14: Array[Int] = Array(2, 4, 6)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
轉換並壓縮,把一個元素拆分紅0到多個項,返回數組或集合。
scala> val rdd=sc.parallelize(Array("hello world","hello boy")) scala> rdd.flatMap(line=> line.split(" ")).collect() res15: Array[String] = Array(hello, world, hello, boy)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
與map相似,可是分別在RDD的每一個分區(塊)上運行,因此func在T類型的RDD上運行時必須是Iterator<T>=>Iterator<U>類型。
scala> val fun:(Iterator[String])=>Iterator[(String,Int)]=(x)=>{ | var lst=List[(String,Int)]() | for(i <- x){ | lst ::= i->1 | } | lst.iterator | } fun: Iterator[String] => Iterator[(String, Int)] = <function1> scala> sc.parallelize(List("a","b","c","d")).mapPartitions(fun).collect() res5: Array[(String, Int)] = Array((b,1), (a,1), (d,1), (c,1))
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
scala> val fun:(Int,Iterator[String])=>Iterator[(String,Int)]=(part,x)=>{ | var lst=List[(String,Int)]() | for(i <- x){ | lst ::= i-> part | } | lst.iterator | } scala> sc.parallelize(List("a","b","c","d"),3).mapPartitionsWithIndex(fun).collect() res8: Array[(String, Int)] = Array((a,0), (b,1), (d,2), (c,2))
union(otherDataset)|intersection(otherDataset)
scala> var rdd1=sc.parallelize(Array(("張三",1000),("李四",100),("趙六",300))) scala> var rdd2=sc.parallelize(Array(("張三",1000),("王五",100),("溫曉琪",500))) scala> rdd1.union(rdd2).collect() res9: Array[(String, Int)] = Array((張三,1000), (李四,100), (趙六,300), (張三,1000), (王五,100), (溫曉琪,500)) scala> rdd1.intersection(rdd2).collect() res10: Array[(String, Int)] = Array((張三,1000))
groupByKey([numPartitions])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
當調用(k,v)的數據集時,返回一個數據集(k,可迭代<v>)。
scala> var rdd=sc.parallelize(Array(("張三",1000),("李四",100),("趙六",300),("張三",500))) scala> rdd.groupByKey().collect res13: Array[(String, Iterable[Int])] = Array((趙六,CompactBuffer(300)), (張三,CompactBuffer(1000, 500)), (李四,CompactBuffer(100))) scala> rdd.groupByKey().map(x=>(x._1,x._2.sum)).collect res14: Array[(String, Int)] = Array((趙六,300), (張三,1500), (李四,100))
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
在對(K,V)對的數據集進行調用時,返回(K,V)對的數據集,其中每一個鍵的值都使用給定的reduce函數func進行聚合,該函數必須是類型(V,V)=>V。與groupByKey中同樣,reduce任務的數量能夠經過可選的第二個參數進行配置。
scala> rdd.reduceByKey((x,y)=>x+y).collect() res15: Array[(String, Int)] = Array((趙六,300), (張三,1500), (李四,100))
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
scala> var rdd=sc.parallelize(Array(("張三",1000),("李四",100),("趙六",300),("張三",500))) scala> rdd.aggregateByKey(0)((x,y)=>x+y,(x,y)=>x+y).collect()
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
scala> var rdd=sc.parallelize(Array(("a",1000),("b",100),("d",300),("c",500))) scala> rdd.sortByKey(true).collect() res21: Array[(String, Int)] = Array((a,1000), (b,100), (c,500), (d,300)) scala> rdd.sortByKey(false).collect() res22: Array[(String, Int)] = Array((d,300), (c,500), (b,100), (a,1000))
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
當調用類型(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集,其中每一個鍵的全部元素對。外部鏈接經過左TouthEnter、右TouthEnter和FulLouthEnter進行支持。
scala> var rdd1=sc.parallelize(Array(("001","張三"),("002","李四"),("003","王五"))) scala> var rdd2=sc.parallelize(Array(("001","蘋果"),("002","手機"),("001","橘子"))) scala> rdd1.join(rdd2).collect() res23: Array[(String, (String, String))] = Array((002,(李四,手機)), (001,(張三,蘋果)), (001,(張三,橘子)))
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
當調用類型(k,v)和(k,w)的數據集時,返回一個數據集(k,(迭代< v>,迭代< W>))元組。這個操做也稱爲GROUPY。
scala> var rdd1=sc.parallelize(Array(("001","張三"),("002","李四"),("003","王五"))) scala> var rdd2=sc.parallelize(Array(("001","蘋果"),("002","手機"),("001","橘子"))) scala> rdd1.cogroup(rdd2).collect() res24: Array[(String, (Iterable[String], Iterable[String]))] = Array((002,(CompactBuffer(李四),CompactBuffer(手機))), (003,(CompactBuffer(王五),CompactBuffer())), (001,(CompactBuffer(張三),CompactBuffer(蘋果, 橘子))))
Action算子
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
使用函數FUNC來聚合數據集的元素(這須要兩個參數並返回一個參數)。函數應該是可交換的和相聯的,從而能夠並行計算。
scala> var rdd=sc.parallelize(List("a","b","c")) scala> rdd.reduce(_+","+_) res27: String = a,b,c
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
在驅動器程序中將數據集的全部元素做爲數組返回。這一般在過濾器或其餘操做返回有用的數據子集後頗有用。
Return the number of elements in the dataset.
返回數據集中的元素數量。
scala> var rdd=sc.parallelize(List("a","b","c")) scala> rdd.count() res28: Long = 3
Return the first element of the dataset (similar to take(1)).
scala> var rdd=sc.parallelize(List("a","b","c")) scala> rdd.first() res29: String = a scala> rdd.take(1) res30: Array[String] = Array(a) scala> rdd.take(2) res31: Array[String] = Array(a, b)
Return the first n elements of the RDD using either their natural order or a custom comparator.
scala> var rdd= sc.parallelize(Array(("a",3),("b",1),("c",4)),5) scala> val s=new Ordering[(String, Int)]{ | override def compare(x: (String, Int), y: (String, Int)): Int = { | return -1 * (x._2-y._2) | } | } scala> rdd.takeOrdered(2)(s) res37: Array[(String, Int)] = Array((c,1), (a,3))
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
scala> sc.textFile("file:///root/worlds.log").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_,1).saveAsTextFile("file:///cc")
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
scala> sc.textFile("file:///root/worlds.log").flatMap(_.split(" ")).map(x=>(x,1)).countByKey() res55: scala.collection.Map[String,Long] = Map(this -> 1, demo -> 1, is -> 1, good -> 2, up -> 1, a -> 1, come -> 1, babay -> 1, on -> 1, day -> 2, study -> 1)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
人口普查
需求:某個文件夾下存在以下 日誌文件名yob年份.txt日誌的數據格式以下
名字,性別,人數、 ....
要求按年份統計出每年新生嬰兒男女比例,並繪製報表?
Hadoop MapReduce
TextInputFormat讀取 //Mapper class UserSexCountMapper extends Mapper[LongWritable,Text,Text,Text]{ override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, Text]#Context): Unit ={ val path = context.getInputSplit().asInstanceOf[FileSplit].getPath().getParent() var filename=path.getParent().getName() var year=filename.substring(filename.lastIndexOf(".")-4,filename.lastIndexOf(".")) val tokens = value.toString.split(",") context.write(new Text(year),new Text(tokens(1)+":"+tokens(2))) } } //Reducer class UserSexReducer extends Reducer[Text,Text,Text,Text]{ override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = { var mtotal=0 var ftotal=0 for(i <- values){ var value:Text= i var sex=value.toString.split(":")(0) if(sex.equals("M")){ mtotal += value.toString.split(":")(1).toInt }else{ ftotal += value.toString.split(":")(1).toInt } } context.write(key,new Text("男:"+mtotal+",女:"+ftotal)) } } //提交任務 ......
Spark解決
import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * 寫一個類SexAndCountVector表示男孩和女孩的數量 * case class是能夠沒有方法實現的 * @param m * @param f */ case class SexAndCountVector(var m:Int,var f:Int) object TestNamesDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[10]") //本地仿真-->遠程部署:"spark:CentOS:7077" conf.setAppName("names counts") val sc = new SparkContext(conf) var cacheRDD = sc.wholeTextFiles("file:///D:/demo/names") .map(touple=>(getYear(touple._1),touple._2.split("\r\n"))) .flatMap(tupe=>for(i<-tupe._2) yield (tupe._1,{ var s=i.split(",") //s(1)+":"+s(2) //將"2016,F,14772"格式->"2016,F:14772"這種格式 if (s(1).equals("M")) { new SexAndCountVector(s(2).toInt, 0) //男性性別的向量 } else { new SexAndCountVector(0, s(2).toInt) //女性性別的向量 } })).reduceByKey((s1,s2)=>{ s1.f=s1.f+s2.f //若是是女性求女性向量的和 s1.m=s1.m+s2.m //若是是男性求男性向量的和 s1 //返回s1 },1).persist(StorageLevel.DISK_ONLY) //指定爲1個分區 cache()/persist()緩存->cacheRDD.unpersist() 清除緩存 cacheRDD.map(tuple=>tuple._1+"\t"+"男:"+tuple._2.m+",女:"+tuple._2.f) //轉化成"2017 男:1,女:1"格式 .saveAsTextFile("file:///D:/demo/names_result")//遠程部署:"file:///root/names_result" sc.stop() } def getYear(name:String):String={ val i = name.lastIndexOf(".") return name.substring(i-4,i)//yob2017.txt index爲3到7的子串 } }
RDD 依賴關係(血統) 寬、窄依賴,任務DAG生成和RDD依賴關係、分區和並行度的理解
窄依賴(Narrow Dependency): "一對一關係/多對一關係" 是指每一個父RDD的一個Partition被多個子RDD的一個Partition所使用,例如map、filter、union等都會產生窄依賴;對於窄依賴的RDD,能夠以流水線的方式計算全部父分區,不會形成網絡之間的數據混合。 寬依賴(Wide Dependency):"一對多關係/多對多關係" 是指一個父RDD的Paratition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操做都會產生寬依賴;對於寬依賴的RDD,則一般伴隨着Shuffle操做,即首先須要計算好父分區的數據,而後在節點之間進行Shuffle。 換言之,若是父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,不然就是寬依賴 這樣設計的好處是什麼? Spark的這種依賴關係設計,使其具備了天生的容錯性,大大加快了Spark的執行速度。由於,RDD數據集經過「血緣關係」記住了它是如何從其它RDD中演變過來的,血緣關係記錄的是粗顆粒度的轉換操做行爲,當這個RDD的部分分區數據丟失時,它能夠經過血緣關係獲取足夠的信息來從新運算和恢復丟失的數據分區,由此帶來了性能的提高。相對而言,在兩種依賴關係中,窄依賴的失敗恢復更爲高效,它只須要根據父RDD分區從新計算丟失的分區便可(不須要從新計算全部分區),並且能夠並行地在不一樣節點進行從新計算。而對於寬依賴而言,單個節點失效一般意味着從新計算過程會涉及多個父RDD分區,開銷較大。此外,Spark還提供了數據檢查點和記錄日誌,用於持久化中間RDD,從而使得在進行失敗恢復時不須要追溯到最開始的階段。在進行故障恢復時,Spark會對數據檢查點開銷和從新計算RDD分區的開銷進行比較,從而自動選擇最優的恢復策略。
DAG的stage劃分原則
通常的在作shuffle的時候會劃分stage,劃分時每個階段每個節點都參與並行計算,當存在寬依賴關係時會自動劃分stage,通常的在作actions算子時會出現劃分stage。 stage劃分算法: 會從出發action操做的那個rdd開始往前倒推,首先會爲最後一個rdd建立一個stage,而後往前倒推,若是發現某個rdd是寬依賴,那麼就會將寬依賴的那個rdd建立一個新的stage,那個rdd就是新的stage的最後一個rdd,而後依次類推,根據寬依賴和窄依賴,進行stage劃分,直至全部rdd遍歷完。
stage劃分源碼分析
/** * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object * can be used to block until the the job finishes executing or can be used to cancel the job. */ def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter } 這段代碼中有個很重要的東西,eventProcessLoop(DAGSchedulerEventProcessLoop是一個DAGScheduler的內部類),調用了post方法發送了JobSubmitted消息。從源碼中能夠看到,接收到消息後,調用了dagScheduler的handleJobSubmitted方法。這個方法是DAGScheduler的job調度的核心入口 private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. //第一步、使用觸發job的最後一個RDD,建立finalStage,這個方法就是簡單的建立了一個stage, //而且將stage加入到DAGScheduler的緩存(stage中有個重要的變量isShuffleMap) finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } if (finalStage != null) { //第二步、用finalStage建立一個job val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job //第三步、將job加入到內存緩存中 activeJobs += job finalStage.resultOfJob = Some(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //第四步、(很關鍵)使用submitStage方法提交finalStage //這個方法會致使第一個stage提交,其餘的stage放入waitingStages隊列,使用遞歸優先提交父stage submitStage(finalStage) } //提交等待的stage隊列 submitWaitingStages() } 接下來看下第四步調用的submitStage方法,這個是stage劃分算法的入口,可是stage劃分算法是有submitStage和getMissingParentStages方法共同組成的。 private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { //很關鍵的一行,調用getMissingParentStage方法去獲取這個stage的父stage val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) //其實這裏會循環遞歸調用,直到最初的stage沒有父stage,其他的stage被放在waitingMissingStages if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") //這個就是提交stage的方法。後面再分享 submitMissingTasks(stage, jobId.get) } else { //若是不爲空,就是有父Stage,遞歸調用submitStage方法去提交父Stage,這裏是stage劃分算法的精髓。 for (parent <- missing) { submitStage(parent) } //而且將當前stage,放入等待執行的stage隊列中 waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } } 這裏再來看下getMissingParentStage方法 private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting 先入後出 val waitingForVisit = new Stack[RDD[_]] //定義visit方法,供後面代碼中stage的RDD循環調用 def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { //遍歷RDD for (dep <- rdd.dependencies) { dep match { //若是是寬依賴,使用寬依賴的RDD建立一個新的stage,而且會把isShuffleMap變量設置爲true //默認只有最後一個stage不是ShuffleMap stage case shufDep: ShuffleDependency[_, _, _] => val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { //把stage放到緩存中 missing += mapStage } //若是是窄依賴,就把rdd加入到stack中,雖然循環時調用了stack的pop方法,可是這裏又push了一個進去。 case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } } //首先,往stack中推入stage的最後一個rdd waitingForVisit.push(stage.rdd) //循環 while (waitingForVisit.nonEmpty) { //對stage的最後一個rdd,調用本身內部定義方法(就是上面的visit方法),注意這裏stack的pop方法取出rdd visit(waitingForVisit.pop()) } //馬上返回新的stage missing.toList }
提交stage的方法--submitsMissingTasks
這個方法的做用就是爲stage建立一批task,task的數量和partition的數量相同 獲取要partition的數量,將stage加入runningStage隊列中 這裏涉及到一個Task最佳位置的算法,就是調用getpreferredlocs方法。 原理就是:從stage的最後一個rdd開始,去找哪一個rdd的partition被cache或checkPoint,那麼Task的最佳位置就是cache或checkPoint的位置。由於Task在這個節點上就不用計算以前的RDD。若是沒有,就有TaskScheduler來決定到哪一個節點上運行。 最後,針對stage的task,建立TaskSet對象,調用TaskScheduler的submitTasks方法,提交TaskSet。
SparkRDD持久化
持久化原理
Spark很是重要的一個功能特性就是能夠將RDD 持久化在內存中,當對RDD執行持久化操做時,每一個節點都會將本身操做的RDD的partition持久化到內存中,而且在以後對該RDD的反覆使用中,直接使用內存緩存的partition,這樣的話,對於針對一個RDD反覆執行多個操做的場景,就只要對RDD計算一次便可,後面直接使用該RDD ,而不須要計算屢次該RDD 巧妙使用RDD持久化,甚至在某些場景下,能夠將spark應用程序的性能提高10倍。對於迭代式算法和快速交互式應用來講,RDD持久化,是很是重要的。 要持久化一個RDD,只要調用其cache()或者persist()方法便可。在該RDD第一次被計算出來時,就會直接緩存在每一個節點中。並且Spark的持久化機制仍是自動容錯的,若是持久化的RDD的任何partition丟失了,那麼Spark會自動經過其源RDD,使用transformation操做從新計算該partition。 cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。若是須要從內存中去除緩存,那麼可使用unpersist()方法。
持久化的使用場景
RDD持久化的級別
MEMORY_ONLY: 使用未序列化的Java對象格式,將數據保存在內存中。若是內存不夠存放全部數據,則數據可能就不會進行沒序列化,那麼下次對這個RDD執行算子操做時,那些沒有被持久化的數據,須要根據源頭從新計算一遍,這是默認的持久化策略,使用cache()方法時,實際上就是使用的這種持久化策略。 MEMORY_AND_DISK: 使用未序列化的Java對象格式,優先嚐試將數據保存在內存中,若是內存不夠存放全部數據,會將數據寫入磁盤文件中,下次對RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。 MEMORY_ONLY_SER: 基本含義同MEMORY_ONLY相同,惟一不一樣是,會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組,這種方式更加節省內存,從而能夠避免持久化的數據佔用過多的內存致使頻繁GC. MEMORY_AND_DISK_SER: 基本含義同MEMORY_AND_DISK相同,惟一不一樣是,會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組,這種方式更加節省內存,從而能夠避免持久化的數據佔用過多的內存致使頻繁GC。 DISK_ONLY: 使用未序列化的Java對象格式,將數據所有由寫入磁盤中。 MEMORY_ONLY_二、MEMORY_AND_DISK_2等: 對於上述任意一種持久化策略,若是加上_2後綴,表明的是將每一個持久化的數據都複製一份副本,並將副本保存在其餘節點上,這種基於副本的持久化機制主要用於進行容錯,假如某個節點掛掉,節點的內存或磁盤中的持久化數據丟失了 ,那麼後續對RDD計算還可使用該數據在其餘節點上的副本,若是沒有副本的話,就只能將這些數據從源頭處從新計算一遍了。
持久化策略
默認狀況下,性能最高的固然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,能夠綽綽有餘地存放下整個RDD的全部數據。由於不進行序列化與反序列化操做,就避免了這部分的性能開銷;對這個RDD的後續算子操做,都是基於純內存中的數據的操做,不須要從磁盤文件中讀取數據,性能也很高;並且不須要複製一份數據副本,並遠程傳送到其餘節點上。可是這裏必需要注意的是,在實際的生產環境中,恐怕可以直接用這種策略的場景仍是有限的,若是RDD中數據比較多時(好比幾十億),直接用這種持久化級別,會致使JVM的OOM內存溢出異常。
若是使用MEMORY_ONLY級別時發生了內存溢出,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在內存中,此時每一個partition僅僅是一個字節數組而已,大大減小了對象數量,並下降了內存佔用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。可是後續算子能夠基於純內存進行操做,所以性能整體仍是比較高的。此外,可能發生的問題同上,若是RDD中的數據量過多的話,仍是可能會致使OOM內存溢出的異常。
若是純內存的級別都沒法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由於既然到了這一步,就說明RDD的數據量很大,內存沒法徹底放下。序列化後的數據比較少,能夠節省內存和磁盤的空間開銷。同時該策略會優先儘可能嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
一般不建議使用DISK_ONLY和後綴爲2的級別:由於徹底基於磁盤文件進行數據的讀寫,會致使性能急劇下降,有時還不如從新計算一次全部RDD。後綴爲2的級別,必須將全部數據都複製一份副本,併發送到其餘節點上,數據複製以及網絡傳輸會致使較大的性能開銷,除非是要求做業的高可用性,不然不建議使用。
public class PersistApp { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName(PersistApp.class.getSimpleName()).setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> linesRDD = sc.textFile("E:\test\scala\access_2016-05-30.log"); linesRDD.cache();
long start = System.currentTimeMillis(); List<String> list = linesRDD.take(10); long end = System.currentTimeMillis(); System.out.println("first times cost" + (end - start) + "ms"); System.out.println("-----------------------------------"); start = System.currentTimeMillis(); long count = linesRDD.count(); end = System.currentTimeMillis(); System.out.println("second times cost" + (end - start) + "ms"); sc.close();
} }
共享變量
一般狀況下,當向Spark操做(如map,reduce)傳遞一個函數時,它會在一個遠程集羣節點上執行,它會使用函數中全部變量的副本。這些變量被複制到全部的機器上,遠程機器上並無被更新的變量會向驅動程序回傳。在任務之間使用通用的,支持讀寫的共享變量是低效的。 儘管如此,Spark提供了兩種有限類型的共享變量,廣播變量和累加器。
建立並使用廣播變量的過程以下:
在一個類型T的對象obj上使用SparkContext.brodcast(obj)方法,建立一個Broadcast[T]類型的廣播變量,obj必須知足Serializable。 經過廣播變量的.value()方法訪問其值。 另外,廣播過程可能因爲變量的序列化時間過程或者序列化變量的傳輸過程過程而成爲瓶頸,而Spark Scala中使用的默認的Java序列化方法一般是低效的,所以能夠經過spark.serializer屬性爲不一樣的數據類型實現特定的序列化方法(如Kryo)來優化這一過程。 object BroadCastApp { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("BroadCastApp") val sc = new SparkContext(conf) val list = List(1, 2, 4, 6, 0, 9) val set = mutable.HashSet[Int]() val num = 7 val bset = sc.broadcast(set) val bNum = sc.broadcast(7) val listRDD = sc.parallelize(list) listRDD.map(x => { bset.value.+=(x) x * bNum.value }).foreach(x => print(x + " ")) println("----------------------") for (s <- set) { println(s) } sc.stop() } }
Spark提供的Accumulator,主要用於多個節點對一個變量進行共享性的操做。Accumulator只提供了累加的功能。可是確給咱們提供了多個task對一個變量並行操做的功能。可是task只能對Accumulator進行累加操做,不能讀取它的值。只有Driver程序能夠讀取Accumulator的值。
object AccumulatorApp { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("AccumulatorApp").setMaster("local[2]") val sc = new SparkContext(conf) val list = List(1, 2, 4, 6, 0, 9) val listRDD = sc.parallelize(list) val acc = sc.accumulator(0) list.map(x => { /** * 在這裏只能對累加器進行寫的操做,不能進行讀的操做 * count-->action * 主要是能夠替代直接使用count來統計某一個transformation運行的數據量, * 由於count是一個action,一旦執行了action操做,前面rdd partition中數據會被釋放掉 * 這樣要想在進行其餘的操做,就須要從新加載計算數據,會是spark程序性能下降 */ acc.add(1) (x, 1) }) println("累加結果: " + acc.value) sc.stop() } }
Spark SQL
Spark SQL是構建在Spark Core之上一個模塊,專門用於將SQL語句翻譯成Job,交給Spark計算引擎去計算,計算結果一般會以Datasets或者Data Frame返回。
入口:SparkSession
Spark中全部功能點的入口是SparkSession類。可使用SparkSession.builder()方法來創建一個基礎的SparkSession。 Spark2.0中的SarkSession提供了內置的Hive特性支持,包括使用HiveQL編寫查詢,訪問Hive UDF和讀取Hive tables中的數據。經過這些特性,用戶再也不須要有線程的Hive設置。
Datasets 和 DataFrames
一個Dataset是一個分佈式數據集合,Dataset是在spark1.6引入的API,集成了Spark RDD的一些優勢(強類型、強大的lambda函數|高階函數),同時得益於Spark SQL執行引擎優化。Dataset能夠從JVM的對象建立,而後可以使用一些高階函數對Dataset作處理(map、flatMap、filter等)
Dataset[(Int,String,Boolean)] --> 任意元組的集合,這些元組沒有名字
DataFrame其實也是一個命名列的Dataset,DataFrame的構建方式很是寬泛,例如:結構化數據,hive中的表、外部的Database、現存RDD。當使用Java或者ScalaAPI調度DataFrame的時候咱們能夠將一個DataFrame理解爲是一個存儲了Row這一個數據集合。在Scala中一個DataFrame就是Dataset[Row]
Dataset[Row] ,其中Row是一個命名元組類型,Row=(Int,String,Boolean)+Schema(id,name,sex)
Hello World
import org.apache.spark.sql.SparkSession object HelloSparkSQL { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() //建立入口 .master("local[5]") //本地仿真 .appName("Spark SQL basic example") .getOrCreate() import spark.implicits._ //引入一個隱式轉換->寫到SparkSession以後 val dataFrame = spark.read.json("file:///D:/examples/src/main/resources/people.json") dataFrame.printSchema() dataFrame.map(row => row.getAs[String]("name")).collect().foreach(println) spark.stop() } }
建立Dataset
將一個任意的數組類型轉換爲Dataset
//將一個任意的數組類型轉換爲Dataset var persons= new Person("zhansgan",18)::new Person("wangwu",26)::Nil var personDataset=persons.toDS() personDataset.map( person=> person.name ).collect().foreach(println)
將一個DF轉換爲Dataset
//將一個DF轉換爲Dataset val personDataset = spark.read.json("file:///D:/examples/src/main/resources/people.json").as[Person] personDataset.show() val value = List(1,2,3).toDS() value.show()
建立DataFrame
能夠把任何一個RDD轉化成DataFrame,一個DataFrame就能對應一張表。
加載json文件建立DataFrame
//加載json文件 val df = spark.read.json("file:///文件路徑") df.show()
經過RDD元素轉換爲case class 直接建立DataFrame
case class定義了table的schema。case class中的參數名稱會被反射讀取成爲table中的列名。case class一樣能夠嵌套或是包含一些複雜類型,例如Seq或是Array。RDD能夠被隱式轉換爲DataFrame並註冊成爲一個table。註冊後的table能夠用於隨後的SQL語句。
//經過RDD元素轉換爲case class 直接建立DataFrame val dataFrame = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens => new Person(tokens(0).toInt, tokens(1), tokens(2).toBoolean, tokens(3).toInt, tokens(4).toFloat)) .toDF()
經過直接將元組類型RDD轉爲DataFrame
//經過直接將元組類型RDD轉爲DataFrame val dataFrame = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens=>(tokens(0),tokens(1),tokens(2),tokens(3),tokens(4))) .toDF("id","name","sex","age","salary")
經過編程方式建立DataFrame
//經過編程方式建立DataFrame val dataRDD = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens=>Row(tokens(0).toInt,tokens(1),tokens(2).toBoolean,tokens(3).toInt,tokens(4).toFloat)) var fields=StructField("id",IntegerType,true)::StructField("name",StringType,true)::StructField("sex",BooleanType,true)::StructField("age",IntegerType,true)::StructField("salary",FloatType,true)::Nil var schema=StructType(fields) val dataFrame = spark.createDataFrame(dataRDD,schema)
總結DataFrame建立方式
DataFrame 常規操做
1,zhangsan,true,18,15000 2,lisi,true,20,20000 3,wangwu,false,18,10000 4,zhaoliu,false,18,10000 //建立sparkSession val spark = SparkSession .builder() //建立入口 .master("local[5]") //本地仿真 .appName("Spark SQL basic example") .getOrCreate() //經過直接將元組類型RDD轉爲DataFrame val dataFrame = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens=>(tokens(0).toInt,tokens(1),tokens(2).toBoolean,tokens(3).toInt,tokens(4).toFloat)) .toDF("id","name","sex","age","salary").show() +---+--------+-----+---+-------+ | id| name| sex|age| salary| +---+--------+-----+---+-------+ | 1|zhangsan| true| 18|15000.0| | 2| lisi| true| 20|20000.0| | 3| wangwu|false| 18|10000.0| | 4| zhaoliu|false| 18|10000.0| +---+--------+-----+---+-------+ //進行查詢操做 dataFrame.select("id","name","salary") .where($"name" ==="lisi" or $"salary" > 10000) .filter($"id" === 1) .show() +---+--------+-------+ | id| name| salary| +---+--------+-------+ | 1|zhangsan|15000.0| +---+--------+-------+ //按照性別計算平均值 dataFrame.select("sex","salary").groupBy("sex").mean("salary").show() +-----+-----------+ | sex|avg(salary)| +-----+-----------+ | true| 17500.0| |false| 10000.0| +-----+-----------+ //常規聚合計算 dataFrame.select("sex","salary") .groupBy("sex") .agg(("salary","max"),("salary","min"),("salary","mean")) .show() +-----+-----------+-----------+-----------+ | sex|max(salary)|min(salary)|avg(salary)| +-----+-----------+-----------+-----------+ | true| 20000.0| 15000.0| 17500.0| |false| 10000.0| 10000.0| 10000.0| +-----+-----------+-----------+-----------+ import org.apache.spark.sql.functions._ dataFrame.select("sex","salary") .groupBy("sex") .agg($"sex".alias("性別"),sum("salary").alias("總薪資"),avg("salary").alias("平均薪資")) .drop("sex") .sort($"總薪資".desc) .limit(2) .show() +-----+-------+-------+ | 性別| 總薪資 |平均薪資| +-----+-------+-------+ | true|35000.0|17500.0| |false|20000.0|10000.0| +-----+-------+-------+
更多請參考API:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
SQL 操做DataFrame
val dataFrame = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens=>(tokens(0).toInt,tokens(1),tokens(2).toBoolean,tokens(3).toInt,tokens(4).toFloat)) .toDF("id","name","sex","age","salary") //建立局部視圖 ,只能由當前SparkSession會話訪問 dataFrame.createOrReplaceTempView("t_user") spark.sql("select * from t_user where id=1 or name='lisi' order by salary desc limit 2").show() //建立全局視圖,能夠跨session訪問須要在前面添加global_temp spark.sql("select * from global_temp.t_user where id=1 or name='lisi' order by salary desc limit 2").show() //獲取指定列的值 記住必須導入 import spark.implicits._ spark.sql("select * from global_temp.t_user where id=1 or name='lisi' order by salary desc limit 2") .map(row => row.getAs[String]("name")) .foreach(name=>println("name:"+name)) //獲取多個值 默認 系統沒有提供對Map[String,Any]類型隱式轉換 implicit var e=Encoders.kryo[Map[String,Any]] spark.sql("select * from t_user where id=1 or name='lisi' order by salary desc limit 2") .map(row => row.getValuesMap(List("id","name","sex"))) .foreach(row => println(row))
用戶自定義聚合函數
1,蘋果,4.5,2,001 2,橘子,2.5,5,001 3,機械鍵盤,800,1,002 val dataFrame = spark.sparkContext.textFile("file:///D:/order.log") .map(line => line.split(",")) .map(tokens=>(tokens(0).toInt,tokens(1),tokens(2).toFloat,tokens(3).toInt,tokens(4))) .toDF("id","name","price","count","uid") dataFrame.createTempView("t_order") spark.sql("select uid,sum(price * count) cost from t_order group by uid").show()
自定義求和函數
Data Frame 聚合函數
import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ class SumUserDefinedAggregateFunction extends UserDefinedAggregateFunction{ //聲明輸入字段 override def inputSchema: StructType = StructType(StructField("price",FloatType,true)::StructField("count",IntegerType,true)::Nil) //聲明聚合以後字段類型 override def bufferSchema: StructType = StructType(StructField("totalCost",FloatType,true)::Nil) //聚合後的數據類型 override def dataType: DataType = FloatType override def deterministic: Boolean = true //初始化zero值 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)=0.0f } //局部累加求和 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if(!input.isNullAt(0) && !input.isNullAt(1)){ buffer(0)=buffer.getFloat(0)+ (input.getAs[Float](0) * input.getAs[Int](1)) } } //合併局部結果,必須賦值給buffer1 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getFloat(0)+buffer2.getFloat(0) } override def evaluate(buffer: Row): Any = buffer.getFloat(0) } val dataFrame = spark.sparkContext.textFile("file:///D:/order.log") .map(line => line.split(",")) .map(tokens=>(tokens(0).toInt,tokens(1),tokens(2).toFloat,tokens(3).toInt,tokens(4))) .toDF("id","name","price","count","uid") dataFrame.createTempView("t_order") //註冊用戶自定義 聚合函數 spark.udf.register("mysum", new SumUserDefinedAggregateFunction()) spark.sql("select uid , mysum(price,count) totalCost from t_order group by uid").show()
Dataset 自定義聚合函數
import org.apache.spark.sql.{Encoder, Encoders} import org.apache.spark.sql.expressions.Aggregator case class MyAvgBuffer(var sum:Float,var count:Int) class MyAvgAggregator extends Aggregator[(Int,String,Boolean,Int,Float),MyAvgBuffer,Float]{ //初始化 buffer override def zero: MyAvgBuffer = new MyAvgBuffer(0.0f,0) //局部計算 override def reduce(b: MyAvgBuffer, a: (Int, String, Boolean, Int, Float)): MyAvgBuffer = { b.sum=b.sum + a._5 b.count += 1 return b } //彙總合併 override def merge(b1: MyAvgBuffer, b2: MyAvgBuffer): MyAvgBuffer = { b1.sum=b1.sum+b2.sum b1.count=b1.count+b2.count return b1 } override def finish(reduction: MyAvgBuffer): Float = { return reduction.sum/reduction.count } override def bufferEncoder: Encoder[MyAvgBuffer] = { return Encoders.kryo[MyAvgBuffer] } override def outputEncoder: Encoder[Float] = { return Encoders.scalaFloat } } val dataset = spark.sparkContext.textFile("file:///D:/person.txt") .map(line => line.split(",")) .map(tokens=>(tokens(0).toInt,tokens(1),tokens(2).toBoolean,tokens(3).toInt,tokens(4).toFloat)) .toDS() val aggregator = new MyAvgAggregator() val value = aggregator.toColumn.name("avgSalary") dataset.select(value).show()
數據的讀取和保存
使用場景---->數據遷移:將oracle中大量數據讀取出來再寫到mysql中,或者Hdfs中的數據遷移到mysql中
從MySQL中加載數據
val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://CentOS:3306/test") //訪問windows上mysql->localhost:3306/test .option("dbtable", "t_user") .option("user", "root") .option("password", "root") .load() jdbcDF.select("id","name","salary").show()
從本地磁盤讀取CSV格式數據
val frame = spark.read .option("header", "true") //設置表頭 .csv("D:/user.csv") frame.show()
寫入數據到MySQL
//建立sparkSession val spark = SparkSession .builder() //建立入口 .master("local[5]") //本地仿真 .appName("Spark SQL basic example") .getOrCreate() import spark.implicits._
//獲取數據 val personDF = spark.sparkContext.parallelize(Array("14 tom 1500", "15 jerry 20000", "16 kitty 26000")) .map(_.split(" ")) .map(p => (p(0).toInt, p(1).trim, p(2).toDouble)) .toDF("id","name","salary")
//創建鏈接數據庫 val props=new Properties() props.put("user", "root") props.put("password", "root")
//將數據寫入到指定數據庫表中 personDF.write.format("jdbc").mode(SaveMode.Append) .jdbc("jdbc:mysql://CentOS:3306/test","t_user",props)
生成json格式
val personDF = spark.sparkContext.parallelize(Array("14 tom 1500", "15 jerry 20000", "16 kitty 26000"),1) .map(_.split(" ")) .map(p => (p(0).toInt, p(1).trim, p(2).toDouble)) .toDF("id","name","salary")
personDF.write.format("json").mode(SaveMode.Overwrite) .save("D://userjson")
生成CSV格式
val personDF = spark.sparkContext.parallelize(Array("14 tom 1500", "15 jerry 20000", "16 kitty 26000"),1) .map(_.split(" ")) .map(p => (p(0).toInt, p(1).trim, p(2).toDouble)) .toDF("id","name","salary")
personDF.write.format("csv").mode(SaveMode.Overwrite) .option("header","true") .save("D://usercsv")
生成parquet文件
val personDF = spark.sparkContext.parallelize(Array("14 tom 1500", "15 jerry 20000", "16 kitty 26000"),1) .map(_.split(" ")) .map(p => (p(0).toInt, p(1).trim, p(2).toDouble)) .toDF("id","name","salary") personDF.write.mode(SaveMode.Overwrite) .parquet("file:///D:/parquet")
讀取parquet文件
val dataFrame = spark.read.parquet("file:///D:/parquet") dataFrame.show()
分區存儲
val frame: DataFrame = spark.sparkContext.textFile("D:/order.log") .map(_.split(",")) .map(x => (x(0).toInt, x(1), x(2).toDouble, x(3).toInt, x(4))) .toDF("id", "name","price","count","uid") frame.write.format("json").mode(SaveMode.Overwrite).partitionBy("uid").save("D:/res")
Spark Streaming
Spark Streaming是對Spark核心API一個拓展,使得使用Spark Streaming 可以實現對實時數據的可擴展、高吞吐、容錯實時在線處理(相似 Storm、Kafka Stream)。Spark Streaming數據能夠來自於消息隊列、日誌採集以及TCP socket數據源的數據,而且採集的數據能夠被一些複雜的算法處理例如高階函數(map、reduce、window),最後將處理後的數據寫入數據庫、文件系統或者是報表展現。
內部Spark Streaming工做原理是,首先Spark Streaming介紹實時數據流而後將數據分批次處理。這些批次的數據會依次交給Spark Engine處理產生新的批次數據。
Spark Streaming提供了一個高級抽象稱爲離散流或者DStream,表示一種連續的數據流。DStream能夠經過消息隊列、日誌採集以及TCP socket數據源的數據建立亦能夠經過一些高階函數(map、reduce、window)處理其餘DStream獲取。Spark Streaming 內部一個DStream就表明一些有序的RDD序列。
QuickExmaple
import org.apache.spark._ import org.apache.spark.streaming._ object QuickExample { def main(args: Array[String]): Unit = { //建立本地仿真 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") var sc=new SparkContext(conf) sc.setLogLevel("FATAL") //關閉日誌打印 //建立sparkStreaming val ssc = new StreamingContext(sc, Seconds(1)) //建立sparkSession 用於建立DataFrame val spark=SparkSession.builder().master("local[5]").appName("xx").getOrCreate() import spark.implicits._ //鏈接linux9999的服務 val lines = ssc.socketTextStream("CentOS", 9999) //RDD操做進行數據統計處理 val words = lines.flatMap(_.split(" ")) val pair = words.map(word=>(word,1)) val wordCounts = pair.reduceByKey(_+_) //優化判斷寫入非空的數據-->將來能夠寫在Hdfs/磁盤-->關係型數據庫 wordCounts.foreachRDD(rdd=>{ if(rdd.count()>0){ val dataFrame = rdd.toDF("key","count") dataFrame.write.mode(SaveMode.Append).json("file:///D:/json") } }) //開始計算 ssc.start() ssc.awaitTermination() } }
在CentOS上先安裝nc組件yum install nc -y而後執行nc -lk 9999,而後在啓動main程序觀察控制檯輸出。
Discretized Streams
一個DStream表明一系列的RDDs,每一個DStream中RDD包含着某個時間間隔的數據。
任何對DStream的操做,底層Spark Streaming都會轉換爲對RDD的轉換。
Transformations on DStreams|RDD
Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows.
Transformation Meaning
map(func) Return a new DStream by passing each element of the source DStream through a function func. flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items. filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true. repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions. union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream. count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel. countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. cogroup(otherStream, [numTasks]) When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
UpdateStateByKey Operation
Spark如何作狀態計算?| 如何作到故障恢復的?
spark streaming有狀態計算(如UV)一般採用DStream.updateStateByKey(實際是PairDStreamFunctions加強到DStream的),具體實現網上講的不少。spark streaming是持續計算,有狀態時不能經過簡單的DAG/lineage容錯,因此必須設置checkpoint(不然Job啓動會報錯) checkpoint會持久化當批次RDD的快照、未完成的Task狀態等。SparkContext經過checkpoint能夠重建DStream,即便Driver宕機,重啓後仍可用SparkContext.getOrElse從checkpoint恢復以前的狀態。若是上游不丟數據(如kafka),那麼宕機重啓後原則上能夠實現續傳 事情彷佛是很完美,可是拿到實際環境中仍是會有問題 藉助於updateStateByKey以及checkpoint實現有狀態統計,同時通常會開啓Spark Streaming故障恢復功能借助於StreamingContext.getOrCreate(checkponit, creareFuncttion _) checkpoint保存的是什麼? - RDD計算過程當中的元數據 - RDD 數據自己 - 計算的Drive代碼程序 當用戶StreamingContext.getOrCreate(checkponit, creareFuncttion _)首先查看checkponit是否有數據,若是有數據就不會調用creareFuncttion,只是嘗試經過讀取checkponit下的數據,對任務作狀態恢復。
該操做可以使用戶維持計算的狀態數據,而且能夠持續更新。爲了實現狀態的更新,咱們須要作如下兩件事
Define the state - The state can be an arbitrary data type. 定義狀態-狀態能夠是任意的數據類型。
Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream. 定義狀態更新函數-用函數指定如何使用先前狀態和輸入流中的新值更新狀態。
val updateFunction:(Seq[Int], Option[Int])=>Option[Int] = (newValues,runningCount)=>{ val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) }
import org.apache.spark._ import org.apache.spark.streaming._
object QuickExample { def main(args: Array[String]): Unit = { var checkpoint="file:///D://checkpoint" def createStreamContext():StreamingContext={ val conf = new SparkConf() .setMaster("local[2]") .setAppName("NetworkWordCount") var sc=new SparkContext(conf) sc.setLogLevel("FATAL") val ssc = new StreamingContext(sc, Seconds(3)) ssc.checkpoint(checkpoint) ssc.socketTextStream("CentOS", 9999) .flatMap(.split(" ")) .map((,1)) .updateStateByKey(updateFunction) .checkpoint(Seconds(30)) //設置checkpoint存儲頻率,推薦batches的5~10倍 .print()
ssc } val ssc=StreamingContext.getOrCreate(checkpoint,createStreamContext _) //開始計算 ssc.start() ssc.awaitTermination() } val updateFunction:(Seq[Int], Option[Int])=>Option[Int] = (newValues,runningCount)=>{ val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) }
}
CheckPoint說明
Window Operations
Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window.、
Spark Streaming也提供窗口計算,這容許您在數據滑動窗口上應用轉換。下圖說明了這個滑動窗口。
Transformation Meaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream. countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream. reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and 「inverse reducing」 the old data that leaves the window. An example would be that of 「adding」 and 「subtracting」 counts of keys as the window slides. However, it is applicable only to 「invertible reduce functions」, that is, those reduce functions which have a corresponding 「inverse reduce」 function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation. countByValueAndWindow(windowLength,slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
Output Operations on DStreams
Output operations allow DStream’s data to be pushed out to external systems like a database or a file systems. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:
Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API This is called pprint() in the Python API. saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
Input DStreams and Receivers
每個SparkStreaming程序都必須有一個Input DStreams,每個Input DStream都會和一個Receiver對象進行關聯,由Receiver負責將數據存儲到Spark的內存中,用於後續處理。、
Spark目前提供兩種Input DStreams:
FileStreams
streamingContext.textFileStream(dataDirectory)
或者
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Custom Receivers
import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } } import org.apache.spark._ import org.apache.spark.streaming._ object QuickExample { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName("NetworkWordCount") conf.set("spark.io.compression.codec","lz4") var sc=new SparkContext(conf) sc.setLogLevel("FATAL") var ssc= new StreamingContext(sc,Seconds(1)) ssc.receiverStream(new CustomReceiver("CentOS",9999)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print() //開始計算 ssc.start() ssc.awaitTermination() } }
Spark Streaming集成Kafka
參考:http://spark.apache.org/docs/latest/streaming-kafka-integration.html
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe object KafkaStreamingDemo { def main(args: Array[String]): Unit = { var checkpoint="file:///D://checkpoint" def createStreamContext():StreamingContext={ val conf = new SparkConf() .setMaster("local[2]") .setAppName("NetworkWordCount") var sc=new SparkContext(conf) sc.setLogLevel("FATAL") val ssc = new StreamingContext(sc, Seconds(3)) ssc.checkpoint(checkpoint) val kafkaParams = Map[String, Object]( BOOTSTRAP_SERVERS_CONFIG -> "CentOS:9092,CentOS:9093,CentOS:9094", KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], GROUP_ID_CONFIG -> "g1", AUTO_OFFSET_RESET_CONFIG -> "latest", ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) val topics = Array("topic01") val stream = KafkaUtils.createDirectStream(ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams)) stream.flatMap(record =>record.value().split(" ")) .map((_,1)) .updateStateByKey(updateFunction) .checkpoint(Seconds(30)) .reduceByKey(_+_).print() ssc } val ssc= StreamingContext.getOrCreate(checkpoint,createStreamContext _) ssc.start() ssc.awaitTermination() } val updateFunction:(Seq[Int], Option[Int])=>Option[Int] = (newValues,runningCount)=>{ val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) } }
解決一個jar包衝突
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> <exclusions> <exclusion> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.2</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency>
由於net.jpountz.lz4和Spark自帶包衝突。
Spark Streaming 集成Flume
參考:http://spark.apache.org/docs/latest/streaming-flume-integration.html
import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object FlumeStreamingDemo { def main(args: Array[String]): Unit = { var checkpoint="file:///D://checkpoint" def createStreamContext():StreamingContext={ val conf = new SparkConf() .setMaster("local[2]") .setAppName("NetworkWordCount") var sc=new SparkContext(conf) sc.setLogLevel("FATAL") val ssc = new StreamingContext(sc, Seconds(3)) ssc.checkpoint(checkpoint) val topics = Array("topic01") val stream = FlumeUtils.createStream(ssc, "localhost", 44444) stream.map(event => new String(event.event.getBody.array())) .flatMap(lines=> lines.split(" ")) .map((_,1)) .updateStateByKey(updateFunction) .checkpoint(Seconds(30)) .reduceByKey(_+_).print() ssc } val ssc= StreamingContext.getOrCreate(checkpoint,createStreamContext _) ssc.start() ssc.awaitTermination() } val updateFunction:(Seq[Int], Option[Int])=>Option[Int] = (newValues,runningCount)=>{ val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) } } <!--直接對接flume--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.11</artifactId> <version>${spark.version}</version> </dependency>