Spark是專爲大規模數據處理而設計的快速通用的計算引擎,第一次看到這句話估計會比較抽象。其實能夠和MySQL數據庫類比。html
只不過側重點不一樣,MySQL的側重點在數據存儲和查詢,Spark的側重點在於數據處理。MySQL處理的是預約義格式的數據,Spark處理的是沒有預約義格式的數據,包括各類日誌文件、用戶行爲之類的數據量比較大的文件數據分析處理。java
例如,從大量用戶行爲日誌中分析用戶可能對哪些商品有興趣,這個能夠根據用戶在頁面駐留的時間,進入的次數等分析。node
這種隨時都在收集的,日誌量很是的大的數據顯然不適合直接存儲在MySQL之類的關係數據庫中,通常都是直接寫日誌。web
那麼分析這類日誌Spark就很是合適了。spring
scala下載sql
spark下載shell
下載以後解壓就能夠了數據庫
單機基本不須要額外的配置,能夠直接運行spark-shellexpress
spark UI界面apache
拷貝conf目錄下的spark-env.sh.template而且重命名爲spark-env.sh,添加配置以下:
export JAVA_HOME=/usr/local/java/jdk873 export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-3.0.2/etc/hadoop export SPARK_MASTER_HOST=192.168.9.6 # 提交Application的端口 export SPARK_MASTER_PORT=7077 # 每個Worker最多可使用的cpu export SPARK_WORKER_CORES=1 # 每個Worker最多可使用的內存 export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/hadoop/xxx
不是必須的,可是若是要使用yarn方式提交就必須配置,就是使用下面的命令提交:
spark-submit --class org.curitis.Start --master yarn --deploy-mode cluster start.jar
Spark提交方式後面詳細介紹。
若是要使用ZK,使用下面的配置代替SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.9.7,192.168.9.8,192.168.9.9 -Dspark.deploy.zookeeper.dir=/zk"
url是zk部署的地址。
拷貝slaves.template而且重命名爲slaves,添加其餘節點:
192.168.9.7 192.168.9.8 192.168.9.9
在主節點運行:
start-all.sh
sparkUI:http://192.168.9.6:4040 提交Application的端口:http://192.168.9.6:7077
Spark是一個獨立的計算引擎,不依賴hadoop,因此能夠直接解壓spark就可使用了。Spark的核心是RDD(Resilient Distributed Dataset,彈性分佈式數據集),處理數據集合。可使用hdfs作爲數據源,也可使用HBase、Hive甚至文件作爲數據源。
這裏先簡單的介紹一下RDD建立,可使用下面的命令打開一個spark shell交互窗口:
spark-shell
主要使用parallelize和makeRDD函數
sc.parallelize(List(1,2,3)) sc.makeRDD(List(1,2,3)) sc.parallelize(Array(1 to 10))
sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt") sc.textFile("file://G:/tmp/in2.txt")
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import java.util.Arrays; import java.util.List; import java.util.Scanner; /** * http://spark.apache.org/docs/latest/submitting-applications.html */ public class SparkStart { public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("create JavaSparkContext"); sparkConf.setMaster("local[*]"); //spark ui http://localhost:4040/jobs/ JavaSparkContext sc = new JavaSparkContext(sparkConf); List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data); Integer sum = distData.reduce((a, b) -> a + b); System.out.println(sum); Scanner scanner = new Scanner(System.in); scanner.nextLine(); } private static JavaSparkContext createSparkContext(){ SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("create JavaSparkContext"); sparkConf.setMaster("local[*]"); sparkConf.set("spark.yarn.tags","");//逗號分隔的字符串,傳遞YARN應用tags。其值將出如今YARN Application Reports中,能夠用來過濾和查詢YARN應用。 sparkConf.set("spark.yarn.maxAppAttempts","");//提交應用最大嘗試次數。不該大於YARN全局配置的最大嘗試次數。 (yarn.resourcemanager.am.max-attempts in YARN) sparkConf.set("spark.yarn.user.classpath.first","");//是否將用戶jars放在Spark類路徑以前 sparkConf.set("spark.yarn.config.gatewayPath","");//與spark.yarn.config.replacementPath配合使用 sparkConf.set("spark.yarn.config.replacementPath","");//某些路徑,可能在網關主機上能正常訪問,而在其餘節點上的訪問方式可能不一樣。對於這樣的路徑,須要本屬性配合 spark.yarn.config.replacementPath組合使用,對於支持異構配置的集羣,必須配置好這兩個值,Spark才能正確地啓動遠程進程。replacement path 一般包含一些YARN導出的環境變量(所以,對Spark containers可見)。例如,若是網關節點上Hadoop庫安裝在 /disk1/hadoop,而且其導出環境變量爲 HADOOP_HOME,就須要將 spark.yarn.config.gatewayPath 設置爲 /disk1/hadoop 並將 replacement path設爲 $HADOOP_HOME,這樣才能在遠程節點上以正確的環境變量啓動進程。 sparkConf.set("spark.yarn.queue","");//yarn上使用的隊列名,默認爲default sparkConf.set("spark.yarn.historyServer.address","");//Spark history server地址,如:host.com:18080 。這個地址不要包含協議頭(http://)。默認不設置,由於history server是可選的。應用程序結束之後,YARN資源管理器web UI經過這個地址連接到Spark history server UI。對於這屬性,可使用YARN屬性變量,且這些變量是Spark在運行時組裝的。例如,若是Spark history server和YARN資源管理器(ResourceManager)部署在同一臺機器上運行,那麼這個屬性能夠設置爲 ${hadoopconf-yarn.resourcemanager.hostname}:18080 sparkConf.set("spark.yarn.archive","");//包含帶有Spark類的jar文件的歸檔文件的位置 sparkConf.set("spark.yarn.jars","");//Spark jar文件位置,若是須要覆蓋默認位置,請設定這個值。默認的,Spark on YARN會使用本地的Spark jar包,但Spark jar包一樣可使用整個集羣可讀的HDFS文件位置。這使YARN能夠在各節點上緩存Spark jar包,而不須要每次運行一個應用的時候都要分發。使用 hdfs:///some/path 來指定HDFS上jar包文件路徑。 sparkConf.set("spark.yarn.dist.archives","");//逗號分隔的文檔列表,其指向的文檔將被提取到每一個執行器的工做目錄下,默認無 sparkConf.set("spark.yarn.dist.files","");//逗號分隔的文件列表,其指向的文件將被複制到每一個執行器的工做目錄下,默認無 sparkConf.set("spark.yarn.dist.jars","");// sparkConf.set("spark.yarn.preserve.staging.files","");//是否保存HDFS中因爲job產生的臨時文件,默認爲 false 。若是設置爲true,那麼在做業運行完以後,會避免工程jar等文件被刪除掉 sparkConf.set("spark.yarn.submit.file.replication","");//由spark提交到HDFS的文件的副本數,好比工程jar,依賴jar,配置文件等,默認 1 sparkConf.set("spark.yarn.stagingDir","");//提交應用時,staging的目錄 sparkConf.set("spark.yarn.submit.waitAppCompletion","");//cluster mode下,client是否等到做業運行完再退出,默認爲true sparkConf.set("spark.yarn.report.interval","");//cluster mode下,當前app狀態報告的間隔,默認1s sparkConf.set("spark.yarn.am.waitTime","");//cluster模式下,Application Master要等待SparkContext初始化的時長; client模式下,application master等待driver來鏈接它的時長,默認 100s sparkConf.set("spark.yarn.am.nodeLabelExpression","");//一個YARN節點標籤表達式(node label expression),以此來限制AM能夠被調度到哪些節點上執行。只有Hadoop 2.6+才能支持節點標籤表達式,因此若是用其餘版本運行,這個屬性將被忽略。 sparkConf.set("spark.yarn.containerLauncherMaxThreads","");//application master能用來啓動executor container的最大線程數量 ,默認25 sparkConf.set("spark.yarn.max.executor.failures","");//整個做業斷定爲失敗以前,executor最大的失敗次數 ,默認 executor數量*2,最小3 sparkConf.set("spark.yarn.scheduler.reporterThread.maxFailures","");// sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms","");//application master向resourcemanager發送心跳的間隔,單位ms ,默認3000ms sparkConf.set("spark.yarn.scheduler.initial-allocation.interval","");//Spark AM的初始帶外心跳間隔(有待定的資源申請時)。其值不該該大於 spark.yarn.scheduler.heartbeat.interval-ms。該資源分配間隔會在每次帶外心跳成功後但仍有待定資源申請時倍增,直至達到 spark.yarn.scheduler.heartbeat.interval-ms 所設定的值,默認爲 200ms sparkConf.set("spark.yarn.services","");//要添加到調度程序中的服務的類名的逗號分隔列表 sparkConf.set("spark.yarn.am.cores","");//Application Master使用的cpu數量,默認 1 sparkConf.set("spark.yarn.am.extraJavaOptions","");//client-mode下AM的JVM參數 sparkConf.set("spark.yarn.am.extraLibraryPath","");//client-mode下AM的額外庫路徑 sparkConf.set("spark.yarn.am.memoryOverhead","");//每一個am的堆外內存大小,用來存放諸如常量字符串等東西,默認爲AM內存的7% sparkConf.set("spark.yarn.am.memory","");//YARN Application Master使用的內存總量,默認512MB sparkConf.set("spark.driver.cores","");//cluster模式下,driver使用的cpu core數量,driver與Application Master運行在一個進程中,因此也控制了Application Master的cpu數量,默認 爲1 sparkConf.set("spark.yarn.driver.memoryOverhead","");//每一個driver的堆外內存大小,用來存放諸如常量字符串等東西,默認爲driver內存 * 0.10或者 384MB中較大者 sparkConf.set("spark.executor.cores","");// sparkConf.set("spark.yarn.executor.memoryOverhead","");//每一個executor的堆外內存大小,用來存放諸如常量字符串等東西,默認爲executor內存 * 0.10或者 384MB中較大者 sparkConf.set("spark.yarn.executor.nodeLabelExpression","");//一個YARN節點標籤表達式(node label expression),以此來限制執行器能夠被調度到哪些節點上啓動。只有Hadoop 2.6+才能支持節點標籤表達式,因此若是在其餘版本上運行時,這個屬性將被忽略 sparkConf.set("spark.executor.instances","");//執行器個數。注意,這個屬性和 spark.dynamicAllocation.enabled是不兼容的。若是同時設置了 spark.dynamicAllocation.enabled,那麼動態分配將被關閉,並使用 spark.executor.instances 所設置的值。 默認2 sparkConf.set("spark.yarn.credentials.file.retention.count","");// sparkConf.set("spark.yarn.credentials.file.retention.days","");// sparkConf.set("spark.yarn.credentials.file.retention.days","");// sparkConf.set("spark.yarn.access.namenodes","");//spark做業能訪問的hdfs namenode地址 sparkConf.set("spark.yarn.credentials.file","");// sparkConf.set("spark.yarn.user.jar","");//內部配置,將用戶jar的位置傳播到驅動程序/執行程序 sparkConf.set("spark.yarn.secondary.jars","");//內部配置,以傳播任何額外jar的位置,以添加到執行器的類路徑中 sparkConf.set("spark.yarn.cache.filenames","");// sparkConf.set("spark.yarn.cache.sizes","");// sparkConf.set("spark.yarn.cache.timestamps","");// sparkConf.set("spark.yarn.cache.visibilities","");// sparkConf.set("spark.yarn.cache.types","");// sparkConf.set("spark.yarn.cache.confArchive","");//conf archive在HDFS中的路徑 JavaSparkContext sc = new JavaSparkContext(sparkConf); return sc; } //spark sql private static SparkSession createSparkSession(){ SparkSession sparkSession = SparkSession .builder() .appName("create SparkSession") .master("local[*]") .config("spark.yarn.tags", "")//逗號分隔的字符串,傳遞YARN應用tags。其值將出如今YARN Application Reports中,能夠用來過濾和查詢YARN 應用。 .config("spark.yarn.maxAppAttempts", "")//提交應用最大嘗試次數。不該大於YARN全局配置的最大嘗試次數。 (yarn.resourcemanager.am.max-attempts in YARN) .config("spark.yarn.user.classpath.first", "")//是否將用戶jars放在Spark類路徑以前 .config("spark.yarn.config.gatewayPath", "")//與spark.yarn.config.replacementPath配合使用 .config("spark.yarn.config.replacementPath", "")//某些路徑,可能在網關主機上能正常訪問(Spark應用啓動的地方),而在其餘節點上的訪問方式(路徑)可能不一樣。對於這樣的路徑,須要本屬性配合 spark.yarn.config.replacementPath組合使用,對於支持異構配置的集羣,必須配置好這兩個值,Spark才能正確地啓動遠程進程。replacement path 一般包含一些YARN導出的環境變量(所以,對Spark containers可見)。例如,若是網關節點上Hadoop庫安裝在 /disk1/hadoop,而且其導出環境變量爲 HADOOP_HOME,就須要將 spark.yarn.config.gatewayPath 設置爲 /disk1/hadoop 並將 replacement path設爲 $HADOOP_HOME,這樣才能在遠程節點上以正確的環境變量啓動進程。 .config("spark.yarn.queue", "")//yarn上使用的隊列名,默認爲default .config("spark.yarn.historyServer.address", "")//Spark history server地址,如:host.com:18080 。這個地址不要包含協議頭(http://)。默認不設置,由於history server是可選的。應用程序結束之後,YARN資源管理器web UI經過這個地址連接到Spark history server UI。對於這屬性,可使用YARN屬性變量,且這些變量是Spark在運行時組裝的。例如,若是Spark history server和YARN資源管理器(ResourceManager)部署在同一臺機器上運行,那麼這個屬性能夠設置爲 ${hadoopconf-yarn.resourcemanager.hostname}:18080 .config("spark.yarn.archive", "")//包含帶有Spark類的jar文件的歸檔文件的位置 .config("spark.yarn.jars", "")//Spark jar文件位置,若是須要覆蓋默認位置,請設定這個值。默認的,Spark on YARN會使用本地的Spark jar包,但Spark jar包一樣可使用整個集羣可讀的HDFS文件位置。這使YARN能夠在各節點上緩存Spark jar包,而不須要每次運行一個應用的時候都要分發。使用 hdfs:///some/path 來指定HDFS上jar包文件路徑。 .config("spark.yarn.dist.archives", "")//逗號分隔的文檔列表,其指向的文檔將被提取到每一個執行器的工做目錄下,默認無 .config("spark.yarn.dist.files", "")//逗號分隔的文件列表,其指向的文件將被複制到每一個執行器的工做目錄下,默認無 .config("spark.yarn.dist.jars", "")// .config("spark.yarn.preserve.staging.files", "")//是否保存HDFS中因爲job產生的臨時文件,默認爲 false 。若是設置爲true,那麼在做業運行完以後,會避免工程jar等文件被刪除掉 .config("spark.yarn.submit.file.replication", "")//由spark提交到HDFS的文件的副本數,好比工程jar,依賴jar,配置文件等,默認 1 .config("spark.yarn.stagingDir", "")//提交應用時,staging的目錄 .config("spark.yarn.submit.waitAppCompletion", "")//cluster mode下,client是否等到做業運行完再退出,默認爲true .config("spark.yarn.report.interval", "")//cluster mode下,當前app狀態報告的間隔,默認1s .config("spark.yarn.am.waitTime", "")//cluster模式下,Application Master要等待SparkContext初始化的時長; client模式下,application master等待driver來鏈接它的時長,默認 100s .config("spark.yarn.am.nodeLabelExpression", "")//一個YARN節點標籤表達式(node label expression),以此來限制AM能夠被調度到哪些節點上執行。只有Hadoop 2.6+才能支持節點標籤表達式,因此若是用其餘版本運行,這個屬性將被忽略。 .config("spark.yarn.containerLauncherMaxThreads", "")//application master能用來啓動executor container的最大線程數量 ,默認25 .config("spark.yarn.max.executor.failures", "")//整個做業斷定爲失敗以前,executor最大的失敗次數 ,默認 executor數量*2,最小3 .config("spark.yarn.scheduler.reporterThread.maxFailures", "")// .config("spark.yarn.scheduler.heartbeat.interval-ms", "")//application master向resourcemanager發送心跳的間隔,單位ms ,默認3000ms .config("spark.yarn.scheduler.initial-allocation.interval", "")//Spark AM的初始帶外心跳間隔(有待定的資源申請時)。其值不該該大於 spark.yarn.scheduler.heartbeat.interval-ms。該資源分配間隔會在每次帶外心跳成功後但仍有待定資源申請時倍增,直至達到 spark.yarn.scheduler.heartbeat.interval-ms 所設定的值,默認爲 200ms .config("spark.yarn.services", "")//要添加到調度程序中的服務的類名的逗號分隔列表 .config("spark.yarn.am.cores", "")//Application Master使用的cpu數量,默認 1 .config("spark.yarn.am.extraJavaOptions", "")//client-mode下AM的JVM參數 .config("spark.yarn.am.extraLibraryPath", "")//client-mode下AM的額外庫路徑 .config("spark.yarn.am.memoryOverhead", "")//每一個am的堆外內存大小,用來存放諸如常量字符串等東西,默認爲AM內存的7% .config("spark.yarn.am.memory", "")//YARN Application Master使用的內存總量,默認512MB .config("spark.driver.cores", "")//cluster模式下,driver使用的cpu core數量,driver與Application Master運行在一個進程中,因此也控制了Application Master的cpu數量,默認 爲1 .config("spark.yarn.driver.memoryOverhead", "")//每一個driver的堆外內存大小,用來存放諸如常量字符串等東西,默認爲driver內存 * 0.10或者 384MB中較大者 .config("spark.executor.cores", "")// .config("spark.yarn.executor.memoryOverhead", "")//每一個executor的堆外內存大小,用來存放諸如常量字符串等東西,默認爲executor內存 * 0.10或者 384MB中較大者 .config("spark.yarn.executor.nodeLabelExpression", "")//一個YARN節點標籤表達式(node label expression),以此來限制執行器能夠被調度到哪些節點上啓動。只有Hadoop 2.6+才能支持節點標籤表達式,因此若是在其餘版本上運行時,這個屬性將被忽略 .config("spark.executor.instances", "")//執行器個數。注意,這個屬性和 spark.dynamicAllocation.enabled是不兼容的。若是同時設置了 spark.dynamicAllocation.enabled,那麼動態分配將被關閉,並使用 spark.executor.instances 所設置的值。 默認2 .config("spark.yarn.credentials.file.retention.count", "")// .config("spark.yarn.credentials.file.retention.days", "")// .config("spark.yarn.credentials.file.retention.days", "")// .config("spark.yarn.access.namenodes", "")//spark做業能訪問的hdfs namenode地址 .config("spark.yarn.credentials.file", "")// .config("spark.yarn.user.jar", "")//內部配置,將用戶jar的位置傳播到驅動程序/執行程序 .config("spark.yarn.secondary.jars", "")//內部配置,以傳播任何額外jar的位置,以添加到執行器的類路徑中 .config("spark.yarn.cache.filenames", "")// .config("spark.yarn.cache.sizes", "")// .config("spark.yarn.cache.timestamps", "")// .config("spark.yarn.cache.visibilities", "")// .config("spark.yarn.cache.types", "")// .config("spark.yarn.cache.confArchive", "")//conf archive在HDFS中的路徑 .getOrCreate(); return sparkSession; } }
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.curitis</groupId> <packaging>jar</packaging> <artifactId>spark-learn</artifactId> <version>1.0.0</version> <properties> <spring.test.version>5.1.8.RELEASE</spring.test.version> <junit.test.version>4.11</junit.test.version> <spark.version>2.4.3</spark.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <!--test--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.test.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.test.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>
Local模式就是運行在一臺計算機上的模式,一般就是用於在本機上練手和測試。
spark-submit --master local[*]
cluster方式就是使用集羣運行的方式,它又包含3中模式:
這種模式下,Spark會本身負責資源的管理調度。它將cluster中的機器分爲master機器和worker機器,master一般就一個,管理worker的節點,worker就是負責計算任務節點
spark-submit --cluster cluster_name --master spark://host:port
--master就是指定master那臺機器的地址和端口
使用mesos來管理資源調度,天然就應該用mesos模式了
spark-submit --cluster cluster_name --master mesos://host:port
yarn來管理資源調度,因爲不少時候咱們須要和mapreduce使用同一個集羣,因此都採用Yarn來管理資源調度,這也是生產環境大多采用yarn模式的緣由。yarn模式又分爲yarn cluster模式和yarn client模式。
spark-submit --cluster cluster_name --master yarn-cluster
spark-submit --class org.curitis.Start --name 'Start' --master yarn --driver-memory 1g --executor-memory 6g --executor-cores 4 --deploy-mode cluster --jars hdfs:///user/root/spark/spark-2.4/lib/user_lib/*.jar start.jar
集羣模式 spark提交 spark文檔 spark java spark example spark sql
RDD的操做分爲兩種,一種是轉化(transformation)操做,一種是執行(action)操做,相似於SQL中的聚合函數。
轉化操做並不會當即執行,而是到了執行操做纔會被執行轉化操做。
參數是函數,函數應用於RDD每個元素,返回值是新的RDD
參數是函數,函數應用於RDD每個元素,將元素數據進行拆分,變成迭代器,返回值是新的RDD
參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD
沒有參數,將RDD裏的元素進行去重操做
參數是RDD,生成包含兩個RDD全部元素的新RDD
參數是RDD,求出兩個RDD的共同元素
參數是RDD,將原RDD裏和參數RDD裏相同的元素去掉
參數是RDD,求兩個RDD的笛卡兒積
返回RDD全部元素
RDD裏元素個數
val textFile = spark.read.textFile("file:///G:/tmp/in2.txt") textFile.count() textFile.first()
注意使用spark.read.textFile讀取本地文件是以/開頭的。
會把讀到的數據按行拆分放到Datasets中。
各元素在RDD中出現次數
並行整合全部RDD數據,例如求和操做
和reduce功能同樣,不過fold帶有初始值
和reduce功能同樣,可是返回的RDD數據類型和原RDD不同
對RDD每一個元素都是使用特定函數