變量的定義
val a: Int = 1
var b = 2
方法和函數
區別:函數能夠做爲參數傳遞給方法
方法:
def test(arg: Int): Int=>Int ={
方法體
}
val fun = (test _: Int =>(Int=>Int))=>函數體
邏輯執行語句
val a = if(條件){
執行邏輯
返回值
}else{
執行邏輯
}
while(條件){
執行邏輯
}
val arr = Array(1,2,3,4,5)
for(i <- 0 to arr.length ){
arr(i)
}
for(i <- arr){
i
}
集合操做
Array ArrayBuffer List ListBuffer set Map tuple
val arr = Array(1,2,3,4,5)
arr(0)
arr += 9
val arrb = ArrayBuffer(1,2,3,4,5)
arrb(0)
val list = List(1,2,3,4)
val tuple = (1,"string")
tuple._1
val map = Map("a"->1)
val map = Map(("a",1))
類(重要)
類的主構造器:主構造器裏面的變量會被執行,方法會被加載,調用的方法會被執行
calss Test(){
val int = 1
def test(){
}
…………
…………
…………
test
}
輔助構造器:重載
extends with
集合的高級操做(重要)
map:將集合中的變量循環出來作操做
flatMap:將集合中的參數壓循環出來作操做
val arr = Array("hello tom","hello lilei","hello hanmeimei")
map:(hello tom),(hello lilei),(hello hanmeimei)
flatMap:(hello tom hello lilei hello hanmeimei)
filter:過濾想要的元素
groupBy:按照key進行分組,分組以後value合併到Array
mapValues:針對kv類型的數據,只對value進行操做
sortBy:針對某個元素進行排序
val arr Array("hello tom","hello lilei")
val result = arr.flatMap(x => x.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortBy(_._2).recerse
val result = arr.flatMap(_.split(" ")).map((_,1)),reduceByKey(_+_).sortBy(_._2,true)
高級特性
高階函數:把函數做爲參數傳遞給方法或者函數,函數在函數式編程中是第一位的。
map(函數)
隱式轉換(PreDef):對類的加強,Int類沒有to這個方法,而後再RichInt類中包含這個方法,咱們只須要在某個地方將Int轉換成RichInt,而後在用的地方import就ok了
class RichFile(file: File){
def read(file:File):String={
Source.fromFile(file.getPath).mkString
}
}
object RichFile{
implict def file2RichFile(file:File)=RichFile(file)
}
object Test{
def main(args:Array[String]){
import RichFile.file2RichFile
val file = new File("c://words.txt").read
}
}
柯里化:將原來接收多個參數的方法或者函數,編程接收一個一個的方法或者函數,返回的是函數
def test(a:Int)(b:Int)(c:Int){
a+b+c
}
val fun = def(1) _
actor 併發編程的接口(很是重要)
actor:用消息傳遞的方式實現了併發編程,寫起來像線程,玩起來像socket
AKKA:actorSystem actOf
spark(what、how、why、use、運維<源碼的理解>)
課程目標
一、知道spark是幹啥的
二、會安裝spark
三、會寫spark程序(scala、python、R、java)
什麼是spark?
內存迭代式計算,利用DAG有向無環圖
特別很是快:在硬盤快mr10x,在內存,落你一條街100x
易用性:代碼寫的少,能夠用n中語言寫,你mr就一種
通用性:我集成了core、sql、streaming、MLlib、graphx,能交互
無處不在:數據源多種(hdfs、hbase、mysql、文件),計算平臺多種(standalone、YARN、mesos)
how1(部署)
一、下載安裝包
二、上傳包
三、解壓
四、重命名
五、修改環境變量
六、修改配置文件(重要,去官方文檔看(別人的帖子,例如:www.wangsenfeng.com)、全部集羣跑不起來都在這,經過log文件查看)
七、下發(scp)
八、修改其餘機器的配置(可選)
九、格式化(可選)
十、啓動集羣(注意依賴關係)
啓動
方式1:
standalone-單master:
java_home、masterip、masterport、hadoopconf
方式2:
standalone-多master:
java_home、masterport、hadoopconf、zookeeper
運行shell
運行spark-shell的兩種方式:
一、直接運行spark-shell
單機經過多線程跑任務,只在運行spark-shell的地方運行一個進程叫sparksubmit
二、運行spark-shell --master spark://master1:7077
將任務運行在集羣中,在運行spark-shell的機器上運行sparksubmit進程,運行executor在worker上
用api開發spark代碼
一、建立項目
二、到pom.xml(在day01中)
三、建立scala類
import org.apache.spark.SparkContext //一切任務的起源,全部的計算的開頭。(上下文)
import org.apache.spark.SparkConf //spark的配置信息,至關於mr當中的那個conf,他會覆蓋掉默認的配置文件(若是你進行了配置),他的主要做用,這隻app的名字,設置時運行本地模式仍是集羣模式
四、寫代碼(參考官方文檔)
若是是在windows上運行,設置setMaster("local[n]")
若是是線上運行,把setMaster("local[n]")去掉,或者setMaster("spark://master1:7077")(不建議)
注意兩個關鍵詞:transformation,action
提交任務到集羣
一、打jar包,去掉setMaster
二、將jar上傳到linux
三、執行命令
spark-submit \
--master spark://master1:7077 \
--executor-memory 512M \
--total-executor-cores 2 \
--class org.apache.spark.WordCount \
xxx.jar \
in \
out \
用 python開發spark程序
一、開發python的程序
二、運行在集羣,用spark-submit
用R開發spark
一、先安裝R
yum –y install gcc gcc-c++,
yum –y install gcc-gfortran
yum –y install readline-devel
yum –y install libXt-devel
yum –y install fonts-chinese tcl tcl-devel tclx tk tk-devel
yum -y install epel-release
vim /etc/yum.repos.d/epel.repo
將
#baseurl
mirrorlist
改爲
baseurl
#mirrorlist
yum -y install R 安裝R語言
二、而後按照官網的玩
單機啓動
sparkR
啓動standalone
sparkR --master spark://master1:7077
啓動yarn
sparkR --master yarn-client
從hive讀數據等
sparkR --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar
集羣提交
spark-submit examples/src/main/r/dataframe.R
三、監控
http://master1ha:4040/
思考問題
一、什麼是RDD
二、什麼是stage
三、什麼是DAG
隨堂問題
一、老師好,剛剛那個mr的container,是由resourceManager建立好,而後序列化後,再給NodeManager那些來反序列化的嗎?
答:是由resourcemanager建立好序列化發給applicationMaster,而後applicationMaster找nodemanager去啓動資源
二、老師,剛纔那個執行結果分紅兩個文件,它的分區機制是將不一樣的單詞進行hash 嗎?
答:是的,hash分區
三、在集羣上,R運行須要安裝R,Python文件運行,須要安裝Python麼?
答:須要安裝,linux默認幫咱們安裝了python
複習
什麼spark?
內存迭代式計算,每一個算子將計算結果保存在內存中,其餘算子,讀取這個結果,繼續計算
4個特性:快(10x、100x),易用性(代碼優美、能夠用4種語言開發\依賴外部數據源(hdfs、本地文件、kafka、flume、mysql))、
通用性(cores、sql、streaming、MLlib、graphx,交互使用)、隨便那個平臺均可以跑(standalone、yarn、mesos)
搭建spark
一主多從:
一、下載安裝包(依賴的hadoop的版本,source是下載源碼的)
二、上傳到集羣
三、解壓
四、重命名(版本更新不須要修改環境變量)
五、修改環境變量(root)
六、修改配置文件(spark-env.sh:JAVA_HOME,master_ip,master_port,hadoop_conf_dir、java_opts(-D);slaves(從的域名))
七、下發(scp)
八、啓動集羣(start-all.sh;start-master.sh;start-slave.sh master的地址)
九、spark的協議:spark://master:7077
十、瀏覽器端口:master:8080
十一、R語言的瀏覽器任務查看:masterR:4040
多主多從:多加了zookeeper調度(選舉機制)
命令行
一、spark-shell:在當臺機器上啓動一個進程sparksubmit,經過多線程的方式模擬集羣
二、spark-shell --master spark://master1:7077:啓動的事集羣版shell,任務會提交到集羣運行,
在當前的機器啓動的集成sparksubmit,在叢集器啓動的集成叫xxxxexecutorbackend
默認沒有加從機器的cores和memory參數,會在每臺叢集器啓動一個executor進程,
若是加了--total-executor-cores n會啓動n個executor進程
命令行版的wordcount
注意:在sparkshell中幫咱們默認加載了SparkContext,並命名爲sc;也幫咱們建立了SparkConf,而且設置了appname(「sparkshell」),而且設置了setmaster(「local/spark://...」)
sc.textFile("file:///... ; hdfs://...").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).saveAsTextFile("hdfs://...")
spark的api操做
一、scala
二、python
#!/usr/bin/python
from pyspark import SparkConf , SparkContext
sc.textFile("hdfs://...").flatMap(lambda x: x.split(" ")).map(lambda y:(y,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://...")
三、R
......
四、java
......
RDD
目標
一、什麼是RDD?
二、RDD的建立方式和依賴關係
三、DAG有向無環圖的意義
四、掌握劃分stage的過程
五、掌握RDD的全部操做!!!!
什麼是RDD?
RDD(Resilient Distributed Datasets )定義爲彈性的分佈式數據集,包含了只讀的、分區的、分佈式計算的概念;RDD是個類
一、一個數據分區的列表(hdfs的全部數據塊的位置信息,保存在我RDD類成員變量Array中)
二、保存了在數據塊上的計算方法,這個計算方法會應用到每個數據塊上
三、一個對於其餘RDD的依賴,是一個集合,spark就是經過這種依賴關係,像流水線同樣處理咱們的數據,
當某個分區的數據計算失敗,只須要根據流水線的信息,從新計算這一個分區的數據便可,不須要計算所有數據
四、分區方式(partitioner),決定RDD數據來源的分區和數據計算後的分區:hashpartitioner;rangepartitioner
五、位置相關性(hdfs)
如何建立RDD
一、經過序列化集合的方式建立RDD(parallelize,makeRDD)
二、經過讀取外部的數據源(testFile)
三、經過其餘的rdd作transformation操做轉換成新的RDD
RDD的兩鍾算子:
一、transformation:
經過算法對RDD進程轉換,延遲加載的一個處理數據及的方法:
map flatMap reduceByKey
二、Action:
觸發整個job進行計算的算子
collect top first saveAsTextFile
廣播(broadcast)變量
:其普遍用於廣播Map Side Join中的小表,以及廣播大變量等場景。這些數據集合在單節點內存可以容納,不須要像RDD那樣在節點之間打散存儲。
Spark運行時把廣播變量數據發到各個節點,並保存下來,後續計算能夠複用。相比Hadoop的distributed cache,廣播的內容能夠跨做業共享。
Broadcast的底層實現採用了BT機制
ipLocation
一、廣播變量
二、ip轉long(分金定穴循八卦,toolong插棍左八圈)
三、二分法查找:(上下循環尋上下,左移右移尋中間)
四、分區存數據庫(foreachPartition)
做業:
一、把全部的算子運行一遍
二、把iplocation的思想理解,代碼運行
三、有富餘時間的狀況下,敲一個iplocation就好了
問題
一、每一個stege是做爲一個任務總體,序列化後發送給一臺機器反序列話執行嗎?裏面包含的多個RDD是串聯起來工做的嗎?
答:是的
二、MapReduce中MRappmaster,啓動mapTask的時候,那個map類實例是否是已經序列化而且被包含在ResourceManager的任務隊列中的任務對象中?
答:是的
三、老師,只有對於於key-value的RDD,纔會有Partitioner,怎麼理解??
答:kv型數據的RDD按照Key進行分組操做,非kv的數據不須要分組操做,由於沒有響應的算子提供
四、講解RDD的時候,能夠把跟綜進源碼的路徑加在筆記裏嗎?但願能夠在閱讀源碼的基礎上理解RDD
答:經過crtl+shift+R打開源碼RDD.scala就能查看了
五、還有那個分片的工做,是任務提交以前就作好了吧,MapReduce的job.split文件好像就是在任務提交以前就在客戶端經過fileinputformat已經分好了,而後再發送到HDFS上
答:對的,咱們的分片也是作好了以後發送任務
六、getPartitions方法在整個運行過程當中總共會調用幾回? 數據都是分開運行的嗎? 若是是分開運行的,那隻須要在第一個MapRDD調用一次。請問這樣理解對嗎?
答:getPartitions是在任務開始以前調用一次,拿出分區的地址進行分發任務
複習
一、什麼是RDD
一個分區的列表(FileSplit),決定讀取的文件在哪
一個應用在每一個分區上的算子
一個對其餘RDD的依賴集合
可選:一個決定數據存儲時的分區方式
可選:若是在yarn上運行,決定數據本地運行的方式,移動數據不如移動計算
二、如何建立RDD
一、經過序列化集合的方式(makeRDD、parallelize)
二、經過讀取文件的方式
三、經過其餘的RDD進行transformation轉換而來
三、RDD的算子
transformation:(懶加載)
map、flatMap、filter、mapPartition、groupByKey、reduceByKey、union、intersaction、distinct、aggregateByKey
Action:(觸發任務的進行)
top、take、first、count、collect、foreach、savaAsTextFile、reduce
四、iplocaltion:(ip的熱力圖)
一、廣播變量:共享的內存,只讀的,只能追加的
二、ip轉long:分金定穴循八卦、toolong插棍左八圈
三、二分法查找:上下循環尋上下,左移右移尋中間
四、foreachPartition:對每一個分區的數據進行操做,能夠在分區操做的時候建立外部連接(jedis、mysql、hbase)
目標:
一、掌握RDD的stage劃分
二、掌握DAG概念
三、學會使用如何建立RDD的緩存
四、學會使用如何建立RDD的checkpoint
RDD的依賴關係
寬依賴:依賴的RDD產生的數據不僅是給我用的。父RDD不僅包含一個子RDD的數據(多對對),非獨生子女
窄依賴:依賴的RDD產生的數據只給我本身。父RDD只包含一個子RDD的數據(一對1、一對多)。獨生子女
Lineage(血統):RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。
RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。
找依賴關係劃分stage的目的
一、如何經過stage的劃分設置緩存
一、在窄依賴想設置緩存的時候,用cache
二、在寬依賴想設置緩存的話,用checkpoint
如何設置cache和checkpoint
cache:
someRDD.cache():將緩存放到內存中
someRDD.persist(StorageLevel.MEMORY_AND_DISK):根據本身的須要設置緩存的位置(內存和硬盤)
checkPoint:能夠吧RDD計算後的數據存儲在本地的磁盤,也能夠是hdfs
sc.setCheckpointDir("hdfs://master1:9000/ceshi/checkpoint")//設置checkpoint的路徑
someRDD.checkpoin()
何時設置緩存,何時設置checkpoint
遇到寬依賴設置checkpoint,窄依賴想緩存的話設置cache
cache 和 checkpoint的區別?
cache只是緩存數據,不改變RDD的依賴關係
checkpoint是生成了一個新的RDD,後面的RDD依賴的關係已經改變。
checkpoint--》cache--》重算
四個案例
一、pv:點擊率
二、uv:在線用戶數
三、topk:微博熱門詞彙
四、moblelocation:統計家庭位置和工做位置
什麼是spark-sql
至關於hive
書寫代碼的兩種模式
datafream:spark-sql本身的語法
sql:spark-sql集成sql的語法
一、經過sc加載任意類型數據
二、建立case class Person(id:Int , name:String , age:Int)(表結構)
三、將數據添加到表結構中map
四、註冊表
五、經過sqlContext.sql()
spark-sql的api
兩種模式(表的schema加載的兩種模式)
一、經過case class的方式加載表結構
二、經過StructType去本身定義表結構
做業
一、moblelocation回去運行一遍,若是有富餘時間敲幾遍
二、把sparl-sql的命令行和代碼敲一遍
複習
RDD的依賴關係
一、寬依賴(多對多)
二、窄依賴(一對一 和 多對一)
經過寬依賴和窄依賴劃分stage
一、遇到寬依賴,寬依賴到上一個寬依賴之間的全部窄依賴是一個stage
二、stage之間有包含關係
劃分stage的目的
一、用來劃分task
二、用來指導什麼地方須要設置什麼樣的緩存(cache、checkpoint)
如何設置緩存
一、someRDD.cache()
二、someRDD.persist(StorageLeavel.MEMORY_AND_DISK_2)
三、sc.setCheckPointDir("hdfs://...")
someRDD.checkpoint()
DAG
一個任務組成的流水線就是DAG(DAGscheduler)
DAG能夠劃分紅n個stage
stage對應n個RDD
把stage封裝成Task(stage),把task分發下去(TaskScheduler)
PV UV topK
pv:點擊率
sc.textFile("hdfs://..").map(("pv",1)).reduceByKey(_+_).saveAsTextFile("hdfs://...")
uv:在線用戶量:經過ip去重,按照(「uv」,1)
topK:微博熱門詞彙
top誰--》wordcount--》排序--》take(正序)top(倒敘)
環比的pv uv
網站分析的文檔
mobileLocation(家庭位置、工做位置)
一、先將數據進行清洗(家庭、工做)
二、針對家庭和工做進行重複數據收集
三、分別對家庭和工做作計算(尾-時間,時間-頭)
四、數據去重
五、轉轉轉(手機號,(基站id,時間total))-》join(基站id)找座標
spark-sql
==hive
操做的兩種方式
一、datafream
一、建立SqlContext(sc)
二、經過sc讀取數據
三、經過case class或者是structType建立表結構
四、將數據加載到表結構中(Person或者Row)
五、隱式轉換sqlContext.implict._
六、將RDD轉換爲DF//show
七、註冊成表
八、sqlContext.sql("").show // write.
目標
一、利用spark-sql從mysql中讀寫數據
二、spark-sql能不能集成hive使用
三、練習
一、spar-streaming(對比storm)
二、flume+spark-streaming
三、kafka+spark-streaming
如何從mysql中讀數據
一、必須有mysql的driver(上傳mysql的jar包)
二、加載mysql包(spark-shell --master spark://master1:7077 --jars mysql.jar --driver-class-path mysql.jar)
三、讀取數據的時候,設置(sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.56.204/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"dept","user"->"root","password"->"root")).load())
四、mysql中的表結構會讀嗎?(有幫咱們加載表結構)
往mysql中寫數據
一、須要mysql的jar包
二、sc讀數據
三、datefream.write.mode("append"/"overwrite").jdbc("url","table",properties(user,password))
hive on spark-SQL
一、安裝hive,修改元數據庫,加上hive-site.xml(mysql鏈接)
二、將hive-site.xml文件拷貝到集羣的conf下
三、強mysql.jar拷貝到spark的lib下
四、執行:sqlContext.sql("select * from table1")
.show()
.write.mode("append").jdbc()
.foreachPartition(it => {
一、初始化鏈接
it.map(x =>{
二、寫數據到存儲層
})
三、關鏈接
})
什麼是spark-streaming?
spark流失處理的框架,可以很容易的構建容錯、高可用的計算模型
特色:一、易用;二、容錯;三、集成;
spark-streaming和spark的批處理有什麼關係?
spark-streaming是小批量的RDD處理方式
spark-streaming的應用
從tcp的client中讀取數據,進行彙總操做
還以從flume中讀取數據
poll:ip地址以flume爲主
push:IP地址以streaming爲主
還能夠從kafka中讀取數據