spark

 

 

 Spark 基礎html

Spark入門java

 

1. 課程目標..................................................................................................................... 2node

1.1. 目標1:熟悉Spark相關概念............................................................................. 2mysql

1.2. 目標2:搭建Spark集羣.................................................................................... 2算法

1.3. 目標3:編寫簡單的Spark應用程序.................................................................. 2sql

2. Spark概述.................................................................................................................... 2shell

2.1. 什麼是Spark(官網:http://spark.apache.org)................................................. 2數據庫

2.2. 爲何要學Spark............................................................................................... 2apache

2.3. Spark特色........................................................................................................... 3編程

2.3.1. 快............................................................................................................ 3

2.3.2. 易用........................................................................................................ 3

2.3.3. 通用........................................................................................................ 4

2.3.4. 兼容性..................................................................................................... 4

3. Spark集羣安裝............................................................................................................. 4

3.1. 安裝.................................................................................................................. 4

3.1.1. 機器部署................................................................................................. 4

3.1.2. 下載Spark安裝包.................................................................................... 5

3.1.3. 配置Spark................................................................................................ 5

4. 執行Spark程序........................................................................................................... 6

4.1. 執行第一個spark程序....................................................................................... 6

4.2. 啓動Spark Shell.................................................................................................. 7

4.2.1. 啓動spark shell........................................................................................ 7

4.2.2. 在spark shell中編寫WordCount程序....................................................... 7

4.3. 在IDEA中編寫WordCount程序......................................................................... 8

 

1.  課程目標

1.1. 目標1:熟悉Spark相關概念

1.2. 目標2:搭建Spark集羣

1.3. 目標3:編寫簡單的Spark應用程序

2.  Spark概述

2.1. 什麼是Spark(官網:http://spark.apache.org

 

 

Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成爲Apache孵化項目,2014年2月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。Spark獲得了衆多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了不少生產系統的推薦算法;騰訊Spark集羣達到8000臺的規模,是當前已知的世界上最大的Spark集羣。

2.2. 爲何要學Spark

中間結果輸出:基於MapReduce的計算引擎一般會將中間結果輸出到磁盤上,進行存儲和容錯。出於任務管道承接的,考慮,當一些查詢翻譯到MapReduce任務時,每每會產生多個Stage,而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每個Stage的輸出結果

Hadoop

Spark

   

Spark是MapReduce的替代方案,並且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。

2.3. Spark特色

2.3.1.   快

與Hadoop的MapReduce相比,Spark基於內存的運算要快100倍以上,基於硬盤的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,能夠經過基於內存來高效處理數據流。

 

 

 

 

2.3.2.   易用

Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的Python和Scala的shell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。

 

 

2.3.3.   通用

Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。

2.3.4.   兼容性

Spark能夠很是方便地與其餘的開源產品進行融合。好比,Spark可使用Hadoop的YARN和Apache Mesos做爲它的資源管理和調度器,器,而且能夠處理全部Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集羣的用戶特別重要,由於不須要作任何數據遷移就可使用Spark的強大處理能力。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻,使得全部人均可以很是容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集羣的工具。

 

 

3.  Spark集羣安裝

3.1. 安裝

3.1.1.   機器部署

準備兩臺以上Linux服務器,安裝好JDK1.7

3.1.2.   下載Spark安裝包

 

 

 

http://www.apache.org/dyn/closer.lua/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz

上傳解壓安裝包

上傳spark-1.5.2-bin-hadoop2.6.tgz安裝包到Linux上

解壓安裝包到指定位置

tar -zxvf spark-1.5.2-bin-hadoop2.6.tgz -C /usr/local

3.1.3.   配置Spark

進入到Spark安裝目錄

cd /usr/local/spark-1.5.2-bin-hadoop2.6

進入conf目錄並重命名並修改spark-env.sh.template文件

cd conf/

mv spark-env.sh.template spark-env.sh

vi spark-env.sh

在該配置文件中添加以下配置

export JAVA_HOME=/usr/java/jdk1.7.0_45

export SPARK_MASTER_IP=node1.itcast.cn

export SPARK_MASTER_PORT=7077

保存退出

重命名並修改slaves.template文件

mv slaves.template slaves

vi slaves

在該文件中添加子節點所在的位置(Worker節點)

node2.itcast.cn

node3.itcast.cn

node4.itcast.cn

保存退出

將配置好的Spark拷貝到其餘節點上

scp -r spark-1.5.2-bin-hadoop2.6/ node2.itcast.cn:/usr/local/

scp -r spark-1.5.2-bin-hadoop2.6/ node3.itcast.cn:/usr/local/

scp -r spark-1.5.2-bin-hadoop2.6/ node4.itcast.cn:/usr/local/

 

Spark集羣配置完畢,目前是1個Master,3個Work,在node1.itcast.cn上啓動Spark集羣

/usr/local/spark-1.5.2-bin-hadoop2.6/sbin/start-all.sh

 

啓動後執行jps命令,主節點上有Master進程,其餘子節點上有Work進行,登陸Spark管理界面查看集羣狀態(主節點):http://node1.itcast.cn:8080/

 

 

到此爲止,Spark集羣安裝完畢,可是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要藉助zookeeper,而且啓動至少兩個Master節點來實現高可靠,配置方式比較簡單:

Spark集羣規劃:node1,node2是Master;node3,node4,node5是Worker

安裝配置zk集羣,並啓動zk集羣

中止spark全部服務,修改配置文件spark-env.sh,在該配置文件中刪掉SPARK_MASTER_IP並添加以下配置

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark"

1.在node1節點上修改slaves配置文件內容指定worker節點

2.在node1上執行sbin/start-all.sh腳本,而後在node2上執行sbin/start-master.sh啓動第二個Master

4.  執行Spark程序

4.1. 執行第一個spark程序

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master spark://node1.itcast.cn:7077 \

--executor-memory 1G \

--total-executor-cores 2 \

/usr/local/spark-1.5.2-bin-hadoop2.6/lib/spark-examples-1.5.2-hadoop2.6.0.jar \

100

該算法是利用蒙特·卡羅算法求PI

4.2. 啓動Spark Shell

spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。

4.2.1.   啓動spark shell

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \

--master spark://node1.itcast.cn:7077 \

--executor-memory 2g \

--total-executor-cores 2

 

參數說明:

--master spark://node1.itcast.cn:7077 指定Master的地址

--executor-memory 2g 指定每一個worker可用內存爲2G

--total-executor-cores 2 指定整個集羣使用的cup核數爲2個

 

注意:

若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。

 

Spark Shell中已經默認將SparkContext類初始化爲對象sc。用戶代碼若是須要用到,則直接應用sc便可

4.2.2.   在spark shell中編寫WordCount程序

1.首先啓動hdfs

2.向hdfs上傳一個文件到hdfs://node1.itcast.cn:9000/words.txt

3.在spark shell中用scala語言編寫spark程序

sc.textFile("hdfs://node1.itcast.cn:9000/words.txt").flatMap(_.split(" "))

.map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.itcast.cn:9000/out")

 

4.使用hdfs命令查看結果

hdfs dfs -ls hdfs://node1.itcast.cn:9000/out/p*

 

說明:

sc是SparkContext對象,該對象時提交spark程序的入口

textFile(hdfs://node1.itcast.cn:9000/words.txt)是hdfs中讀取數據

flatMap(_.split(" "))先map在壓平

map((_,1))將單詞和1構成元組

reduceByKey(_+_)按照key進行reduce,並將value累加

saveAsTextFile("hdfs://node1.itcast.cn:9000/out")將結果寫入到hdfs中

4.3. 在IDEA中編寫WordCount程序

spark shell僅在測試和驗證咱們的程序時使用的較多,在生產環境中,一般會在IDE中編制程序,而後打成jar包,而後提交到集羣,最經常使用的是建立一個Maven項目,利用Maven來管理jar包的依賴。

 

1.建立一個項目

 

 

 

2.選擇Maven項目,而後點擊next

 

 

3.填寫maven的GAV,而後點擊next

 

 

4.填寫項目名稱,而後點擊finish

 

 

5.建立好maven項目後,點擊Enable Auto-Import

 

 

6.配置Maven的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.spark</groupId>
    <artifactId>spark-mvn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.itcast.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

7.將src/main/java和src/test/java分別修改爲src/main/scala和src/test/scala,與pom.xml中的配置保持一致

 

 

 

 

8.新建一個scala class,類型爲Object

 

 

9.編寫spark程序

package cn.itcast.spark

import org.apache.spark.{SparkContext, SparkConf}

object WordCount {
  def main(args: Array[String]) {
    //建立SparkConf()並設置App名稱
   
val conf = new SparkConf().setAppName("WC")
    //建立SparkContext,該對象是提交spark App的入口
   
val sc = new SparkContext(conf)
    //使用sc建立RDD並執行相應的transformation和action
   
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    //中止sc,結束該任務
   
sc.stop()
  }
}

 

10.使用Maven打包:首先修改pom.xml中的main class

 

 

點擊idea右側的Maven Project選項

 

 

點擊Lifecycle,選擇clean和package,而後點擊Run Maven Build

 

 

11.選擇編譯成功的jar包,並將該jar上傳到Spark集羣中的某個節點上

 

 

12.首先啓動hdfs和Spark集羣

啓動hdfs

/usr/local/hadoop-2.6.1/sbin/start-dfs.sh

啓動spark

/usr/local/spark-1.5.2-bin-hadoop2.6/sbin/start-all.sh

 

13.使用spark-submit命令提交Spark應用(注意參數的順序)

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

--class cn.itcast.spark.WordCount \

--master spark://node1.itcast.cn:7077 \

--executor-memory 2G \

--total-executor-cores 4 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/words.txt \

hdfs://node1.itcast.cn:9000/out

 

查看程序執行結果

hdfs dfs -cat hdfs://node1.itcast.cn:9000/out/part-00000

(hello,6)

(tom,3)

(kitty,2)

(jerry,1)

 

Spark RDD

Spark計算模型

1.  課程目標

1.1. 熟練使用RDD的算子完成計算

1.2. 掌握RDD的原理

2.  彈性分佈式數據集RDD

2.1. RDD概述

2.1.1.   什麼是RDD

RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。

2.1.2.   RDD的屬性

 

 

1)一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。

 

2)一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。

 

3)RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。

 

4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

 

5)一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

2.2. 建立RDD

1)由一個已經存在的Scala集合建立。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

 

2)由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等

val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")

2.3. RDD編程API

2.3.1.   Transformation

RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。

 

經常使用的Transformation:

轉換

含義

map(func)

返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成

filter(func)

返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成

flatMap(func)

相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)

mapPartitions(func)

相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求並集後返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集後返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重後返回一個新的RDD

groupByKey([numTasks]) 

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

 

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey相似,可是更靈活

join(otherDataset, [numTasks])

在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])

 

coalesce(numPartitions)  

 

repartition(numPartitions)

 

repartitionAndSortWithinPartitions(partitioner)

 

2.3.2.   Action

動做

含義

reduce(func)

經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的

collect()

在驅動程序中,以數組的形式返回數據集的全部元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(相似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本

saveAsSequenceFile(path

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。

saveAsObjectFile(path

 

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。

foreach(func)

在數據集的每個元素上,運行函數func進行更新。

2.3.3.   WordCount中的RDD

 

 

2.3.4.   練習

啓動spark-shell

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://node1.itcast.cn:7077

 

練習1:

//經過並行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//對rdd1裏的每個元素乘2而後排序

val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

//過濾出大於等於十的元素

val rdd3 = rdd2.filter(_ >= 10)

//將元素以數組的方式在客戶端顯示

rdd3.collect

 

練習2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//將rdd1裏面的每個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.split(' '))

rdd2.collect

 

練習3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求並集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

 

練習4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//求jion

val rdd3 = rdd1.join(rdd2)

rdd3.collect

//求並集

val rdd4 = rdd1 union rdd2

//按key進行分組

rdd4.groupByKey

rdd4.collect

 

練習5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroup與groupByKey的區別

rdd3.collect

 

練習6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

rdd2.collect

 

練習7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key進行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

rdd4.collect

//按value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

 

//想要了解更多,訪問下面的地址

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

2.4. RDD的依賴關係

RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

 

 

2.4.1.   窄依賴

窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴咱們形象的比喻爲獨生子女

2.4.2.   寬依賴

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

總結:窄依賴咱們形象的比喻爲超生

2.4.3.   Lineage

RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。

 

2.5. RDD的緩存

Spark速度很是快的緣由之一,就是在不一樣操做中能夠在內存中持久化或緩存個數據集。當持久化某個RDD後,每個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其餘動做中重用。這使得後續的動做變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特徵之一。能夠說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。

2.5.1.   RDD緩存方式

RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

 

 

經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

 

 

 

緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。

 

2.6. DAG的生成

DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。

 

 

 

Spark  SQL

Spark SQL and DataFrame

1.  課程目標

1.1. 掌握Spark SQL的原理

1.2. 掌握DataFrame數據結構和使用方式

1.3. 熟練使用Spark SQL完成計算任務

2.  Spark SQL

2.1. Spark SQL概述

2.1.1.   什麼是Spark SQL

 

 

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。

2.1.2.   爲何要學習Spark SQL

咱們已經學習了Hive,它是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。全部Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!

1.易整合

 

 

2.統一的數據訪問方式

 

 

3.兼容Hive

 

 

4.標準的數據鏈接

 

 

2.2. DataFrames

2.2.1.   什麼是DataFrames

與RDD相似,DataFrame也是一個分佈式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據之外,還記錄數據的結構信息,即schema。同時,與Hive相似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關係操做,比函數式的RDD API要更加友好,門檻更低。因爲與R和Pandas的DataFrame相似,Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗。

 

 

2.2.2.   建立DataFrames

 在Spark SQL中SQLContext是建立DataFrames和執行SQL的入口,在spark-1.5.2中已經內置了一個sqlContext

 

 

1.在本地建立一個文件,有三列,分別是id、name、age,用空格分隔,而後上傳到hdfs上

hdfs dfs -put person.txt /

 

2.在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割

val lineRDD = sc.textFile("hdfs://node1.itcast.cn:9000/person.txt").map(_.split(" "))

 

3.定義case class(至關於表的schema)

case class Person(id:Int, name:String, age:Int)

 

4.將RDD和case class關聯

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

 

5.將RDD轉換成DataFrame

val personDF = personRDD.toDF

 

6.對DataFrame進行處理

personDF.show

 

 

2.3. DataFrame經常使用操做

2.3.1.   DSL風格語法

//查看DataFrame中的內容

personDF.show

 

//查看DataFrame部分列中的內容

personDF.select(personDF.col("name")).show

personDF.select(col("name"), col("age")).show

personDF.select("name").show

 

//打印DataFrame的Schema信息

personDF.printSchema

 

//查詢全部的name和age,並將age+1

personDF.select(col("id"), col("name"), col("age") + 1).show

personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show

 

 

 

//過濾age大於等於18的

personDF.filter(col("age") >= 18).show

 

 

 

//按年齡進行分組並統計相同年齡的人數

personDF.groupBy("age").count().show()

 

 

2.3.2.   SQL風格語法

若是想使用SQL風格的語法,須要將DataFrame註冊成表

personDF.registerTempTable("t_person")

 

//查詢年齡最大的前兩名

sqlContext.sql("select * from t_person order by age desc limit 2").show

 

 

 

//顯示錶的Schema信息

sqlContext.sql("desc t_person").show

 

 

3.  以編程方式執行Spark SQL查詢

3.1. 編寫Spark SQL查詢程序

前面咱們學習瞭如何在Spark Shell中使用SQL完成查詢,如今咱們來實如今自定義的程序中編寫Spark SQL查詢程序。首先在maven項目的pom.xml中添加Spark SQL的依賴

 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 

3.1.1.   經過反射推斷Schema

建立一個object爲cn.itcast.spark.sql.InferringSchema

package cn.itcast.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {

    //建立SparkConf()並設置App名稱
   
val conf = new SparkConf().setAppName("SQL-1")
    //SQLContext要依賴SparkContext
   
val sc = new SparkContext(conf)
    //建立SQLContext
   
val sqlContext = new SQLContext(sc)

    //從指定的地址建立RDD
   
val lineRDD = sc.textFile(args(0)).map(_.split(" "))

    //建立case class
    //將RDD和case class關聯
   
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //導入隱式轉換,若是不到人沒法將RDD轉換成DataFrame
    //將RDD轉換成DataFrame
   
import sqlContext.implicits._
    val personDF = personRDD.toDF
    //註冊表
   
personDF.registerTempTable("t_person")
    //傳入SQL
   
val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    //將結果以JSON的方式存儲到指定位置
   
df.write.json(args(1))
    //中止Spark Context
   
sc.stop()
  }
}
//case class必定要放到外面
case class Person(id: Int, name: String, age: Int)
 

將程序打成jar包,上傳到spark集羣,提交Spark任務

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

--class cn.itcast.spark.sql.InferringSchema \

--master spark://node1.itcast.cn:7077 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/person.txt \

hdfs://node1.itcast.cn:9000/out

 

查看運行結果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out/part-r-*

 

 

 

3.1.2.   經過StructType直接指定Schema

建立一個object爲cn.itcast.spark.sql.SpecifyingSchema

package cn.itcast.spark.sql

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by ZX on 2015/12/11.
  */
object SpecifyingSchema {
  def main(args: Array[String]) {
    //建立SparkConf()並設置App名稱
   
val conf = new SparkConf().setAppName("SQL-2")
    //SQLContext要依賴SparkContext
   
val sc = new SparkContext(conf)
    //建立SQLContext
   
val sqlContext = new SQLContext(sc)
    //從指定的地址建立RDD
   
val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //經過StructType直接指定每一個字段的schema
   
val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD映射到rowRDD
   
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //將schema信息應用到rowRDD上
   
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //註冊表
   
personDataFrame.registerTempTable("t_person")
    //執行SQL
   
val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //將結果以JSON的方式存儲到指定位置
   
df.write.json(args(1))
    //中止Spark Context
   
sc.stop()
  }
}
 

將程序打成jar包,上傳到spark集羣,提交Spark任務

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

--class cn.itcast.spark.sql.InferringSchema \

--master spark://node1.itcast.cn:7077 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/person.txt \

hdfs://node1.itcast.cn:9000/out1

 

查看結果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out1/part-r-*

 

 

4.  數據源

4.1. JDBC

Spark SQL能夠經過JDBC從關係型數據庫中讀取數據的方式建立DataFrame,經過對DataFrame一系列的計算後,還能夠將數據再寫回關係型數據庫中。

4.1.1.   從MySQL中加載數據(Spark Shell方式)

1.啓動Spark Shell,必須指定mysql鏈接驅動jar包

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \

--master spark://node1.itcast.cn:7077 \

--jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

--driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

 

2.從mysql中加載數據

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.10.1:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "123456")).load()

 

3.執行查詢

jdbcDF.show()

 

 

4.1.2.   將數據寫入到MySQL中(打jar包方式)

1.編寫Spark SQL程序

package cn.itcast.spark.sql

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //經過並行化建立RDD
   
val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    //經過StructType直接指定每一個字段的schema
   
val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD映射到rowRDD
   
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //將schema信息應用到rowRDD上
   
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //建立Properties存儲數據庫相關屬性
   
val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    //將數據追加到數據庫
   
personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
    //中止SparkContext
   
sc.stop()
  }
}
 

 

2.用maven將程序打包

 

3.將Jar包提交到spark集羣

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

--class cn.itcast.spark.sql.JdbcRDD \

--master spark://node1.itcast.cn:7077 \

--jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

--driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

/root/spark-mvn-1.0-SNAPSHOT.jar

 

Spark  Streaming

Spark Streaming

1.  課程目標

1.1. 掌握Spark Streaming的原理

1.2. 熟練使用Spark Streaming完成流式計算任務

2.  Spark Streaming介紹

2.1. Spark Streaming概述

2.1.1.   什麼是Spark Streaming

 

 

Spark Streaming相似於Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特色。Spark Streaming支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入後能夠用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在不少地方,如HDFS,數據庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。

 

 

2.1.2.   爲何要學習Spark Streaming

 

1.易用

 

 

2.容錯

 

 

3.易整合到Spark體系

 

 

2.1.3.   Spark與Storm的對比

Spark

Storm

 

 

 

 

開發語言:Scala

開發語言:Clojure

編程模型:DStream

編程模型:Spout/Bolt

 

 

 

 

 

3.  DStream

3.1. 什麼是DStream

Discretized Stream是Spark Streaming的基礎抽象,表明持續性的數據流和通過各類Spark原語操做後的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每一個RDD含有一段時間間隔內的數據,以下圖:

 

 

對數據的操做也是按照RDD爲單位來進行的

 

 

計算過程由Spark engine來完成

 

 

3.2. DStream相關操做

DStream上的原語與RDD的相似,分爲Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操做中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各類Window相關的原語。

 

3.2.1.   Transformations on DStreams

Transformation

Meaning

map(func)

Return a new DStream by passing each element of the source DStream through a function func.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items.

filter(func)

Return a new DStream by selecting only the records of the source DStream on which func returns true.

repartition(numPartitions)

Changes the level of parallelism in this DStream by creating more or fewer partitions.

union(otherStream)

Return a new DStream that contains the union of the elements in the source DStream and otherDStream.

count()

Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.

reduce(func)

Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.

countByValue()

When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

reduceByKey(func, [numTasks])   

When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

cogroup(otherStream, [numTasks])

When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.

transform(func)     

Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.

updateStateByKey(func)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

 

特殊的Transformations

 

1.UpdateStateByKey Operation

UpdateStateByKey原語用於記錄歷史記錄,上文中Word Count示例中就用到了該特性。若不用UpdateStateByKey來更新狀態,那麼每次數據進來後分析完成後,結果輸出後將不在保存

 

2.Transform Operation

Transform原語容許DStream上執行任意的RDD-to-RDD函數。經過該函數能夠方便的擴展Spark API。此外,MLlib(機器學習)以及Graphx也是經過本函數來進行結合的。

 

3.Window Operations

Window Operations有點相似於Storm中的State,能夠設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的容許狀態

 

 

3.2.2.   Output Operations on DStreams

Output Operations能夠將DStream的數據輸出到外部的數據庫或文件系統,當某個Output Operations原語被調用時(與RDD的Action相同),streaming程序纔會開始真正的計算過程。

Output Operation

Meaning

print()

Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.

saveAsTextFiles(prefix, [suffix])

Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix])

Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsHadoopFiles(prefix, [suffix])

Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

foreachRDD(func)

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

4.  實戰

4.1. 用Spark Streaming實現實時WordCount

架構圖:

 

 

1.安裝並啓動生成者

首先在一臺Linux(ip:192.168.10.101)上用YUM安裝nc工具

yum install -y nc

 

啓動一個服務端並監聽9999端口

nc -lk 9999

 

2.編寫Spark Streaming程序

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    //設置日誌級別
   
LoggerLevel.setStreamingLogLevels()
    //建立SparkConf並設置爲本地模式運行
    //注意local[2]表明開兩個線程
   
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //設置DStream批次時間間隔爲2秒
   
val ssc = new StreamingContext(conf, Seconds(2))
    //經過網絡讀取數據
   
val lines = ssc.socketTextStream("192.168.10.101", 9999)
    //將讀到的數據用空格切成單詞
   
val words = lines.flatMap(_.split(" "))
    //將單詞和1組成一個pair
   
val pairs = words.map(word => (word, 1))
    //按單詞進行分組求相同單詞出現的次數
   
val wordCounts = pairs.reduceByKey(_ + _)
    //打印結果到控制檯
   
wordCounts.print()
    //開始計算
   
ssc.start()
    //等待中止
   
ssc.awaitTermination()
  }
}

 

3.啓動Spark Streaming程序:因爲使用的是本地模式"local[2]"因此能夠直接在本地運行該程序

注意:要指定並行度,如在本地運行設置setMaster("local[2]"),至關於啓動兩個線程,一個給receiver,一個給computer。若是是在集羣中運行,必需要求集羣中可用core數大於1

 

 

 

4.在Linux端命令行中輸入單詞

 

 

5.在IDEA控制檯中查看結果

 

 

問題:結果每次在Linux段輸入的單詞次數都被正確的統計出來,可是結果不能累加!若是須要累加須要使用updateStateByKey(func)來更新狀態,下面給出一個例子:

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Seconds}

object NetworkUpdateStateWordCount {
  /**
    * String :
單詞 hello
    * Seq[Int] :單詞在當前批次出現的次數
    * Option[Int] : 歷史結果
    */
 
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
   
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
  }

  def main(args: Array[String]) {
    LoggerLevel.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    //作checkpoint 寫入共享存儲中
   
ssc.checkpoint("c://aaa")
    val lines = ssc.socketTextStream("192.168.10.100", 9999)
    //reduceByKey 結果不累加
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    //updateStateByKey結果能夠累加可是須要傳入一個自定義的累加函數:updateFunc
   
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
 

4.2. Spark Streaming整合Kafka完成網站點擊流實時統計

 

 

1.安裝並配置zk

2.安裝並配置Kafka

3.啓動zk

4.啓動Kafka

5.建立topic

bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \

--replication-factor 3 --partitions 3 --topic urlcount

6.編寫Spark Streaming應用程序

package cn.itcast.spark.streaming

package cn.itcast.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
  }

  def main(args: Array[String]) {
    //接收命令行中的參數
   
val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    //建立SparkConf並設置AppName
   
val conf = new SparkConf().setAppName("UrlCount")
    //建立StreamingContext
   
val ssc = new StreamingContext(conf, Seconds(2))
    //設置檢查點
   
ssc.checkpoint(hdfs)
    //設置topic信息
   
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //重Kafka中拉取數據建立DStream
   
val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    //切分數據,截取用戶點擊的url
   
val urls = lines.map(x=>(x.split(" ")(6), 1))
    //統計URL點擊量
   
val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //將結果打印到控制檯     result.print()     ssc.start()     ssc.awaitTermination()   } }
相關文章
相關標籤/搜索