大數據學習——spark筆記

變量的定義

val a: Int = 1
var b = 2

方法和函數

區別:函數能夠做爲參數傳遞給方法
方法:
        def test(arg: Int): Int=>Int ={
            方法體
        }
        val fun = (test _: Int =>(Int=>Int))=>函數體

邏輯執行語句

val a = if(條件){

執行邏輯
返回值
}else{
執行邏輯
}

while(條件){
執行邏輯
}

val arr = Array(1,2,3,4,5)  
for(i <- 0 to arr.length ){
    arr(i)
}

for(i <- arr){
    i
}

集合操做

Array ArrayBuffer List ListBuffer set Map tuple

val arr = Array(1,2,3,4,5)
    arr(0)
    arr += 9
val arrb = ArrayBuffer(1,2,3,4,5)
    arrb(0)
val list = List(1,2,3,4)

val tuple = (1,"string")
tuple._1

val map = Map("a"->1)
val map = Map(("a",1))

類(重要)

類的主構造器:主構造器裏面的變量會被執行,方法會被加載,調用的方法會被執行
calss Test(){
    val int = 1
    def test(){
    }

    …………
    …………
    …………
    test

}

輔助構造器:重載

extends with

集合的高級操做(重要)

map:將集合中的變量循環出來作操做
flatMap:將集合中的參數壓循環出來作操做
val arr = Array("hello tom","hello lilei","hello hanmeimei")
map:(hello tom),(hello lilei),(hello hanmeimei) 
flatMap:(hello tom hello lilei hello hanmeimei)
filter:過濾想要的元素
groupBy:按照key進行分組,分組以後value合併到Array
mapValues:針對kv類型的數據,只對value進行操做
sortBy:針對某個元素進行排序
val arr Array("hello tom","hello lilei")
val result =  arr.flatMap(x => x.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortBy(_._2).recerse
val result = arr.flatMap(_.split(" ")).map((_,1)),reduceByKey(_+_).sortBy(_._2,true)

高級特性

高階函數:把函數做爲參數傳遞給方法或者函數,函數在函數式編程中是第一位的。
    map(函數)

隱式轉換(PreDef):對類的加強,Int類沒有to這個方法,而後再RichInt類中包含這個方法,咱們只須要在某個地方將Int轉換成RichInt,而後在用的地方import就ok了
class RichFile(file: File){
def read(file:File):String={
    Source.fromFile(file.getPath).mkString
}

}
object RichFile{

implict def file2RichFile(file:File)=RichFile(file)
}

object Test{
def main(args:Array[String]){
    import RichFile.file2RichFile
    val file = new File("c://words.txt").read
}

}


柯里化:將原來接收多個參數的方法或者函數,編程接收一個一個的方法或者函數,返回的是函數

    def test(a:Int)(b:Int)(c:Int){
        a+b+c
    }
    val fun = def(1) _

actor 併發編程的接口(很是重要)

actor:用消息傳遞的方式實現了併發編程,寫起來像線程,玩起來像socket
AKKA:actorSystem actOf

spark(what、how、why、use、運維<源碼的理解>)

課程目標

一、知道spark是幹啥的
二、會安裝spark
三、會寫spark程序(scala、python、R、java)

什麼是spark?

內存迭代式計算,利用DAG有向無環圖
特別很是快:在硬盤快mr10x,在內存,落你一條街100x
易用性:代碼寫的少,能夠用n中語言寫,你mr就一種
通用性:我集成了core、sql、streaming、MLlib、graphx,能交互
無處不在:數據源多種(hdfs、hbase、mysql、文件),計算平臺多種(standalone、YARN、mesos)

how1(部署)

一、下載安裝包
二、上傳包
三、解壓
四、重命名
五、修改環境變量
六、修改配置文件(重要,去官方文檔看(別人的帖子,例如:www.wangsenfeng.com)、全部集羣跑不起來都在這,經過log文件查看)
七、下發(scp)
八、修改其餘機器的配置(可選)
九、格式化(可選)
十、啓動集羣(注意依賴關係)

啓動

方式1:
    standalone-單master:
                        java_home、masterip、masterport、hadoopconf
方式2:
    standalone-多master:
                        java_home、masterport、hadoopconf、zookeeper

運行shell

運行spark-shell的兩種方式:
一、直接運行spark-shell
    單機經過多線程跑任務,只在運行spark-shell的地方運行一個進程叫sparksubmit
二、運行spark-shell --master spark://master1:7077
    將任務運行在集羣中,在運行spark-shell的機器上運行sparksubmit進程,運行executor在worker上

用api開發spark代碼

一、建立項目
二、到pom.xml(在day01中)
三、建立scala類
    import org.apache.spark.SparkContext //一切任務的起源,全部的計算的開頭。(上下文)
    import org.apache.spark.SparkConf   //spark的配置信息,至關於mr當中的那個conf,他會覆蓋掉默認的配置文件(若是你進行了配置),他的主要做用,這隻app的名字,設置時運行本地模式仍是集羣模式
四、寫代碼(參考官方文檔)
    若是是在windows上運行,設置setMaster("local[n]")
    若是是線上運行,把setMaster("local[n]")去掉,或者setMaster("spark://master1:7077")(不建議)
    注意兩個關鍵詞:transformation,action

提交任務到集羣

一、打jar包,去掉setMaster
二、將jar上傳到linux
三、執行命令 
    spark-submit \
    --master spark://master1:7077 \
    --executor-memory 512M \
    --total-executor-cores 2 \
    --class org.apache.spark.WordCount \
    xxx.jar \
    in  \   
    out \

用 python開發spark程序

一、開發python的程序
二、運行在集羣,用spark-submit

用R開發spark

一、先安裝R
    yum –y install gcc gcc-c++,
    yum –y install gcc-gfortran
    yum –y install readline-devel
    yum –y install libXt-devel
    yum –y install fonts-chinese tcl tcl-devel tclx tk tk-devel
    yum -y install epel-release
    vim /etc/yum.repos.d/epel.repo
    將
    #baseurl
    mirrorlist
    改爲
    baseurl
    #mirrorlist
    yum -y install R 安裝R語言

    二、而後按照官網的玩

    單機啓動
     sparkR

    啓動standalone
    sparkR --master spark://master1:7077

    啓動yarn
    sparkR --master yarn-client 

    從hive讀數據等
     sparkR --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar

    集羣提交
    spark-submit examples/src/main/r/dataframe.R

    三、監控
    http://master1ha:4040/

思考問題

一、什麼是RDD
二、什麼是stage
三、什麼是DAG

隨堂問題

一、老師好,剛剛那個mr的container,是由resourceManager建立好,而後序列化後,再給NodeManager那些來反序列化的嗎?
答:是由resourcemanager建立好序列化發給applicationMaster,而後applicationMaster找nodemanager去啓動資源
二、老師,剛纔那個執行結果分紅兩個文件,它的分區機制是將不一樣的單詞進行hash 嗎?
答:是的,hash分區
三、在集羣上,R運行須要安裝R,Python文件運行,須要安裝Python麼?
答:須要安裝,linux默認幫咱們安裝了python

複習

什麼spark?

內存迭代式計算,每一個算子將計算結果保存在內存中,其餘算子,讀取這個結果,繼續計算
4個特性:快(10x、100x),易用性(代碼優美、能夠用4種語言開發\依賴外部數據源(hdfs、本地文件、kafka、flume、mysql))、
通用性(cores、sql、streaming、MLlib、graphx,交互使用)、隨便那個平臺均可以跑(standalone、yarn、mesos)

搭建spark

一主多從:
        一、下載安裝包(依賴的hadoop的版本,source是下載源碼的)
        二、上傳到集羣
        三、解壓
        四、重命名(版本更新不須要修改環境變量)
        五、修改環境變量(root)
        六、修改配置文件(spark-env.sh:JAVA_HOME,master_ip,master_port,hadoop_conf_dir、java_opts(-D);slaves(從的域名))
        七、下發(scp)
        八、啓動集羣(start-all.sh;start-master.sh;start-slave.sh master的地址)
        九、spark的協議:spark://master:7077
        十、瀏覽器端口:master:8080
        十一、R語言的瀏覽器任務查看:masterR:4040

多主多從:多加了zookeeper調度(選舉機制)

命令行

一、spark-shell:在當臺機器上啓動一個進程sparksubmit,經過多線程的方式模擬集羣
二、spark-shell --master spark://master1:7077:啓動的事集羣版shell,任務會提交到集羣運行,
    在當前的機器啓動的集成sparksubmit,在叢集器啓動的集成叫xxxxexecutorbackend
    默認沒有加從機器的cores和memory參數,會在每臺叢集器啓動一個executor進程,
    若是加了--total-executor-cores n會啓動n個executor進程

命令行版的wordcount

注意:在sparkshell中幫咱們默認加載了SparkContext,並命名爲sc;也幫咱們建立了SparkConf,而且設置了appname(「sparkshell」),而且設置了setmaster(「local/spark://...」)
sc.textFile("file:///... ; hdfs://...").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).saveAsTextFile("hdfs://...")

spark的api操做

一、scala
二、python
    #!/usr/bin/python
    from pyspark import SparkConf , SparkContext
    sc.textFile("hdfs://...").flatMap(lambda x: x.split(" ")).map(lambda y:(y,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://...")
三、R
    ......
四、java
    ......

RDD

目標

一、什麼是RDD?
二、RDD的建立方式和依賴關係
三、DAG有向無環圖的意義
四、掌握劃分stage的過程
五、掌握RDD的全部操做!!!!

什麼是RDD?

RDD(Resilient Distributed Datasets )定義爲彈性的分佈式數據集,包含了只讀的、分區的、分佈式計算的概念;RDD是個類
一、一個數據分區的列表(hdfs的全部數據塊的位置信息,保存在我RDD類成員變量Array中)
二、保存了在數據塊上的計算方法,這個計算方法會應用到每個數據塊上
三、一個對於其餘RDD的依賴,是一個集合,spark就是經過這種依賴關係,像流水線同樣處理咱們的數據,
    當某個分區的數據計算失敗,只須要根據流水線的信息,從新計算這一個分區的數據便可,不須要計算所有數據
四、分區方式(partitioner),決定RDD數據來源的分區和數據計算後的分區:hashpartitioner;rangepartitioner
五、位置相關性(hdfs)

如何建立RDD

一、經過序列化集合的方式建立RDD(parallelize,makeRDD)
二、經過讀取外部的數據源(testFile)
三、經過其餘的rdd作transformation操做轉換成新的RDD

RDD的兩鍾算子:
一、transformation:
    經過算法對RDD進程轉換,延遲加載的一個處理數據及的方法:
    map flatMap reduceByKey 
二、Action:
    觸發整個job進行計算的算子
    collect top first saveAsTextFile

廣播(broadcast)變量

:其普遍用於廣播Map Side Join中的小表,以及廣播大變量等場景。這些數據集合在單節點內存可以容納,不須要像RDD那樣在節點之間打散存儲。
Spark運行時把廣播變量數據發到各個節點,並保存下來,後續計算能夠複用。相比Hadoop的distributed cache,廣播的內容能夠跨做業共享。
Broadcast的底層實現採用了BT機制

ipLocation

一、廣播變量
二、ip轉long(分金定穴循八卦,toolong插棍左八圈)
三、二分法查找:(上下循環尋上下,左移右移尋中間)
四、分區存數據庫(foreachPartition)

做業:

一、把全部的算子運行一遍
二、把iplocation的思想理解,代碼運行
三、有富餘時間的狀況下,敲一個iplocation就好了

問題

一、每一個stege是做爲一個任務總體,序列化後發送給一臺機器反序列話執行嗎?裏面包含的多個RDD是串聯起來工做的嗎?
答:是的
二、MapReduce中MRappmaster,啓動mapTask的時候,那個map類實例是否是已經序列化而且被包含在ResourceManager的任務隊列中的任務對象中?
答:是的
三、老師,只有對於於key-value的RDD,纔會有Partitioner,怎麼理解??
答:kv型數據的RDD按照Key進行分組操做,非kv的數據不須要分組操做,由於沒有響應的算子提供
四、講解RDD的時候,能夠把跟綜進源碼的路徑加在筆記裏嗎?但願能夠在閱讀源碼的基礎上理解RDD
答:經過crtl+shift+R打開源碼RDD.scala就能查看了
五、還有那個分片的工做,是任務提交以前就作好了吧,MapReduce的job.split文件好像就是在任務提交以前就在客戶端經過fileinputformat已經分好了,而後再發送到HDFS上
答:對的,咱們的分片也是作好了以後發送任務
六、getPartitions方法在整個運行過程當中總共會調用幾回? 數據都是分開運行的嗎? 若是是分開運行的,那隻須要在第一個MapRDD調用一次。請問這樣理解對嗎?
答:getPartitions是在任務開始以前調用一次,拿出分區的地址進行分發任務

複習

一、什麼是RDD
    一個分區的列表(FileSplit),決定讀取的文件在哪
    一個應用在每一個分區上的算子
    一個對其餘RDD的依賴集合
    可選:一個決定數據存儲時的分區方式
    可選:若是在yarn上運行,決定數據本地運行的方式,移動數據不如移動計算
二、如何建立RDD
    一、經過序列化集合的方式(makeRDD、parallelize)
    二、經過讀取文件的方式
    三、經過其餘的RDD進行transformation轉換而來
三、RDD的算子
    transformation:(懶加載)
    map、flatMap、filter、mapPartition、groupByKey、reduceByKey、union、intersaction、distinct、aggregateByKey
    Action:(觸發任務的進行)
    top、take、first、count、collect、foreach、savaAsTextFile、reduce

四、iplocaltion:(ip的熱力圖)
    一、廣播變量:共享的內存,只讀的,只能追加的
    二、ip轉long:分金定穴循八卦、toolong插棍左八圈
    三、二分法查找:上下循環尋上下,左移右移尋中間
    四、foreachPartition:對每一個分區的數據進行操做,能夠在分區操做的時候建立外部連接(jedis、mysql、hbase)

目標:

一、掌握RDD的stage劃分
二、掌握DAG概念
三、學會使用如何建立RDD的緩存
四、學會使用如何建立RDD的checkpoint

RDD的依賴關係

寬依賴:依賴的RDD產生的數據不僅是給我用的。父RDD不僅包含一個子RDD的數據(多對對),非獨生子女
窄依賴:依賴的RDD產生的數據只給我本身。父RDD只包含一個子RDD的數據(一對1、一對多)。獨生子女
Lineage(血統):RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。
        RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。

找依賴關係劃分stage的目的

一、如何經過stage的劃分設置緩存
    一、在窄依賴想設置緩存的時候,用cache
    二、在寬依賴想設置緩存的話,用checkpoint

如何設置cache和checkpoint

cache:
        someRDD.cache():將緩存放到內存中
        someRDD.persist(StorageLevel.MEMORY_AND_DISK):根據本身的須要設置緩存的位置(內存和硬盤)

checkPoint:能夠吧RDD計算後的數據存儲在本地的磁盤,也能夠是hdfs
        sc.setCheckpointDir("hdfs://master1:9000/ceshi/checkpoint")//設置checkpoint的路徑
        someRDD.checkpoin()

何時設置緩存,何時設置checkpoint

遇到寬依賴設置checkpoint,窄依賴想緩存的話設置cache

cache 和 checkpoint的區別?

cache只是緩存數據,不改變RDD的依賴關係
checkpoint是生成了一個新的RDD,後面的RDD依賴的關係已經改變。

checkpoint--》cache--》重算

四個案例

一、pv:點擊率
二、uv:在線用戶數
三、topk:微博熱門詞彙
四、moblelocation:統計家庭位置和工做位置

什麼是spark-sql

至關於hive

書寫代碼的兩種模式

datafream:spark-sql本身的語法

sql:spark-sql集成sql的語法
一、經過sc加載任意類型數據
二、建立case class Person(id:Int , name:String , age:Int)(表結構)
三、將數據添加到表結構中map
四、註冊表
五、經過sqlContext.sql()

spark-sql的api

兩種模式(表的schema加載的兩種模式)
一、經過case class的方式加載表結構
二、經過StructType去本身定義表結構

做業

一、moblelocation回去運行一遍,若是有富餘時間敲幾遍
二、把sparl-sql的命令行和代碼敲一遍

複習

RDD的依賴關係

一、寬依賴(多對多)
二、窄依賴(一對一 和 多對一)

經過寬依賴和窄依賴劃分stage

一、遇到寬依賴,寬依賴到上一個寬依賴之間的全部窄依賴是一個stage
二、stage之間有包含關係

劃分stage的目的

一、用來劃分task
二、用來指導什麼地方須要設置什麼樣的緩存(cache、checkpoint)

如何設置緩存

一、someRDD.cache()
二、someRDD.persist(StorageLeavel.MEMORY_AND_DISK_2)
三、sc.setCheckPointDir("hdfs://...")
    someRDD.checkpoint()

DAG

一個任務組成的流水線就是DAG(DAGscheduler)
DAG能夠劃分紅n個stage
stage對應n個RDD
把stage封裝成Task(stage),把task分發下去(TaskScheduler)

PV UV topK

pv:點擊率
sc.textFile("hdfs://..").map(("pv",1)).reduceByKey(_+_).saveAsTextFile("hdfs://...")

uv:在線用戶量:經過ip去重,按照(「uv」,1)

topK:微博熱門詞彙
    top誰--》wordcount--》排序--》take(正序)top(倒敘)

環比的pv uv

網站分析的文檔

mobileLocation(家庭位置、工做位置)

一、先將數據進行清洗(家庭、工做)
二、針對家庭和工做進行重複數據收集
三、分別對家庭和工做作計算(尾-時間,時間-頭)
四、數據去重
五、轉轉轉(手機號,(基站id,時間total))-》join(基站id)找座標

spark-sql

==hive

操做的兩種方式

一、datafream
    一、建立SqlContext(sc)
    二、經過sc讀取數據
    三、經過case class或者是structType建立表結構
    四、將數據加載到表結構中(Person或者Row)
    五、隱式轉換sqlContext.implict._
    六、將RDD轉換爲DF//show
    七、註冊成表
    八、sqlContext.sql("").show // write.

目標

一、利用spark-sql從mysql中讀寫數據
二、spark-sql能不能集成hive使用
三、練習
一、spar-streaming(對比storm)
二、flume+spark-streaming
三、kafka+spark-streaming

如何從mysql中讀數據

一、必須有mysql的driver(上傳mysql的jar包)
二、加載mysql包(spark-shell --master spark://master1:7077 --jars mysql.jar --driver-class-path mysql.jar)
三、讀取數據的時候,設置(sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.56.204/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"dept","user"->"root","password"->"root")).load())
四、mysql中的表結構會讀嗎?(有幫咱們加載表結構)

往mysql中寫數據

一、須要mysql的jar包
二、sc讀數據
三、datefream.write.mode("append"/"overwrite").jdbc("url","table",properties(user,password))

hive on spark-SQL

一、安裝hive,修改元數據庫,加上hive-site.xml(mysql鏈接)
二、將hive-site.xml文件拷貝到集羣的conf下
三、強mysql.jar拷貝到spark的lib下
四、執行:sqlContext.sql("select * from table1")
                                            .show()  
                                            .write.mode("append").jdbc()    
                                            .foreachPartition(it => {
                                                一、初始化鏈接
                                                it.map(x =>{
                                                二、寫數據到存儲層
                                                })
                                                三、關鏈接
                                            })

什麼是spark-streaming?

spark流失處理的框架,可以很容易的構建容錯、高可用的計算模型
特色:一、易用;二、容錯;三、集成;

spark-streaming和spark的批處理有什麼關係?

spark-streaming是小批量的RDD處理方式

spark-streaming的應用

從tcp的client中讀取數據,進行彙總操做
還以從flume中讀取數據
    poll:ip地址以flume爲主
    push:IP地址以streaming爲主

還能夠從kafka中讀取數據
相關文章
相關標籤/搜索