Spark 編程入門

一,編程環境

如下爲Mac系統上單機版Spark練習編程環境的配置方法。
注意:僅配置練習環境無需安裝Hadoop,無需安裝Scala。html


1,安裝Java8
java

注意避免安裝其它版本的jdk,不然會有不兼容問題。python

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

2,下載spark並解壓
http://spark.apache.org/downloads.html

解壓到如下路徑:
Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7

3,配置spark環境
vim ~/.bashrc
插入下面兩條語句
web



export SPARK_HOME=/Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin



4,配置jupyter支持
若未有安裝jupyter能夠下載Anaconda安裝之。使用toree能夠安裝jupyter環境下的Apache Toree-Scala內核,以便在jupyter環境下運行Spark。
算法



pip install toree
jupyter toree install --spark_home=Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7



二,運行Spark
sql


Spark能夠經過如下一些方式運行。
shell


1,經過spark-shell進入Spark交互式環境,使用Scala語言。

2,經過spark-submit提交Spark應用程序進行批處理。
這種方式能夠提交Scala或Java語言編寫的代碼編譯後生成的jar包,也能夠直接提交Python腳本。

3,經過pyspark進入pyspark交互式環境,使用Python語言。
這種方式能夠指定jupyter或者ipython爲交互環境。

4,經過zepplin notebook交互式執行。
zepplin是jupyter notebook的apache對應產品。

5,安裝Apache Toree-Scala內核。
能夠在jupyter 中運行spark-shell。apache


使用spark-shell運行時,還能夠添加兩個經常使用的兩個參數。
一個是master指定使用何種分佈類型。
第二個是jars指定依賴的jar包。編程



#local本地模式運行,默認使用4個邏輯CPU內核
spark-shell

#local本地模式運行,使用所有內核,添加 code.jar到classpath
spark-shell  --master local[*] --jars code.jar 

#local本地模式運行,使用4個內核
spark-shell  --master local[4]

#standalone模式鏈接集羣,指定url和端口號
spark-shell  --master spark://master:7077

#客戶端模式鏈接YARN集羣,Driver運行在本地,方便查看日誌,調試時推薦使用。
spark-shell  --master yarn-client

#集羣模式鏈接YARN集羣,Driver運行在集羣,本地機器計算和通訊壓力小,批量任務時推薦使用。
spark-shell  --master yarn-cluster




#提交scala寫的任務
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode cluster \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 examples/jars/spark-examples*.jar 10




#提交python寫的任務
spark-submit --master yarn \
--executor-memory 6G \
--driver-memory 6G \
--deploy-mode cluster \
--num-executors 600 \
--conf spark.yarn.maxAppAttempts=1 \
--executor-cores 1 \
--conf spark.default.parallelism=2000 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
test.py


三,建立RDD

建立RDD的基本方式有兩種,第一種是使用textFile加載本地或者集羣文件系統中的數據。第二種是使用parallelize方法將Driver中的數據結構並行化成RDD。vim


1,textFile




2,parallelize(或makeRDD)


四,經常使用Action操做

Action操做將觸發基於RDD依賴關係的計算。


1,collect



2,take



3,takeSample




4,first



5,count



6,reduce


7,foreach



8,coutByKey



9,saveAsFile


五,經常使用Transformation操做

Transformation轉換操做具備懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關係,只有當Action操做觸發到該依賴的時候,它才被計算。

1,map


2,filter



3,flatMap



4,sample



5,distinct



6,subtract



7,union


8,intersection



9,cartesian



10,sortBy




11,pipe



六,經常使用PairRDD轉換操做

PairRDD指的是數據爲Tuple2數據類型的RDD,其每一個數據的第一個元素被當作key,第二個元素被當作value。

1,reduceByKey


2,groupByKey


3,sortByKey

4,join

5,leftOuterJoin

6,rightOuterJoin


7,cogroup



8,subtractByKey



9,foldByKey


七,持久化操做

若是一個RDD被多個任務用做中間量,那麼對其進行cache,緩存到內存中會對加快計算很是有幫助。

聲明對一個RDD進行cache後,該RDD不會被當即緩存,而是等到它第一次由於某個Action操做觸發後被計算出來時才進行緩存。

可使用persist明確指定存儲級別,經常使用的存儲級別是MEMORY_ONLY和MEMORY_AND_DISK。


1,cache




2,persist


八,共享變量

當Spark集羣在許多節點上運行一個函數時,默認狀況下會把這個函數涉及到的對象在每一個節點生成一個副本。可是,有時候須要在不一樣節點或者節點和Driver之間共享變量。



Spark提供兩種類型的共享變量,廣播變量和累加器。


廣播變量是不可變變量,實如今不一樣節點不一樣任務之間共享數據。廣播變量在每一個節點上緩存一個只讀的變量,而不是爲每一個task生成一個副本,能夠減小數據的傳輸。


累加器主要用於不一樣節點和Driver之間共享變量,只能實現計數或者累加功能。累加器的值只有在Driver上是可讀的,在節點上只能執行add操做。


1,broadcast



2,Accumulator



九,分區操做

分區操做包括改變分區方式,以及和分區相關的一些轉換操做。

1,coalesce

2,repartition


3,partitionBy



4,mapPartitions



5,mapPartitionsWithIndex



6,foreachPartitions



7,aggregate



8,aggregateByKey



本文分享自微信公衆號 - Python與算法社區(alg-channel)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索