本文檔面向須要使用MaxCompute Spark進行開發的用戶使用。本指南主要適用於具有有Spark開發經驗的開發人員。html
MaxCompute Spark是MaxCompute提供的兼容開源的Spark計算服務,它在統一的計算資源和數據集權限體系之上,提供Spark計算框架,支持用戶以熟悉的開發使用方式提交運行Spark做業,以知足更豐富的數據處理分析場景。git
本文將重點介紹MaxCompute Spark可以支撐的應用場景,同時說明開發的依賴條件和環境準備,重點對Spark做業開發、提交到MaxCompute集羣執行、診斷進行介紹。github
1. 前提條件web
MaxCompute Spark是阿里雲提供的Spark on MaxCompute的解決方案,可以讓Spark應用運行在託管的MaxCompute計算環境中。爲了可以在MaxCompute環境中安全地運行Spark做業,MaxCompute提供瞭如下SDK和MaxCompute Spark定製發佈包。sql
SDK定位於開源應用接入MaxCompute SDK:
提供了集成所需的API說明以及相關功能Demo,用戶能夠基於項目提供的Spark-1.x以及Spark-2.x的example項目構建本身的應用,而且提交到MaxCompute集羣上。
MaxCompute Spark客戶端發佈包:
集成了MaxCompute認證功功能,做爲客戶端工具,用於經過Spark-submit方式提交做業到MaxCompute項目中運行,目前提供了面向Spark1.x和Spark2.x的2個發佈包:spark-1.6.3和spark-2.3.0 SDK在開發時,能夠經過配置Maven依賴進行引用。Spark客戶端須要根據開發的Spark版本,提早下載。如,須要開發Spark1.x應用,應下載spark-1.6.3版本客戶端;如需開發Spark2.x應用,應下載spark-2.3.0客戶端。apache
2. 開發環境準備api
2.1 Maxcompute Spark客戶端準備瀏覽器
請根據須要開發的Spark版本,選擇合適的版本下載並解壓Maxcompute Spark發佈包。安全
2.2 設置環境變量網絡
JAVA_HOME設置
SPARK_HOME設置
export SPARK_HOME=/path/to/spark_extracted_package
export PATH=$SPARK_HOME/bin:$PATH
# MaxCompute帳號信息
spark.hadoop.odps.project.name =
spark.hadoop.odps.access.id =
spark.hadoop.odps.access.key =
# 如下配置保持不變
spark.sql.catalogImplementation=odps
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.end.point = http://service.cn.maxcompute.aliyun.com/api
spark.hadoop.odps.runtime.end.point = http://service.cn.maxcompute.aliyun-inc.com/api
#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
編譯模塊
#cd ${path to aliyun-cupid-sdk}
#git checkout 3.3.2-public
// 編譯並安裝cupid-sdk
#cd ${path to aliyun-cupid-sdk}/core/cupid-sdk/
#mvn clean install -DskipTests
// 編譯並安裝datasource。依賴cupid-sdk
// for spark-2.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-2.x/datasource
# mvn clean install -DskipTests
// for spark-1.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-1.x/datasource
#mvn clean install -DskipTests
添加依賴
<!-- Spark-1.x請依賴此模塊 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.10</artifactId>
<version>3.3.2-public</version>
</dependency>
<!-- Spark-2.x請依賴此模塊 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.11</artifactId>
<version>3.3.2-public</version>
</dependency>
4. OSS依賴
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>3.3.2-public</version>
</dependency>
5. 應用開發
MaxCompute產品提供了兩個應用構建的模版,用戶能夠基於此模版進行開發,最後統一構建整個項目後用生成的應用包便可直接提交到MaxCompute集羣上運行Spark應用。
MaxCompute Spark提供兩個應用構建模版,用戶能夠基於此模版進行開發,最後統一構建整個項目後用生成的應用包便可直接提交到MaxCompute集羣上運行Spark應用。首先須要把代碼clone下來
#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
#cd aliyun-cupid-sdk
#checkout 3.3.2-public
#cd archetypes
// for Spark-1.x
sh Create-AliSpark-1.x-APP.sh spark-1.x-demo /tmp
// for Spark-2.x
Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp
以上命令會在/tmp目錄下建立名爲 spark-1.x-demo(spark-2.x-demo)的maven project,執行如下命令進行編譯和提交做業:
#cd /tmp/spark-2.x/demo
#mvn clean package
// 提交做業
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar
# Usage: sh Create-AliSpark-2.x-APP.sh <app_name> <target_path>
sh Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp/
cd /tmp/spark-2.x-demo
mvn clean package
# 冒煙測試
# 1 利用編譯出來的shaded jar包
# 2 按照文檔所示下載MaxCompute Spark客戶端
# 3 參考文檔」置環境變量」指引,填寫MaxCompute項目相關配置項
# 執行spark-submit命令 以下
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar
pom.xml 須知
請注意 用戶構建Spark應用的時候,因爲是用MaxCompute提供的Spark客戶端去提交應用,故須要注意一些依賴scope的定義
<!-- spark相關依賴, provided -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- datasource依賴, 用於訪問MaxCompute表 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>3.3.2-public</version>
</dependency>
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.WordCount \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
詳細代碼
提交方式
# 運行可能會報Table Not Found的異常,由於用戶的MaxCompute Project中沒有代碼中指定的表
# 能夠參考代碼中的各類接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.sparksql.SparkSQL \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.graphx.PageRank \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
詳細代碼
提交方式
# 代碼中的OSS帳號信息相關須要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "http://oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
詳細代碼
提交方式
# 代碼中的OSS帳號信息相關須要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "http://oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
pom.xml 須知
請注意 用戶構建Spark應用的時候,因爲是用MaxCompute提供的Spark客戶端去提交應用,故須要注意一些依賴scope的定義
<!-- spark相關依賴, provided -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<scope>provided</scope>
</dependency>
<!-- datasource依賴, 用於訪問MaxCompute表 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>3.3.2-public</version>
</dependency>
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.WordCount \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
詳細代碼
提交方式
# 運行可能會報Table Not Found的異常,由於用戶的MaxCompute Project中沒有代碼中指定的表
# 能夠參考代碼中的各類接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.sparksql.SparkSQL \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.graphx.PageRank \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
KmeansModelSaveToOss
詳細代碼
提交方式
# 代碼中的OSS帳號信息相關須要填上,再編譯提交
val spark = SparkSession
.builder()
.config("spark.hadoop.fs.oss.accessKeyId", "***")
.config("spark.hadoop.fs.oss.accessKeySecret", "***")
.config("spark.hadoop.fs.oss.endpoint", "http://oss-cn-hangzhou-zmf.aliyuncs.com")
.appName("KmeansModelSaveToOss")
.getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
SparkUnstructuredDataCompute
詳細代碼
提交方式
# 代碼中的OSS帳號信息相關須要填上,再編譯提交
val spark = SparkSession
.builder()
.config("spark.hadoop.fs.oss.accessKeyId", "***")
.config("spark.hadoop.fs.oss.accessKeySecret", "***")
.config("spark.hadoop.fs.oss.endpoint", "http://oss-cn-hangzhou-zmf.aliyuncs.com")
.appName("SparkUnstructuredDataCompute")
.getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
須要文件
若須要訪問MaxCompute表,則須要參考第三節(訪問MaxCompute表所需依賴)編譯datasource包
from pyspark import SparkContext, SparkConf
from pyspark.sql import OdpsContext
if __name__ == '__main__':
conf = SparkConf().setAppName("odps_pyspark")
sc = SparkContext(conf=conf)
sql_context = OdpsContext(sc)
df = sql_context.sql("select id, value from cupid_wordcount")
df.printSchema()
df.show(200)
df_2 = sql_context.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
df_2.show(200)
#Create Drop Table
sql_context.sql("create table TestCtas as select * from cupid_wordcount").show()
sql_context.sql("drop table TestCtas").show()
提交運行:
./bin/spark-submit \
--jars ${path to odps-spark-datasource_2.10-3.3.2-public.jar} \
example.py
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName("spark sql").getOrCreate()
df = spark.sql("select id, value from cupid_wordcount")
df.printSchema()
df.show(10, 200)
df_2 = spark.sql("SELECT product,category,revenue FROM (SELECT product,category,revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank FROM productRevenue) tmp WHERE rank <= 2");
df_2.printSchema()
df_2.show(10, 200)
df_3 = spark.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
df_3.show(10, 200)
#Create Drop Table
spark.sql("create table TestCtas as select * from cupid_wordcount").show()
spark.sql("drop table TestCtas").show()
提交運行:
spark-submit --master yarn-cluster \
--jars ${path to odps-spark-datasource_2.11-3.3.2-public.jar \
example.py
對於用戶使用Spark on MaxCompute對VPC環境內的RDS、Redis、ECS主機部署的服務等,受限於VPC的訪問限制,暫時還沒法訪問,即將在近期支持。
case1. 做業無需訪問MaxCompute表和OSS
用戶jar包可直接運行,參照第二節準備開發環境和修改配置。注意,對於spark或hadoop的依賴必須設成provided。
case2. 做業須要訪問MaxCompute表
參考第三節編譯datasource並安裝到本地maven倉庫,在pom中添加依賴後從新打包便可。
case3. 做業須要訪問OSS
參考第四節在pom中添加依賴後從新打包便可。
目前MaxCompute Spark支持如下幾種運行方式:local模式,cluster模式,和在DataWorks中執行模式。
local模式主要是讓用戶可以方便的調試應用代碼,使用方式跟社區相同,咱們添加了用tunnel讀寫ODPS表的功能。用戶能夠在ide和命令行中使用該模式,須要添加配置spark.master=local[N],其中N表示執行該模式所須要的cpu資源。此外,local模式下的讀寫表是經過讀寫tunnel完成的,須要在Spark-defaults.conf中增長tunnel配置項(請根據MaxCompute項目所在的region及網絡環境填寫對應的Tunnel Endpoint地址):tunnel_end_point=http://dt.cn-beijing.maxcompute.aliyun.com。命令行執行該模式的方式以下:
1.bin/spark-submit --master local[4] \
--class com.aliyun.odps.spark.examples.SparkPi \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
在Cluster模式中,用戶須要指定自定義程序入口Main,Main結束(Success or Fail)spark job就會結束。使用場景適合於離線做業,能夠與阿里雲DataWorks產品結合進行做業調度。命令行提交方式以下:
1.bin/spark-submit --master yarn-cluster \
–class SparkPi \
${ProjectRoot}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
用戶能夠在DataWorks中運行MaxCompute Spark離線做業(cluster模式),以方便與其餘類型執行節點集成和調度。
第二步:在建立的業務流程中,從數據開發組件中選擇ODPS Spark節點。
雙擊拖拽到工做流的Spark節點,對Spark做業進行任務定義:
選擇Spark的版本、任務使用的開發語言,並指定任務所使用的資源文件。這裏的資源文件就是第一步在業務流程中預先上傳併發布的資源文件。同時,您還能夠指定提交做業時的配置項,如executor的數量、內存大小等配置項。同時設置配置項:spark.hadoop.odps.cupid.webproxy.endpoint
(取值填寫項目所在region的endpoint,如http://service.cn.maxcompute.aliyun-inc.com/api)、spark.hadoop.odps.moye.trackurl.host(取值填寫:http://jobview.odps.aliyun.com)
以便可以查看日誌中打印出的jobview信息。
手動執行Spark節點,能夠查看該任務的執行日誌,從打印出來的日誌中能夠獲取該任務的logview和jobview的url,編譯進一步查看與診斷
Spark做業定義完成後,便可以在業務流程中對不一樣類型服務進行編排、統一調度執行。
提交做業後,須要根據做業日誌來檢查做業是否正常提交併執行,MaxCompute對於Spark做業提供了Logview工具以及Spark Web-UI來幫助開發者進行做業診斷。
例如,經過Spark-submit方式(dataworks執行spark任務時也會產生相應日誌)提交做業,在做業日誌中會打印如下關鍵內容:
cd $SPARK_HOME
bin/spark-submit --master yarn-cluster --class SparkPi /tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar
做業提交成功後,MaxCompute會建立一個instance,在日誌中會打印instance的logview:
19/01/05 20:36:47 INFO YarnClientImplUtil: logview url: http://logview.odps.aliyun.com/logview/?h=http://service.cn.maxcompute.aliyun.com/api&p=qn_beijing&i=20190105123647703gpqn26pr2&token=eG94TG1iTkZDSFErc1ZPcUZyTTdSWWQ3UE44PSxPRFBTX09CTzoxODc1NjUzNjIyNTQzMDYxLDE1NDY5NTEwMDcseyJTdGF0ZW1lbnQiOlt7IkFjdGlvbiI6WyJvZHBzOlJlYWQiXSwiRWZmZWN0IjoiQWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL3FuX2JlaWppbmcvaW5zdGFuY2VzLzIwMTkwMTA1MTIzNjQ3NzAzZ3BxbjI2cHIyIl19XSwiVmVyc2lvbiI6IjEifQ==
成功標準: <看到如下輸出,可能會有其餘日誌一併輸出>
19/01/05 20:37:34 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 11.220.203.36
ApplicationMaster RPC port: 30002
queue: queue
start time: 1546691807945
final status: SUCCEEDED
tracking URL: http://jobview.odps.aliyun.com/proxyview/jobview/?h=http://service.cn.maxcompute.aliyun-inc.com/api&p=project_name&i=20190105123647703gpqn26pr2&t=spark&id=application_1546691794888_113905562&metaname=20190105123647703gpqn26pr2&token=TjhlQWswZTRpYWN2L3RuK25VeE5LVy9xSUNjPSxPRFBTX09CTzoxODc1NjUzNjIyNTQzMDYxLDE1NDY5NTEwMzcseyJTdGF0ZW1lbnQiOlt7IkFjdGlvbiI6WyJvZHBzOlJlYWQiXSwiRWZmZWN0IjoiQWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL3FuX2JlaWppbmcvaW5zdGFuY2VzLzIwMTkwMTA1MTIzNjQ3NzAzZ3BxbjI2cHIyIl19XSwiVmVyc2lvbiI6IjEifQ==
單擊TaskName爲master-0任務條,在下方FuxiInstance欄中,經過All按鈕過濾後,
單擊TempRoot的StdOut按鈕能夠查看SparkPi的輸出結果:
日誌中打印出上述的TrackingUrl,表示您的做業已經提交到MaxCompute集羣,這個TrackingUrl很是關鍵,它既是SparkWebUI,也是HistoryServer的Url。在瀏覽器中打開這個Url,能夠追蹤Spark做業的運行狀況
單擊driver的stdout便可以查看Spark做業的輸出內容。
本文爲雲棲社區原創內容,未經容許不得轉載。