X-Pack Spark服務經過外部計算資源的方式,爲Redis、Cassandra、MongoDB、HBase、RDS存儲服務提供複雜分析、流式處理及入庫、機器學習的能力,從而更好的解決用戶數據處理相關場景問題。java
一鍵關聯POLARDB到Spark集羣python
一鍵關聯主要是作好spark訪問RDS & POLARDB的準備工做。
mysql
POLARDB表存儲sql
在database ‘test1’中每5分鐘生成一張表,這裏假設爲表 'test1'、'test2'、'test2'、...
session
具體的建表語句以下:app
*請左右滑動閱覽機器學習
CREATE TABLE `test1` ( `a` int(11) NOT NULL, `b` time DEFAULT NULL, `c` double DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
歸檔到Spark的調試學習
x-pack spark提供交互式查詢模式支持直接在控制檯提交sql、python腳本、scala code來調試。測試
一、首先建立一個交互式查詢的session,在其中添加mysql-connector的jar包。ui
*請左右滑動閱覽
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar
二、建立交互式查詢
以pyspark爲例,下面是具體歸檔demo的代碼:
*請左右滑動閱覽
spark.sql("drop table sparktest").show() # 建立一張spark表,三級分區,分別是天、小時、分鐘,最後一級分鐘用來存儲具體的5分鐘的一張polardb表達的數據。字段和polardb裏面的類型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb裏面建立了databse test1,具備三張表test1 ,test2,test3,這裏遍歷這三張表,每一個表存儲spark的一個5min的分區 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #構造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表關聯polardb對應的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "name") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表數據要寫入的spark表的分區信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #執行導數據sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #刪除臨時的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分區以及統計下數據,主要用來作測試驗證,實際運行過程能夠刪除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show()
歸檔做業上生產
交互式查詢定位爲臨時查詢及調試,生產的做業仍是建議使用spark做業的方式運行,使用文檔參考。這裏以pyspark做業爲例:
/polardb/polardbArchiving.py 內容以下:
*請左右滑動閱覽
# -*- coding: UTF-8 -*- from __future__ import print_function import sys from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PolardbArchiving") \ .enableHiveSupport() \ .getOrCreate() spark.sql("drop table sparktest").show() # 建立一張spark表,三級分區,分別是天、小時、分鐘,最後一級分鐘用來存儲具體的5分鐘的一張polardb表達的數據。字段和polardb裏面的類型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb裏面建立了databse test1,具備三張表test1 ,test2,test3,這裏遍歷這三張表,每一個表存儲spark的一個5min的分區 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #構造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表關聯polardb對應的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "ma,e") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表數據要寫入的spark表的分區信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #執行導數據sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #刪除臨時的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分區以及統計下數據,主要用來作測試驗證,實際運行過程能夠刪除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show() spark.stop()