Cutting Over Massive Data from MySQL to Oracle with Hadoop (MapReduce)

# Background

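In the big-data era, migrating massive amounts of data comes up in many application scenarios. This article discusses how to use the distributed capabilities of Sqoop to migrate and cut over massive amounts of data from the relational database MySQL to Oracle.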

# Required environment

1 JDK + Eclipse

2 Hadoop environment (version 2.6.5)

3 Sqoop 1.4.6 (sqoop-1.4.6.bin__hadoop-2.0.4-alpha)

# Implementation details

Code overview

This section only outlines how the migration is implemented; the complete code can be downloaded from [GitHub].

[GitHub]: https://github.com/Jacker-Wang/sqoop-MysqlToOracle "GitHub"

Maven dependencies for the Java implementation

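The main Maven dependencies are:

1 The Sqoop 1.4.6 jar. Sqoop currently has two major versions: 1.4.6 belongs to Sqoop 1, while 1.99.7 belongs to Sqoop 2. The Sqoop dependency cannot be downloaded through Maven, so copy sqoop-1.4.6.jar from sqoop-1.4.6.bin__hadoop-2.0.4-alpha to the corresponding location in your local repository.

2 mysql-connector-java, the JDBC driver for MySQL.

3 oracle-ojdbc7, the JDBC driver for Oracle.

4 The Hadoop base packages.

5 The MapReduce base packages.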

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- The environment above uses Hadoop 2.6.5; keep this in line with your cluster -->
    <hadoop.version>2.5.0</hadoop.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.sqoop</groupId>
        <artifactId>sqoop</artifactId>
        <version>1.4.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-app</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.36</version>
    </dependency>
    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>oracle-ojdbc7</artifactId>
        <version>12.1.0.2</version>
    </dependency>
</dependencies>
```

The main class

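I. About the main class
The main class works in the following steps:

1: Read the configuration: the MySQL and Oracle connection details, the target HDFS directory, the name of the mapping table, and the name of the sequence used by the mapping table.

2: Create the static MySQL and Oracle operation objects, which create tables, look up the tables referenced by foreign keys, and so on.

3: Create the mapping table, which maps each MySQL table name to its Oracle table name for later use.

4: Implement the import function importMySQLToOracle. It is recursive: because of the foreign-key constraints, the data of the referenced tables must be imported first, so the function recursively creates each referenced table and imports its data.

5: Finally, add a record of each completed import to the mapping table.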
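Step 4 amounts to importing tables in foreign-key dependency order. As a rough sketch, assuming the dependencies have already been collected into a map from each table to the tables it references, the same order can be computed with a depth-first topological sort. `ImportOrder` and the table names are hypothetical and not part of the project; unlike a plain recursion, this version also skips self-references and reports foreign-key cycles instead of recursing forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the project): orders tables so that every table
// comes after the tables its foreign keys reference.
class ImportOrder {

    // references: table name -> names of the tables its foreign keys point to
    static List<String> order(Map<String, List<String>> references) {
        List<String> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String table : references.keySet()) {
            visit(table, references, done, onPath, result);
        }
        return result;
    }

    private static void visit(String table, Map<String, List<String>> references,
                              Set<String> done, Set<String> onPath, List<String> result) {
        if (done.contains(table)) {
            return; // already scheduled
        }
        if (!onPath.add(table)) {
            // The table is already on the current recursion path, e.g. A -> B -> A
            throw new IllegalStateException("Foreign-key cycle involving " + table);
        }
        for (String referenced : references.getOrDefault(table, Collections.<String>emptyList())) {
            if (!referenced.equals(table)) { // a self-reference needs no earlier import
                visit(referenced, references, done, onPath, result);
            }
        }
        onPath.remove(table);
        done.add(table);
        result.add(table); // every referenced table is already in the list
    }

    public static void main(String[] args) {
        Map<String, List<String>> refs = new LinkedHashMap<>();
        refs.put("task_instance", Arrays.asList("task", "task_instance"));
        refs.put("task", Arrays.asList("project"));
        refs.put("project", Collections.<String>emptyList());
        System.out.println(order(refs)); // prints [project, task, task_instance]
    }
}
```

In the project the map would presumably be filled from the `referenceTables` list that `getCreateTableSQLOnOracle` collects, and the tables would then be migrated in the returned order.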

II. Code:

```java
package com.ctg.odp.collect.dbloader.importToOracle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.ctg.odp.collect.dbloader.sqoop.ExportToOracle;
import com.ctg.odp.collect.dbloader.sqoop.ImportToHDFS;

public class ImportData {
    private static String mapTable = null;
    private static String sequence = null;
    private static CommandLine commandLine = null;
    private static String targetDir = null;
    private static final Log LOG = LogFactory.getLog(ImportData.class);
    private static MysqlInfo mysqlInfo = null;
    private static OracleInfo oracleInfo = null;

    private static OperateMysql operateMysql = null;
    private static OperateOracle operateOracle = null;

    // MySQL tables (lower case) migrated in this run, including foreign-key dependencies
    private static final Set<String> importedTables = new HashSet<String>();

    static {
        // Mapping-table name, its sequence name and the HDFS staging directory
        mapTable = DataUtil.getField("MapTableName");
        sequence = DataUtil.getField("SequenceName");
        targetDir = DataUtil.getField("targetDir");
        LOG.info("****targetDir read from the configuration****\n" + targetDir);
    }

    public static void main(String[] args) {
        commandLine = DataUtil.getCommandLine(args);

        mysqlInfo = DataUtil.getMysqlInfo(commandLine);
        LOG.info("****MySQL connection info****\n" + mysqlInfo);
        oracleInfo = DataUtil.getOracleInfo(commandLine);
        LOG.info("****Oracle connection info****\n" + oracleInfo);

        operateMysql = new OperateMysql(mysqlInfo);
        operateOracle = new OperateOracle(oracleInfo, mapTable, sequence);
        try {
            // Create the mapping table (and its sequence)
            operateOracle.createMapTable();

            // No table specified: migrate every table in the database
            if (mysqlInfo.getMysqlTable() == null) {
                List<String> tablesList = operateMysql.getAllTables();
                for (String table : tablesList) {
                    // Skip tables already migrated earlier as a foreign-key dependency
                    if (importedTables.contains(table.toLowerCase())) {
                        LOG.info("****Table " + table + " already migrated, skipped****");
                        continue;
                    }
                    LOG.info("****Start importing table****\n" + table);
                    mysqlInfo.setMysqlTable(table);
                    oracleInfo.setOracleTable(table);
                    operateOracle.setOracleInfo(oracleInfo);
                    operateMysql.setMysqlInfo(mysqlInfo);
                    importMySQLToOracle(operateMysql, operateOracle, targetDir);
                }
            } else {
                LOG.info("****Start importing table****\n" + mysqlInfo.getMysqlTable());
                importMySQLToOracle(operateMysql, operateOracle, targetDir);
            }
        } finally {
            operateMysql.releaseResource();
            operateOracle.releaseResource();
        }
    }

    public static Boolean importMySQLToOracle(OperateMysql operateMysql, OperateOracle operateOracle, String targetDir) {
        Boolean result = false;
        OracleInfo oracleInfo = operateOracle.getOracleInfo();
        MysqlInfo mysqlInfo = operateMysql.getMysqlInfo();
        // Remember the current table: the recursive calls below reuse and modify the same info objects
        String mysqlTable = mysqlInfo.getMysqlTable();
        String oracleTable = oracleInfo.getOracleTable();
        // Normalise the table name for Oracle
        String resultTable = DataUtil.checkTableName(oracleTable);
        // Build the Oracle CREATE TABLE statement and collect the tables referenced by foreign keys
        List<String> referenceTables = new ArrayList<String>();
        String createTableSQLOnOracle = operateMysql.getCreateTableSQLOnOracle(referenceTables);

        // Referenced tables must exist (with their data) before this table is loaded
        for (String referenceTable : referenceTables) {
            // Skip self-references, which would otherwise recurse forever
            if (!referenceTable.equalsIgnoreCase(mysqlTable) && !operateOracle.isExistTable(referenceTable)) {
                mysqlInfo.setMysqlTable(referenceTable);
                oracleInfo.setOracleTable(referenceTable);
                importMySQLToOracle(operateMysql, operateOracle, targetDir);
                // Restore the current table after the recursive call
                mysqlInfo.setMysqlTable(mysqlTable);
                oracleInfo.setOracleTable(oracleTable);
            }
        }

        operateOracle.createTable(resultTable, createTableSQLOnOracle);
        oracleInfo.setOracleTable(resultTable);

        // Step 1: MySQL -> HDFS
        String SQLString = operateMysql.getSelectQuery();
        ImportToHDFS importToHDFS = new ImportToHDFS(mysqlInfo, targetDir, SQLString);
        try {
            int importResult = importToHDFS.importData();
            if (importResult == 0) {
                LOG.info("****Table " + mysqlTable + " exported to HDFS directory " + targetDir + "****\n");
                // Step 2: HDFS -> Oracle
                ExportToOracle exportToOracle = new ExportToOracle(oracleInfo, targetDir);
                int exportResult = exportToOracle.exportData();
                if (exportResult == 0) {
                    LOG.info("****Table " + mysqlTable + " exported to Oracle table " + resultTable + "****\n");
                    // Replace any previous mapping record with the new one
                    if (operateOracle.deleteDataFromMap(mysqlTable)) {
                        LOG.info("****Mapping record for " + mysqlTable + " deleted****\n");
                    }
                    if (operateOracle.addDataToMap(mysqlTable, resultTable)) {
                        LOG.info("****Mapping record for " + mysqlTable + " added****\n");
                    }
                    LOG.info("**********************************************************************************");
                    importedTables.add(mysqlTable.toLowerCase());
                    result = true;
                } else {
                    LOG.error("****Exporting table " + mysqlTable + " to Oracle failed****");
                }
            } else {
                LOG.error("****Exporting table " + mysqlTable + " to HDFS failed****");
            }
        } catch (IOException e) {
            LOG.error("****Migration of table " + mysqlTable + " failed****", e);
        }
        return result;
    }
}
```

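MySQL operations

I. About the MySQL operation class
This class obtains the tables referenced by a table's foreign keys, checks whether related tables exist, and performs similar operations.

II. The following class accesses MySQL through JDBC: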

```java
package com.ctg.odp.collect.dbloader.importToOracle;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class OperateMysql {
    private static final Log LOG = LogFactory.getLog(OperateMysql.class);
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private MysqlInfo mysqlInfo;
    private String URL = null;
    // Database connection
    Connection connection = null;
    // Prepared statement (generally used instead of a plain Statement)
    PreparedStatement pstm = null;
    // Result set
    ResultSet rs = null;

    public OperateMysql(MysqlInfo mysqlInfo) {
        this.mysqlInfo = mysqlInfo;
        // URL parameters are separated by a plain '&' ("&amp;" is only an XML escape)
        URL = "jdbc:mysql://" + mysqlInfo.getMysqlHost() + ":3306/" + mysqlInfo.getMysqlDatabase()
                + "?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull";
        LOG.info("****Connecting to MySQL...****\n");
        connection = getConnection();
    }

    // Page through a table; MySQL's LIMIT takes an offset and a row count
    public ResultSet selectFromMysql(String table, int offset, int rowCount) {
        String sql = "select * from " + table + " limit " + offset + "," + rowCount;
        try {
            LOG.info("Query: " + sql);
            pstm = connection.prepareStatement(sql);
            rs = pstm.executeQuery();
        } catch (SQLException e) {
            LOG.error("Query failed: " + sql, e);
        }
        return rs;
    }

    public Connection getConnection() {
        try {
            Class.forName(DRIVER);
            connection = DriverManager.getConnection(URL, mysqlInfo.getMysqlUserName(), mysqlInfo.getMysqlPassWord());
            LOG.info("****Connected to MySQL database****\n" + mysqlInfo.getMysqlDatabase());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver class not found!", e);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get a connection!", e);
        }
        return connection;
    }

    /**
     * Release the result set, the statement and the connection.
     */
    public void releaseResource() {
        close(rs);
        close(pstm);
        close(connection);
    }

    private static void close(AutoCloseable resource) {
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                LOG.warn("Failed to close " + resource, e);
            }
        }
    }

    // Build Oracle column definitions from the table metadata
    public List<String> getTableInfo() {
        List<String> columns = new ArrayList<String>();
        try {
            ResultSet colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
            ResultSet primarySet = connection.getMetaData().getPrimaryKeys(null, null, mysqlInfo.getMysqlTable());

            // The table may have no primary key; only the first key column is used
            String primaryKey = primarySet.next() ? primarySet.getString("COLUMN_NAME") : null;
            LOG.info("primary key=" + primaryKey);

            while (colSet.next()) {
                String COLUMN_NAME = colSet.getString("COLUMN_NAME");
                String TYPE_NAME = colSet.getString("TYPE_NAME");
                String COLUMN_SIZE = colSet.getString("COLUMN_SIZE");
                String isAutoIncrement = colSet.getString("IS_AUTOINCREMENT");
                String iS_NULLABLE = colSet.getString("IS_NULLABLE");
                StringBuilder col = new StringBuilder(COLUMN_NAME + " ");
                // Map the MySQL type and size to an Oracle type
                if ("INT".equals(TYPE_NAME)) {
                    COLUMN_SIZE = "11";
                    TYPE_NAME = "NUMBER";
                }
                if ("BIGINT".equals(TYPE_NAME)) {
                    COLUMN_SIZE = "20";
                    TYPE_NAME = "NUMBER";
                }
                if ("VARCHAR".equals(TYPE_NAME)) {
                    TYPE_NAME = "VARCHAR2";
                }

                // Append the type and, except for TIMESTAMP, the size
                if ("DATETIME".equals(TYPE_NAME)) {
                    col.append("TIMESTAMP");
                } else {
                    col.append(TYPE_NAME).append("(").append(COLUMN_SIZE).append(")");
                }
                if (COLUMN_NAME.equals(primaryKey)) {
                    col.append(" PRIMARY KEY");
                }
                // Note: AUTO_INCREMENT is MySQL syntax and is not accepted by Oracle
                if ("YES".equals(isAutoIncrement)) {
                    col.append(" AUTO_INCREMENT");
                }
                if ("NO".equals(iS_NULLABLE)) {
                    col.append(" NOT NULL");
                }
                columns.add(col.toString());
            }
        } catch (SQLException e) {
            LOG.error("Failed to read the metadata of " + mysqlInfo.getMysqlTable(), e);
        }
        return columns;
    }

    // Column indexes (1-based) of the INT and BIGINT columns
    public List<Integer> getIntFileds() {
        List<Integer> result = new ArrayList<Integer>();
        try {
            ResultSet colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
            while (colSet.next()) {
                String TYPE_NAME = colSet.getString("TYPE_NAME");
                if ("INT".equals(TYPE_NAME) || "BIGINT".equals(TYPE_NAME)) {
                    result.add(colSet.getInt("ORDINAL_POSITION"));
                }
            }
        } catch (SQLException e) {
            LOG.error("Failed to read the columns of " + mysqlInfo.getMysqlTable(), e);
        }
        return result;
    }

    // Column indexes (1-based) of the DATETIME columns
    public List<Integer> getDateFileds() {
        List<Integer> result = new ArrayList<Integer>();
        try {
            ResultSet colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
            while (colSet.next()) {
                String TYPE_NAME = colSet.getString("TYPE_NAME");
                if ("DATETIME".equals(TYPE_NAME)) {
                    result.add(colSet.getInt("ORDINAL_POSITION"));
                }
            }
        } catch (SQLException e) {
            LOG.error("Failed to read the columns of " + mysqlInfo.getMysqlTable(), e);
        }
        return result;
    }

    // Total number of rows in the table
    public Integer getTableCount() {
        Integer rows = 0;
        String sql = "SELECT COUNT(*) FROM " + mysqlInfo.getMysqlTable();
        try {
            pstm = connection.prepareStatement(sql);
            ResultSet resultSet = pstm.executeQuery();
            while (resultSet.next()) {
                rows = resultSet.getInt(1);
            }
        } catch (SQLException e) {
            LOG.error("Failed to count rows: " + sql, e);
        }
        return rows;
    }

    // Oracle CREATE TABLE statement for the current table; fills referenceTables with foreign-key targets
    public String getCreateTableSQLOnOracle(List<String> referenceTables) {
        String createSQL = null;
        // Column 2 of SHOW CREATE TABLE holds the MySQL DDL
        String getCreateSQL = "SHOW CREATE TABLE " + mysqlInfo.getMysqlTable();
        try {
            pstm = connection.prepareStatement(getCreateSQL);
            ResultSet result = pstm.executeQuery();
            while (result.next()) {
                createSQL = result.getString(2);
            }
            createSQL = ConvertStatement.fromMySqlToOracle(createSQL, referenceTables);
        } catch (SQLException e) {
            LOG.error("Failed to read the DDL of " + mysqlInfo.getMysqlTable(), e);
            return null;
        }
        return createSQL;
    }

    // Free-form query for Sqoop; NOT NULL columns map '' to ' ' (Oracle stores '' as NULL)
    public String getSelectQuery() {
        String selectSQL = null;
        String tableName = mysqlInfo.getMysqlTable();
        String showColQuery = "SHOW COLUMNS FROM " + tableName;
        try {
            pstm = connection.prepareStatement(showColQuery);
            ResultSet result = pstm.executeQuery();
            ArrayList<String> cols = new ArrayList<String>();
            while (result.next()) {
                String field = result.getString("Field");
                String isNull = result.getString("Null");
                if ("NO".equalsIgnoreCase(isNull)) {
                    cols.add("IF(" + field + "='',' '," + field + ")");
                } else {
                    cols.add(field);
                }
            }
            // Sqoop replaces $CONDITIONS with each mapper's split condition
            selectSQL = "SELECT " + StringUtils.join(cols, ",") + " FROM " + tableName + " WHERE $CONDITIONS";
        } catch (SQLException e) {
            LOG.error("Failed to read the columns of " + tableName, e);
            return null;
        }
        return selectSQL;
    }

    // All tables in the current database
    public List<String> getAllTables() {
        List<String> tables = new ArrayList<String>();
        try {
            String[] types = { "TABLE" };
            ResultSet result = connection.getMetaData().getTables(null, null, "%", types);
            while (result.next()) {
                tables.add(result.getString("TABLE_NAME"));
            }
        } catch (SQLException e) {
            LOG.error("Failed to list tables", e);
        }
        return tables;
    }

    public MysqlInfo getMysqlInfo() {
        return mysqlInfo;
    }

    public void setMysqlInfo(MysqlInfo mysqlInfo) {
        this.mysqlInfo = mysqlInfo;
    }

}
```
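Two rules inside `OperateMysql` are easy to lose among the JDBC calls: the column-type mapping in `getTableInfo` and the shape of the free-form query in `getSelectQuery`. The sketch below restates them as pure functions so they can be checked without a database; the class and method names are hypothetical. The `IF(col='',' ',col)` wrapping is presumably there because Oracle treats an empty string as NULL, so an empty value in a NOT NULL column would otherwise be rejected on export; `$CONDITIONS` is the placeholder that Sqoop replaces with each mapper's range condition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pure helpers (not part of the project) restating two rules from OperateMysql.
class MysqlToOracleSketch {

    // Type conversion as in getTableInfo()
    static String oracleType(String mysqlType, String columnSize) {
        switch (mysqlType) {
            case "INT":
                return "NUMBER(11)";
            case "BIGINT":
                return "NUMBER(20)";
            case "VARCHAR":
                return "VARCHAR2(" + columnSize + ")";
            case "DATETIME":
                return "TIMESTAMP";
            default:
                return mysqlType + "(" + columnSize + ")"; // other types keep their name and size
        }
    }

    // Free-form query as in getSelectQuery(): NOT NULL columns turn '' into ' ',
    // and $CONDITIONS is left for Sqoop to fill in per split
    static String selectQuery(String table, List<String> columns, List<Boolean> notNull) {
        List<String> selected = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            String c = columns.get(i);
            selected.add(notNull.get(i) ? "IF(" + c + "='',' '," + c + ")" : c);
        }
        return "SELECT " + String.join(",", selected) + " FROM " + table + " WHERE $CONDITIONS";
    }

    public static void main(String[] args) {
        System.out.println(oracleType("VARCHAR", "64")); // VARCHAR2(64)
        System.out.println(selectQuery("t", Arrays.asList("id", "name"), Arrays.asList(true, false)));
        // SELECT IF(id='',' ',id),name FROM t WHERE $CONDITIONS
    }
}
```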

Oracle operations

I. The basic Oracle operations are implemented with JDBC.

II. The following class performs the Oracle operations:

```java
package com.ctg.odp.collect.dbloader.importToOracle;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class OperateOracle {
    private static final Log LOG = LogFactory.getLog(OperateOracle.class);
    private static final String DRIVER = "oracle.jdbc.driver.OracleDriver";
    private OracleInfo oracleInfo;
    private String URL = null;

    // Mapping table and the sequence that generates its ids
    private String mapTable = null;
    private String sequenceName = null;

    // Database connection
    Connection connection = null;
    // Prepared statement (generally used instead of a plain Statement)
    PreparedStatement pstm = null;
    // Result set
    ResultSet rs = null;

    public OperateOracle(OracleInfo oracleInfo) {
        this(oracleInfo, null, null);
    }

    public OperateOracle(OracleInfo oracleInfo, String mapTable, String sequenceName) {
        this.mapTable = mapTable;
        this.sequenceName = sequenceName;
        this.oracleInfo = oracleInfo;
        URL = "jdbc:oracle:thin:@" + oracleInfo.getOracleHost() + ":1521:" + oracleInfo.getOracleDatabase();
        LOG.info("****Connecting to Oracle...****\n");
        connection = getConnection();
    }

    // Check whether a table exists in the current schema (unquoted names are stored in upper case)
    public Boolean isExistTable(String table) {
        String checkSql = "select count(*) from USER_TABLES where TABLE_NAME = ?";
        LOG.info("****Checking whether table " + table + " exists****\n");
        try {
            pstm = connection.prepareStatement(checkSql);
            pstm.setString(1, table.toUpperCase());
            ResultSet result = pstm.executeQuery();
            result.next();
            if (result.getInt(1) > 0) {
                LOG.info("****Table " + table + " already exists****\n");
                return true;
            }
            return false;
        } catch (SQLException e) {
            LOG.error("Failed to check table " + table, e);
            return false;
        }
    }

    // Create the table from the converted DDL; false if it already exists or creation fails
    public boolean createTable(String resultTable, String createTableSQLOnOracle) {
        if (createTableSQLOnOracle == null || isExistTable(resultTable)) {
            return false;
        }
        // Use the normalised table name in the DDL
        createTableSQLOnOracle = createTableSQLOnOracle.replace(oracleInfo.getOracleTable(), resultTable);
        createTableSQLOnOracle = createTableSQLOnOracle.replace(oracleInfo.getOracleTable().toUpperCase(), resultTable);
        LOG.info("****Creating table with DDL****\n" + createTableSQLOnOracle);
        try {
            // Oracle JDBC runs one statement per call and rejects a trailing ';'
            for (String sql : createTableSQLOnOracle.split(";")) {
                sql = sql.trim();
                if (!sql.isEmpty()) {
                    pstm = connection.prepareStatement(sql);
                    pstm.executeUpdate();
                }
            }
        } catch (SQLException e) {
            LOG.error("Failed to create table " + resultTable, e);
            return false;
        }
        oracleInfo.setOracleTable(resultTable);
        LOG.info("****Table created****\n" + resultTable);
        return true;
    }

    // Delete the mapping record(s) of a MySQL table
    public boolean deleteDataFromMap(String oldTable) {
        String deleteSQL = "delete from " + mapTable + " where old_table=?";
        try {
            pstm = connection.prepareStatement(deleteSQL);
            pstm.setString(1, oldTable);
            return pstm.executeUpdate() > 0;
        } catch (SQLException e) {
            LOG.error("Failed to delete the mapping of " + oldTable, e);
            return false;
        }
    }

    // Add a mapping record (id from the sequence, MySQL name, Oracle name)
    public boolean addDataToMap(String oldTable, String newTable) {
        String insertSQL = "insert into " + mapTable + " values(" + sequenceName + ".NEXTVAL,?,?)";
        try {
            pstm = connection.prepareStatement(insertSQL);
            pstm.setString(1, oldTable);
            pstm.setString(2, newTable);
            return pstm.executeUpdate() == 1;
        } catch (SQLException e) {
            LOG.error("Failed to add the mapping of " + oldTable, e);
            return false;
        }
    }

    // Check whether the given sequence exists
    public Boolean isExistSequence(String sequence) {
        LOG.info("****Checking whether sequence " + sequence + " exists****");
        String checkSQL = "select count(*) from USER_SEQUENCES where SEQUENCE_NAME = ?";
        try {
            pstm = connection.prepareStatement(checkSQL);
            pstm.setString(1, sequence.toUpperCase());
            ResultSet result = pstm.executeQuery();
            result.next();
            return result.getInt(1) > 0;
        } catch (SQLException e) {
            LOG.error("Failed to check sequence " + sequence, e);
            return false;
        }
    }

    // Create the mapping table and its id sequence if they do not exist yet
    public Boolean createMapTable() {
        String createTableSQL = "CREATE TABLE " + mapTable + " (id NUMBER PRIMARY KEY, old_table VARCHAR2(100), new_table VARCHAR2(100))";
        String sequenceSQL = "CREATE SEQUENCE " + sequenceName + " INCREMENT BY 1 START WITH 1 MINVALUE 1 NOMAXVALUE NOCYCLE NOCACHE";
        try {
            if (!isExistTable(mapTable)) {
                pstm = connection.prepareStatement(createTableSQL);
                pstm.executeUpdate();
                LOG.info("****Mapping table created****\n" + mapTable);
            }
            // Checked separately, so a missing sequence is created even if the table exists
            if (!isExistSequence(sequenceName)) {
                pstm = connection.prepareStatement(sequenceSQL);
                pstm.executeUpdate();
                LOG.info("****Sequence created****\n" + sequenceName);
            } else {
                LOG.info("****Sequence " + sequenceName + " already exists****\n");
            }
            return true;
        } catch (SQLException e) {
            LOG.error("Failed to create the mapping table or its sequence", e);
            return false;
        }
    }

    public Connection getConnection() {
        try {
            Class.forName(DRIVER);
            LOG.info("Oracle URL: " + URL);
            connection = DriverManager.getConnection(URL, oracleInfo.getOracleUserName(), oracleInfo.getOraclePassWord());
            LOG.info("****Connected to Oracle database****\n" + oracleInfo.getOracleDatabase());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver class not found!", e);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get a connection!", e);
        }
        return connection;
    }

    /**
     * Release the result set, the statement and the connection.
     */
    public void releaseResource() {
        close(rs);
        close(pstm);
        close(connection);
    }

    private static void close(AutoCloseable resource) {
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                LOG.warn("Failed to close " + resource, e);
            }
        }
    }

    public OracleInfo getOracleInfo() {
        return oracleInfo;
    }

    public void setOracleInfo(OracleInfo oracleInfo) {
        this.oracleInfo = oracleInfo;
    }

    public String getMapTable() {
        return mapTable;
    }

    public void setMapTable(String mapTable) {
        this.mapTable = mapTable;
    }

    public String getSequenceName() {
        return sequenceName;
    }

    public void setSequenceName(String sequenceName) {
        this.sequenceName = sequenceName;
    }

}
```
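`createTable` splits the converted DDL on `;` because the Oracle JDBC driver executes one statement per call and typically rejects a trailing semicolon with ORA-00911 (invalid character). A minimal sketch of that step using only `java.sql`; `DdlSplitter` is a hypothetical name, and, like the original, the naive split would also cut a statement at a `;` inside a string literal or a PL/SQL block.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the project) restating the splitting step of OperateOracle.createTable().
class DdlSplitter {

    // Split a script on ';' and drop blank fragments (naive: ignores quoting and PL/SQL blocks)
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String sql = part.trim();
            if (!sql.isEmpty()) {
                statements.add(sql);
            }
        }
        return statements;
    }

    // Execute each statement on its own, without the trailing ';'
    static void executeAll(Connection connection, String script) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : split(script)) {
                statement.executeUpdate(sql);
            }
        }
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE T1 (ID NUMBER(11) PRIMARY KEY);\n"
                + "COMMENT ON TABLE T1 IS 'demo';\n";
        System.out.println(split(script));
        // [CREATE TABLE T1 (ID NUMBER(11) PRIMARY KEY), COMMENT ON TABLE T1 IS 'demo']
    }
}
```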

Migrating data from MySQL to HDFS

I. Purpose:
Migrate the data from MySQL to HDFS as text files.

II. Implementation:

```java
package com.ctg.odp.collect.dbloader.sqoop;

import java.io.IOException;

import org.apache.sqoop.tool.ImportAllTablesTool;
import org.apache.sqoop.tool.ImportTool;

import com.cloudera.sqoop.SqoopOptions;
import com.ctg.odp.collect.dbloader.importToOracle.MysqlInfo;

public class ImportToHDFS {
    private MysqlInfo mysqlInfo;
    private String hdfsTargetDir;
    private String SQLString;

    // Default target directory and no free-form query (only meaningful for importAllTableData)
    public ImportToHDFS(MysqlInfo mysqlInfo) {
        this(mysqlInfo, "/apps/odp/data/dbloadTest", null);
    }

    public ImportToHDFS(MysqlInfo mysqlInfo, String hdfsTargetDir, String SQLString) {
        this.mysqlInfo = mysqlInfo;
        this.hdfsTargetDir = hdfsTargetDir;
        this.SQLString = SQLString;
    }

    @SuppressWarnings("deprecation")
    public SqoopOptions getSqoopOptions() throws IOException {
        SqoopOptions options = new SqoopOptions();
        options.setActiveSqoopTool(new ImportTool());
        String connectString = "jdbc:mysql://" + mysqlInfo.getMysqlHost() + ":3306/" + mysqlInfo.getMysqlDatabase();
        options.setConnectString(connectString);
        options.setUsername(mysqlInfo.getMysqlUserName());
        options.setPassword(mysqlInfo.getMysqlPassWord());
        // Free-form query from OperateMysql.getSelectQuery(); it must contain $CONDITIONS
        options.setSqlQuery(SQLString);
        options.setTargetDir(hdfsTargetDir);
        options.setNumMappers(4);
        options.setDriverClassName("com.mysql.jdbc.Driver");
        // NULLs in non-string columns are written as empty fields
        options.setNullNonStringValue("");
        // The target directory is reused for every table, so delete it first
        options.setDeleteMode(true);
        // Splitting by the constant "1" makes MIN equal MAX, which most likely yields a single split;
        // a numeric primary key would let the 4 mappers share the work
        options.setSplitByCol("1");
        // Field delimiter; must match ExportToOracle
        options.setFieldsTerminatedBy('^');
        return options;
    }

    // Import the data of the configured query
    public int importData() throws IOException {
        @SuppressWarnings("deprecation")
        SqoopOptions options = getSqoopOptions();
        ImportTool importTool = new ImportTool();
        return importTool.run(options);
    }

    // Import the data of all tables
    public int importAllTableData() throws IOException {
        @SuppressWarnings("deprecation")
        SqoopOptions options = getSqoopOptions();
        ImportAllTablesTool importAllTablesTool = new ImportAllTablesTool();
        return importAllTablesTool.run(options);
    }
}
```
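For readers more used to the `sqoop` command line, the options set in `getSqoopOptions` correspond roughly to the arguments below. This is a sketch: `SqoopImportArgs` is a hypothetical helper, and the setter-to-flag mapping in the comments is an interpretation of the Sqoop 1.4.6 options, not something the project itself uses. With Sqoop on the classpath, such an array could be handed to `org.apache.sqoop.Sqoop.runTool(String[])`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the project): Sqoop 1.4.6 arguments that roughly
// correspond to ImportToHDFS.getSqoopOptions().
class SqoopImportArgs {

    static List<String> build(String host, String database, String user, String password,
                              String query, String targetDir, String splitBy) {
        return new ArrayList<>(Arrays.asList(
                "import",
                "--connect", "jdbc:mysql://" + host + ":3306/" + database, // setConnectString
                "--driver", "com.mysql.jdbc.Driver",                       // setDriverClassName
                "--username", user,
                "--password", password,        // -P or --password-file keep it out of the process list
                "--query", query,              // setSqlQuery; must contain $CONDITIONS
                "--target-dir", targetDir,     // setTargetDir
                "--delete-target-dir",         // setDeleteMode(true)
                "--num-mappers", "4",          // setNumMappers(4)
                "--split-by", splitBy,         // setSplitByCol
                "--fields-terminated-by", "^", // setFieldsTerminatedBy('^')
                "--null-non-string", ""));     // setNullNonStringValue("")
    }

    public static void main(String[] args) {
        List<String> argv = build("mysql-host", "odp", "user", "secret",
                "SELECT id,name FROM t WHERE $CONDITIONS", "/tmp/sqoop/t", "id");
        System.out.println(String.join(" ", argv));
    }
}
```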

Migrating data from HDFS to Oracle

I. Purpose: migrate the data from HDFS to Oracle.

II. Implementation:

```java
package com.ctg.odp.collect.dbloader.sqoop;

import java.io.IOException;

import org.apache.sqoop.tool.ExportTool;

import com.cloudera.sqoop.SqoopOptions;
import com.ctg.odp.collect.dbloader.importToOracle.OracleInfo;

public class ExportToOracle {
    private OracleInfo oracleInfo;
    private String hdfsSourceDir;

    public ExportToOracle(OracleInfo oracleInfo) {
        this(oracleInfo, "/apps/odp/data/dbload");
    }

    public ExportToOracle(OracleInfo oracleInfo, String hdfsSourceDir) {
        this.oracleInfo = oracleInfo;
        this.hdfsSourceDir = hdfsSourceDir;
    }

    @SuppressWarnings("deprecation")
    public SqoopOptions getSqoopOptions() throws IOException {
        SqoopOptions options = new SqoopOptions();
        options.setActiveSqoopTool(new ExportTool());

        // Connect to the configured Oracle instance instead of a hard-coded address
        String connectString = "jdbc:oracle:thin:@" + oracleInfo.getOracleHost() + ":1521:" + oracleInfo.getOracleDatabase();
        options.setConnectString(connectString);
        options.setUsername(oracleInfo.getOracleUserName());
        options.setPassword(oracleInfo.getOraclePassWord());
        options.setDirectMode(true);
        options.setNumMappers(4);
        options.setExportDir(hdfsSourceDir);
        // Unquoted Oracle identifiers are stored in upper case
        options.setTableName(oracleInfo.getOracleTable().toUpperCase());
        options.setConnManagerClassName("org.apache.sqoop.manager.OracleManager");
        // Must match the delimiter used by ImportToHDFS
        options.setInputFieldsTerminatedBy('^');
        return options;
    }

    public int exportData() throws IOException {
        @SuppressWarnings("deprecation")
        SqoopOptions options = getSqoopOptions();
        ExportTool exportTool = new ExportTool();
        return exportTool.run(options);
    }
}
```
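The export side has to agree with the import side on the field delimiter, and `ExportToOracle` upper-cases the table name because Oracle stores unquoted identifiers in upper case. As a sketch, the equivalent `sqoop export` arguments could be assembled as follows; `SqoopExportArgs` is a hypothetical helper, and the connect string is built from the host and SID rather than from a fixed address.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper (not part of the project): Sqoop 1.4.6 export arguments that roughly
// correspond to ExportToOracle.getSqoopOptions().
class SqoopExportArgs {

    static List<String> build(String host, String sid, String user, String password,
                              String table, String exportDir) {
        return new ArrayList<>(Arrays.asList(
                "export",
                "--connect", "jdbc:oracle:thin:@" + host + ":1521:" + sid,
                "--connection-manager", "org.apache.sqoop.manager.OracleManager",
                "--username", user,
                "--password", password,
                // Unquoted Oracle identifiers are stored in upper case
                "--table", table.toUpperCase(Locale.ROOT),
                "--export-dir", exportDir,
                "--num-mappers", "4",
                // Must equal the delimiter the import wrote
                "--input-fields-terminated-by", "^",
                "--direct"));
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ",
                build("oracle-host", "orcl", "user", "secret", "dbload_task", "/tmp/sqoop/t")));
    }
}
```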

# Cutover test

Test environment

1: A MySQL service with a table to import. The table dbload_task_instance_run_result is used here.

2: A running Oracle service.

3: Running Hadoop services.

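Test steps

Because the main class integrates creating the tables, importing the data into HDFS, and exporting it to Oracle, you only need to run the program as a Java Application and then check the results.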

Test results

1: The target files on HDFS contain the following:

```text
1^46^增量參照列名稱^Integer^1234
1^91^qw^Integer^12
1^92^3^Integer^3
2^1^id^int^0
2^2^creattime^timestamp^2016-03-23 00:00:0.0
3^5^id^int^0
3^6^creattime^timestamp^2016-03-23 00:00:12.0
3^69^增量採集^Integer^2
42^51^creattime^timestamp^2016-03-23 00:00:12.0
47^64^qw^Date^21
47^65^wqwq^Integer^NaN
47^66^name^Integer^2
47^70^re^Integer^23
47^71^testParamsItem003^Date^1471190400000
51^62^id^Integer^NaN
58^75^id^Integer^12789499
58^76^creattime^Date^1458662405000
60^78^id^Integer^12789499
60^80^id^Integer^12789499
63^81^id^Integer^0
65^82^mdn^Integer^1
73^89^task_item_id^Integer^92
74^93^task_item_id^Integer^95
82^95^增量採集列名稱^Integer^12
88^98^task_id^Integer^93
91^103^col^Integer^112
```

In the output above, the character ^ is the field delimiter set in the program (Sqoop's default is a comma).
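When spot-checking these files in Java, note that `String.split` takes a regular expression and `^` is the start-of-input anchor there, so `line.split("^")` leaves the line unsplit. A minimal sketch that quotes the delimiter instead; `CaretRecord` is a hypothetical helper, and the `-1` limit keeps trailing empty fields, which is how a NULL non-string column appears given `setNullNonStringValue("")`.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of the project) for spot-checking the '^'-delimited files Sqoop wrote.
class CaretRecord {

    // '^' is a regex metacharacter, so it is quoted before being used as a split pattern
    private static final Pattern DELIMITER = Pattern.compile(Pattern.quote("^"));

    // limit -1 keeps trailing empty fields (e.g. a NULL numeric column written as "")
    static String[] fields(String line) {
        return DELIMITER.split(line, -1);
    }

    public static void main(String[] args) {
        String[] f = fields("2^2^creattime^timestamp^2016-03-23 00:00:0.0");
        System.out.println(f.length + " fields, third = " + f[2]); // 5 fields, third = creattime
    }
}
```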

2: Checking the result:

The corresponding table can also be seen in Oracle.

JackerWang, Guangzhou, on an autumn afternoon in 2017 (October 26)

