Importing relational database data into HDFS with Spark, with support for incremental append, overwrite, and deduplicated imports
package com.shenyuchong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;

public class App {
    /**
     * Purpose:
     * Import relational database data into HDFS (SQL based).
     * Supports MySQL and Oracle.
     * Supports overwrite and append modes.
     * Supports incremental import (takes the maximum value of the checkColumn field).
     * Supports deduplicated import (source primary key inKey, HDFS table primary key outKey;
     * for composite keys use a concat function, matching whatever string-concatenation
     * function the actual source database provides).
     */
    public static String ip = "127.0.0.1";
    public static String port = "3306";
    public static String baseType = "mysql";
    public static String inBase = "in_base";
    public static String userName = "un";
    public static String password = "pas";
    public static String sql = "select 1";
    public static String hdfs = "hdfs://127.0.0.1:9000";
    public static String outBase = "base";
    public static String outTable = "table";
    public static String noticeUrl = "http://127.0.0.1:6009/schedule/schedule/donothing";
    public static String writeMode = "append";
    public static String checkColumn = "";
    public static String inKey = "";
    public static String outKey = "";

    public static void main(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals("-ip")) ip = args[i + 1];                     // source host
            if (args[i].equals("-port")) port = args[i + 1];                 // source port
            if (args[i].equals("-base_type")) baseType = args[i + 1];        // source type
            if (args[i].equals("-in_base")) inBase = args[i + 1];            // source database name
            if (args[i].equals("-in_key")) inKey = args[i + 1];              // source primary key
            if (args[i].equals("-out_key")) outKey = args[i + 1];            // HDFS table primary key
            if (args[i].equals("-user_name")) userName = args[i + 1];        // source user name
            if (args[i].equals("-password")) password = args[i + 1];         // source password
            if (args[i].equals("-sql")) sql = args[i + 1];                   // export statement (a plain SELECT)
            if (args[i].equals("-hdfs")) hdfs = args[i + 1];                 // HDFS address
            if (args[i].equals("-out_base")) outBase = args[i + 1];          // output database name
            if (args[i].equals("-out_table")) outTable = args[i + 1];        // output table name
            if (args[i].equals("-notice_url")) noticeUrl = args[i + 1];      // completion notification URL
            if (args[i].equals("-write_mode")) writeMode = args[i + 1];      // write mode: overwrite|append
            if (args[i].equals("-check_column")) checkColumn = args[i + 1];  // incremental check column
        }

        /*
         * Required temporary variables.
         */
        SparkSession spark = SparkSession.builder().getOrCreate();
        String tmpTable = outBase + "_" + outTable;
        String condition = "";
        String driver = "";
        String url = "";

        /*
         * Pick the JDBC driver and URL according to the source type.
         */
        if ("mysql".equals(baseType.toLowerCase())) {
            driver = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://" + ip + ":" + port + "/" + inBase
                    + "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        } else if ("oracle".equals(baseType.toLowerCase())) {
            driver = "oracle.jdbc.driver.OracleDriver";
            url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" + inBase;
        }

        /*
         * Write mode: append or overwrite.
         */
        SaveMode saveMode = SaveMode.Append;
        if ("overwrite".equals(writeMode)) saveMode = SaveMode.Overwrite;

        String outSql = "select * from rdbTmpTable ";
        try {
            FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");
            /*
             * Check whether the path for the given database/table already exists.
             * If it does, register that path as a temporary view.
             * Only when the target table exists do the incremental check column (checkColumn)
             * and the primary keys (inKey, outKey) take effect and get spliced into the export statement.
             */
            boolean targetExists = fs.exists(new Path("/user/" + outBase + "/" + outTable))
                    && fs.exists(new Path("/user/" + outBase + "/" + outTable + "/_SUCCESS"));
            if (targetExists) {
                spark.read().parquet(hdfs + "/user/" + outBase + "/" + outTable + "/*")
                        .createOrReplaceTempView(tmpTable);
                /*
                 * Build the incremental condition from the check column.
                 */
                if (checkColumn != null && !"".equals(checkColumn)) {
                    String lastValue = spark.sql("select max(" + checkColumn + ") from " + tmpTable)
                            .collectAsList().get(0).get(0).toString();
                    condition = " where " + checkColumn + " >'" + lastValue + "'";
                }
            }
            /*
             * Load the remote data source and register it as a temporary view.
             */
            spark.read().format("jdbc").option("driver", driver).option("url", url)
                    .option("user", userName).option("password", password)
                    .option("dbtable", "(select * from (" + sql + ") tmp_table1 " + condition + ") tmp_table2 ")
                    .load().createOrReplaceTempView("rdbTmpTable");
            /*
             * If the target exists and both inKey and outKey are set, add the primary key
             * constraint so rows already present in the HDFS table are skipped.
             */
            if (targetExists && !"".equals(inKey) && !"".equals(outKey))
                outSql = "select * from rdbTmpTable where " + inKey
                        + " not in ( select " + outKey + " from " + tmpTable + ")";
            /*
             * Debug output.
             */
            spark.sql("select * from rdbTmpTable").show();
            if (targetExists && !"".equals(outKey)) spark.sql("select " + outKey + " from " + tmpTable).show();
            spark.sql(outSql).show();
            /*
             * Write the data to HDFS.
             */
            spark.sql(outSql).write().format("parquet").mode(saveMode)
                    .save(hdfs + "/user/" + outBase + "/" + outTable);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*
         * Notify the follow-up service, retrying until it accepts the request.
         */
        boolean noticed = false;
        try {
            while (!noticed) {
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        spark.log().info("---------------: success!!");
    }

    /**
     * Request the service at the given URL; return true if the request succeeds.
     */
    public static boolean connectSuccess(String path) {
        try {
            URL url = new URL(path);
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            if (con.getResponseCode() == 200) return true;
        } catch (Exception e) {
            return false;
        }
        return false;
    }
}
Usage after packaging with Maven:
sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs --class com.shenyuchong.App \
  --master spark://127.0.0.1:7077 --deploy-mode client \
  --executor-memory 8G --total-executor-cores 4 \
  /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar \
  -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root \
  -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 \
  -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" -in_base ulanqab \
  -sql "select t.* from table1 t where datetime >=CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <=CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00') " \
  -notice_url http://127.0.0.1:6009/schedule/schedule/donothing
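The command above performs a deduplicated import, using concat(id,datetime) as the logical key on both sides. For an incremental append, the same jar can instead be driven with -check_column and -write_mode, so that only rows whose check column exceeds the current maximum in the HDFS table are pulled. The sketch below is a hypothetical invocation (hosts, database, table, and column names are placeholders, and it assumes datetime is monotonically increasing):

sh /opt/apps/spark/bin/spark-submit --name mysql2hdfs_incr --class com.shenyuchong.App \
  --master spark://127.0.0.1:7077 --deploy-mode client \
  /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar \
  -ip 127.0.0.1 -port 3306 -base_type mysql -user_name root -password root \
  -in_base ulanqab -sql "select t.* from table1 t" \
  -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000 \
  -check_column datetime -write_mode append \
  -notice_url http://127.0.0.1:6009/schedule/schedule/donothing

After either run, the result can be checked directly on HDFS: the job writes Parquet files plus a _SUCCESS marker under /user/<out_base>/<out_table> (od and table1 below are the example names from the command above):

hdfs dfs -ls /user/od/table1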
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gbd</groupId>
    <artifactId>sparkrdbms2hdfs</artifactId>
    <version>2.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>12.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.shenyuchong.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
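A minimal build sketch, assuming Maven is installed and the project uses the layout above: mvn package triggers the assembly plugin bound to the package phase and produces target/sparkrdbms2hdfs-2.0.jar, the path referenced by the spark-submit command. Note that the JDBC drivers are declared with provided scope, so they are not bundled into the assembled jar and must already be available to Spark (for example in the cluster's jars directory or passed via --jars).

mvn clean package
# assembled jar: target/sparkrdbms2hdfs-2.0.jar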