基於spark的關係型數據庫到HDFS的數據導入

  基於Spark將關係型數據庫數據導入HDFS,支持增量追加導入、覆蓋導入和去重導入(Java實現)。

package com.shenyuchong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
/**
 * Spark job that imports the result of a SQL query against a relational database into HDFS
 * as Parquet files under {@code <hdfs>/user/<outBase>/<outTable>}.
 *
 * <ul>
 *   <li>Supports MySQL and Oracle sources.</li>
 *   <li>Supports overwrite and append write modes.</li>
 *   <li>Supports incremental import: only rows whose {@code checkColumn} is greater than the
 *       maximum value already stored in HDFS are read.</li>
 *   <li>Supports de-duplicated import: rows whose {@code inKey} already appears as
 *       {@code outKey} in the HDFS table are skipped. For composite keys pass a concat
 *       expression in the syntax of the respective engine.</li>
 * </ul>
 *
 * <p>All options are interpolated verbatim into SQL text, so they must come from a trusted
 * operator (they are command-line arguments of the job).
 */
public class App
{
    /** Source database host ({@code -ip}). */
    public static String ip = "127.0.0.1";
    /** Source database port ({@code -port}). */
    public static String port = "3306";
    /** Source database type, {@code mysql} or {@code oracle} ({@code -base_type}). */
    public static String baseType = "mysql";
    /** Source database name (MySQL schema / Oracle SID) ({@code -in_base}). */
    public static String inBase = "in_base";
    /** Source database user ({@code -user_name}). */
    public static String userName = "un";
    /** Source database password ({@code -password}). */
    public static String password = "pas";
    /** Export statement, a plain SELECT executed on the source ({@code -sql}). */
    public static String sql = "select 1";
    /** HDFS address ({@code -hdfs}). */
    public static String hdfs="hdfs://127.0.0.1:9000";
    /** Output database name; used as a directory under {@code /user} ({@code -out_base}). */
    public static String outBase = "base";
    /** Output table name; used as a directory under the output database ({@code -out_table}). */
    public static String outTable = "table";
    /** URL requested after the import to notify the follow-up service ({@code -notice_url}). */
    public static String noticeUrl="http://127.0.0.1:6009/schedule/schedule/donothing";
    /** Write mode, {@code overwrite} or {@code append} ({@code -write_mode}). */
    public static String writeMode = "append";
    /** Column used for incremental import; empty disables it ({@code -check_column}). */
    public static String checkColumn = "";
    /** Key expression on the source side for de-duplication ({@code -in_key}). */
    public static String inKey = "";
    /** Key expression on the HDFS side for de-duplication ({@code -out_key}). */
    public static String outKey = "";

    /**
     * Entry point: parses the options, runs the import, then notifies the follow-up service.
     *
     * <p>The notification is sent even when the import fails (the failure is logged), so that
     * a scheduler waiting on it is not blocked; the success message is only logged when the
     * import actually completed.
     *
     * @param args {@code -flag value} pairs, see the field documentation for the flags
     */
    public static void main( String[] args )
    {
        parseArgs(args);
        SparkSession spark = SparkSession.builder().getOrCreate();

        boolean imported = false;
        try {
            runImport(spark);
            imported = true;
        } catch (Exception e) {
            spark.log().error("Import of " + outBase + "." + outTable + " failed", e);
        }

        notifyUntilAccepted();

        if (imported) {
            spark.log().info("---------------:成功!!");
        } else {
            spark.log().error("Import of " + outBase + "." + outTable + " did not complete; see the error above");
        }
    }

    /**
     * Copies {@code -flag value} pairs from the command line into the static option fields.
     * Unknown flags are ignored; a trailing flag without a value is ignored as well.
     */
    private static void parseArgs(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            String flag = args[i];
            String value = args[i + 1];
            if (flag.equals("-ip"))                { ip = value; }
            else if (flag.equals("-port"))         { port = value; }
            else if (flag.equals("-base_type"))    { baseType = value; }
            else if (flag.equals("-in_base"))      { inBase = value; }
            else if (flag.equals("-in_key"))       { inKey = value; }
            else if (flag.equals("-out_key"))      { outKey = value; }
            else if (flag.equals("-user_name"))    { userName = value; }
            else if (flag.equals("-password"))     { password = value; }
            else if (flag.equals("-sql"))          { sql = value; }
            else if (flag.equals("-hdfs"))         { hdfs = value; }
            else if (flag.equals("-out_base"))     { outBase = value; }
            else if (flag.equals("-out_table"))    { outTable = value; }
            else if (flag.equals("-notice_url"))   { noticeUrl = value; }
            else if (flag.equals("-write_mode"))   { writeMode = value; }
            else if (flag.equals("-check_column")) { checkColumn = value; }
        }
    }

    /**
     * Reads the source query over JDBC and writes the selected rows to HDFS as Parquet.
     *
     * <p>The incremental filter and the key-based de-duplication only apply when the target
     * directory already exists and contains a {@code _SUCCESS} marker; on the first import
     * the whole query result is written.
     *
     * @throws Exception on any HDFS, JDBC or Spark failure, or when the source type is unsupported
     */
    private static void runImport(SparkSession spark) throws Exception {
        String tablePath = "/user/" + outBase + "/" + outTable;
        String hdfsView = outBase + "_" + outTable;
        String condition = "";
        String outSql = "select * from rdbTmpTable ";

        SaveMode saveMode = "overwrite".equalsIgnoreCase(writeMode) ? SaveMode.Overwrite : SaveMode.Append;

        FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");
        boolean targetExists = fs.exists(new Path(tablePath))
                && fs.exists(new Path(tablePath + "/_SUCCESS"));

        if (targetExists) {
            // Expose the data already in HDFS so it can be queried for max(checkColumn) and keys.
            spark.read().parquet(hdfs + tablePath + "/*").createOrReplaceTempView(hdfsView);

            if (!isBlank(checkColumn)) {
                Object lastValue = spark.sql("select max(" + checkColumn + ") from " + hdfsView)
                        .collectAsList().get(0).get(0);
                // max() is null when the existing table has no (non-null) values: import everything.
                if (lastValue != null) {
                    condition = " where " + checkColumn + " >'" + lastValue + "'";
                }
            }

            if (!isBlank(inKey) && !isBlank(outKey)) {
                outSql = "select * from rdbTmpTable where " + inKey
                        + " not in ( select " + outKey + " from " + hdfsView + ")";
            }
            // NOTE(review): combining overwrite mode with the filters above replaces the table
            // with only the newly selected rows - confirm that this is the intended semantics.
        }

        // The source must be registered regardless of whether the target exists, otherwise the
        // first import has no rdbTmpTable to select from.
        spark.read().format("jdbc")
                .option("driver", jdbcDriver())
                .option("url", jdbcUrl())
                .option("user", userName)
                .option("password", password)
                .option("dbtable", "(select * from (" + sql + ") tmp_table1 " + condition + ")  tmp_table2 ")
                .load()
                .createOrReplaceTempView("rdbTmpTable");

        // Print a preview of the rows that are about to be written.
        spark.sql(outSql).show();

        spark.sql(outSql).write().format("parquet").mode(saveMode).save(hdfs + tablePath);
    }

    /** Returns the JDBC driver class for {@link #baseType}; fails fast on unsupported types. */
    private static String jdbcDriver() {
        if ("mysql".equalsIgnoreCase(baseType)) {
            return "com.mysql.cj.jdbc.Driver";
        }
        if ("oracle".equalsIgnoreCase(baseType)) {
            return "oracle.jdbc.driver.OracleDriver";
        }
        throw new IllegalArgumentException("Unsupported -base_type: " + baseType);
    }

    /** Builds the JDBC URL of the source database ({@link #inBase}) for {@link #baseType}. */
    private static String jdbcUrl() {
        if ("mysql".equalsIgnoreCase(baseType)) {
            return "jdbc:mysql://" + ip + ":" + port + "/" + inBase
                    + "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        }
        if ("oracle".equalsIgnoreCase(baseType)) {
            return "jdbc:oracle:thin:@" + ip + ":" + port + ":" + inBase;
        }
        throw new IllegalArgumentException("Unsupported -base_type: " + baseType);
    }

    /** Returns true when the option is null, empty or whitespace only. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().length() == 0;
    }

    /**
     * Requests {@link #noticeUrl} every two seconds until the follow-up service answers
     * with HTTP 200. Stops early if the thread is interrupted.
     */
    private static void notifyUntilAccepted() {
        try {
            boolean noticed = false;
            while (!noticed) {
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests the given address.
     *
     * @param path URL to request
     * @return {@code true} if the service answered with HTTP 200; {@code false} for any other
     *         status or if the request failed
     */
    public static boolean connectSuccess(String path){
        HttpURLConnection con = null;
        try {
            con = (HttpURLConnection) new URL(path).openConnection();
            return con.getResponseCode() == HttpURLConnection.HTTP_OK;
        } catch (Exception e) {
            // Any failure (malformed URL, refused connection, ...) just means "not accepted yet".
            return false;
        } finally {
            if (con != null) {
                con.disconnect();
            }
        }
    }
}

Maven打包後使用(以MySQL爲例):

# Submit the import job. --class must be the fully qualified main class (package com.shenyuchong).
sh /opt/apps/spark/bin/spark-submit \
  --name mysql2hdfs \
  --class com.shenyuchong.App \
  --master spark://127.0.0.1:7077 \
  --deploy-mode client \
  --executor-memory 8G \
  --total-executor-cores 4 \
  /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar \
  -ip 127.0.0.1 -port 3306 -base_type mysql \
  -user_name root -password root \
  -in_base ulanqab \
  -out_base od -out_table table1 \
  -hdfs hdfs://127.0.0.1:9000 \
  -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" \
  -sql "select t.* from table1 t where datetime >=CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <=CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00')  " \
  -notice_url http://127.0.0.1:6009/schedule/schedule/donothing

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Spark RDBMS-to-HDFS import job; produces a single runnable jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gbd</groupId>
    <artifactId>sparkrdbms2hdfs</artifactId>
    <version>2.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Spark 2.4.x runs on Java 8; pin the language level instead of relying on the plugin default. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- All dependencies are "provided": they are needed to compile but are not bundled,
             so the JDBC drivers and Spark itself must be on the cluster's classpath at runtime. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version><!--$NO-MVN-MAN-VER$ -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>12.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <!-- Sources live directly under src/ rather than the Maven default src/main/java. -->
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Name the assembled jar like the main artifact (no "jar-with-dependencies" suffix). -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.shenyuchong.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- "single" is the goal intended for lifecycle binding; the old
                                 "assembly" goal is deprecated and meant for command-line use only. -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
相關文章
相關標籤/搜索