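In the big-data era, large-scale data migration shows up in all kinds of application scenarios. This article discusses how to use Sqoop's distributed capabilities to migrate and split large volumes of data from the relational database MySQL to Oracle.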
1 JDK + Eclipse
2 Hadoop environment (version 2.6.5)
3 Sqoop 1.4.6-alpha (sqoop-1.4.6.bin__hadoop-2.0.4-alpha)
This article only outlines the migration flow. The complete code can be downloaded from [GitHub].
[GitHub]: https://github.com/Jacker-Wang/sqoop-MysqlToOracle "GitHub"
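The main Maven dependencies required are:
1 The sqoop 1.4.6 jar. (Sqoop currently comes in version 1 and version 2: sqoop 1.4.6 belongs to Sqoop 1, and sqoop 1.99.7 belongs to Sqoop 2. The sqoop dependency could not be downloaded through Maven, so copy sqoop-1.4.6.jar from sqoop-1.4.6.bin__hadoop-2.0.4-alpha into the corresponding location of your local repository.)
2 The MySQL connector jar, mysql-connector-java.
3 The Oracle connector jar, oracle-ojdbc7.
4 The Hadoop base packages.
5 The MapReduce base packages.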
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.5.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.sqoop</groupId>
            <artifactId>sqoop</artifactId>
            <version>1.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-app</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>oracle-ojdbc7</artifactId>
            <version>12.1.0.2</version>
        </dependency>
    </dependencies>
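I. Main class overview:
The main class works through the following steps:
1: Read the MySQL and Oracle connection details, the HDFS target directory, the mapping table name, and the name of the sequence used by the mapping table from the configuration file.
2: Set up static MySQL and Oracle operation classes, which create tables, look up foreign-key referenced tables, and fetch related metadata.
3: Create the static mapping table (it maps each MySQL table name to its Oracle table name for later use).
4: Implement the import function importMySQLToOracle. The function is recursive: because of foreign-key constraints, the data of the referenced tables must be imported first, so it recursively creates each referenced table and imports its data before handling the current table.
5: Finally, add a record of the import to the mapping table.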
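The recursion in step 4 amounts to a depth-first walk over the foreign-key graph. The following is a minimal sketch of that ordering only, assuming Java 9+; the class name `ImportOrderSketch`, the in-memory `references` map, and the sample table names are hypothetical stand-ins for the real metadata lookups and are not part of the project.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: computes a "referenced tables first" import order.
final class ImportOrderSketch {

    // Returns the tables in an order where every referenced (parent) table
    // appears before the tables that reference it.
    static List<String> importOrder(String table, Map<String, List<String>> references) {
        Set<String> done = new LinkedHashSet<>();
        visit(table, references, done, new LinkedHashSet<>());
        return new ArrayList<>(done);
    }

    private static void visit(String table, Map<String, List<String>> references,
                              Set<String> done, Set<String> inProgress) {
        if (done.contains(table)) {
            return; // already imported
        }
        if (!inProgress.add(table)) {
            // A cycle (including a self-reference) would recurse forever.
            throw new IllegalStateException("Circular foreign-key reference at " + table);
        }
        for (String parent : references.getOrDefault(table, List.of())) {
            visit(parent, references, done, inProgress);
        }
        inProgress.remove(table);
        done.add(table); // stands in for "create the table and import its data"
    }

    public static void main(String[] args) {
        // Hypothetical schema: order_item references orders and product,
        // and orders references customer.
        Map<String, List<String>> refs = Map.of(
                "order_item", List.of("orders", "product"),
                "orders", List.of("customer"));
        System.out.println(importOrder("order_item", refs));
    }
}
```

The sketch adds a cycle check that the article's function does not describe; whether the real schema can contain circular or self-referencing foreign keys is an open assumption.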
II. Code:
    package com.ctg.odp.collect.dbloader.importToOracle;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    import com.ctg.odp.collect.dbloader.sqoop.ExportToOracle;
    import com.ctg.odp.collect.dbloader.sqoop.ImportToHDFS;

    public class ImportData {

        private static String mapTable = null;
        private static String sequence = null;
        private static CommandLine commandLine = null;
        private static String targetDir = null;
        private static final Log LOG = LogFactory.getLog(ImportData.class);
        private static MysqlInfo mysqlInfo = null;
        private static OracleInfo oracleInfo = null;
        private static OperateMysql operateMysql = null;
        private static OperateOracle operateOracle = null;

        static {
            mapTable = DataUtil.getField("MapTableName");
            sequence = DataUtil.getField("SequenceName");
            targetDir = DataUtil.getField("targetDir");
            LOG.info("****targetDir read from configuration****\n" + targetDir);
        }

        public static void main(String[] args) {
            commandLine = DataUtil.getCommandLine(args);
            mysqlInfo = DataUtil.getMysqlInfo(commandLine);
            LOG.info("****MySQL connection info****\n" + mysqlInfo);
            oracleInfo = DataUtil.getOracleInfo(commandLine);
            LOG.info("****Oracle connection info****\n" + oracleInfo);

            operateMysql = new OperateMysql(mysqlInfo);
            operateOracle = new OperateOracle(oracleInfo, mapTable, sequence);

            // Create the mapping table
            operateOracle.createMapTable();

            // No table name given: migrate every table
            if (mysqlInfo.getMysqlTable() == null) {
                List<String> tablesList = operateMysql.getAllTables();
                for (String table : tablesList) {
                    LOG.info("****Start importing table****\n" + table);
                    mysqlInfo.setMysqlTable(table);
                    oracleInfo.setOracleTable(table);
                    operateOracle.setOracleInfo(oracleInfo);
                    operateMysql.setMysqlInfo(mysqlInfo);
                    importMySQLToOracle(operateMysql, operateOracle, targetDir);
                }
            } else {
                LOG.info("****Start importing table****\n" + mysqlInfo.getMysqlTable());
                importMySQLToOracle(operateMysql, operateOracle, targetDir);
            }
            operateMysql.releaseResource();
            operateOracle.releaseResource();
        }

        public static Boolean importMySQLToOracle(OperateMysql operateMysql, OperateOracle operateOracle,
                String targetDir) {
            Boolean result = false;
            // Normalize the table name and create the Oracle table
            OracleInfo oracleInfo = operateOracle.getOracleInfo();
            MysqlInfo mysqlInfo = operateMysql.getMysqlInfo();
            // Remember the current table names: the recursive calls below
            // overwrite them on the shared info objects.
            String currentMysqlTable = mysqlInfo.getMysqlTable();
            String currentOracleTable = oracleInfo.getOracleTable();
            String resultTable = DataUtil.checkTableName(currentOracleTable);

            // Build the Oracle CREATE TABLE statement
            List<String> referenceTables = new ArrayList<String>();
            String createTableSQLOnOracle = operateMysql.getCreateTableSQLOnOracle(referenceTables);

            // Check whether the foreign-key referenced tables exist
            for (String referenceTable : referenceTables) {
                if (!operateOracle.isExistTable(referenceTable)) {
                    // The referenced table is missing, so create and load it first
                    MysqlInfo mysqlInfoReference = operateMysql.getMysqlInfo();
                    mysqlInfoReference.setMysqlTable(referenceTable);
                    operateMysql.setMysqlInfo(mysqlInfoReference);
                    OracleInfo oracleInfoReference = operateOracle.getOracleInfo();
                    oracleInfoReference.setOracleTable(referenceTable);
                    operateOracle.setOracleInfo(oracleInfoReference);
                    importMySQLToOracle(operateMysql, operateOracle, targetDir);
                    // Restore the table names of the table being processed
                    mysqlInfo.setMysqlTable(currentMysqlTable);
                    oracleInfo.setOracleTable(currentOracleTable);
                }
            }
            operateOracle.createTable(resultTable, createTableSQLOnOracle);
            oracleInfo.setOracleTable(resultTable);

            // Move the data from MySQL to HDFS
            String SQLString = operateMysql.getSelectQuery();
            ImportToHDFS importToHDFS = new ImportToHDFS(mysqlInfo, targetDir, SQLString);
            try {
                int importResult = importToHDFS.importData();
                // Import into HDFS succeeded
                if (importResult == 0) {
                    LOG.info("****Table " + mysqlInfo.getMysqlTable() + " imported into HDFS directory "
                            + targetDir + "****\n");
                    ExportToOracle exportToOracle = new ExportToOracle(oracleInfo, targetDir);
                    int exportResult = exportToOracle.exportData();
                    // Export to Oracle succeeded
                    if (exportResult == 0) {
                        LOG.info("****Table " + mysqlInfo.getMysqlTable() + " exported to Oracle table "
                                + oracleInfo.getOracleTable() + "****\n");
                        // On success, record the table in the mapping table
                        if (operateOracle.deleteDataFromMap(mysqlInfo.getMysqlTable())) {
                            LOG.info("****Record " + mysqlInfo.getMysqlTable()
                                    + " removed from the mapping table****\n");
                        }
                        if (operateOracle.addDataToMap(mysqlInfo.getMysqlTable(), resultTable)) {
                            LOG.info("****Record " + mysqlInfo.getMysqlTable()
                                    + " added to the mapping table****\n");
                        }
                        LOG.info("**********************************************************************************");
                        result = true;
                    } else {
                        LOG.info("****Table " + mysqlInfo.getMysqlTable() + " failed to export to Oracle****");
                        result = false;
                    }
                } else {
                    LOG.info("****Table " + mysqlInfo.getMysqlTable() + " failed to import into HDFS****");
                    result = false;
                }
            } catch (IOException e) {
                LOG.error("****Migration of table " + mysqlInfo.getMysqlTable() + " failed****", e);
            }
            return result;
        }
    }
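The static initializer above reads `MapTableName`, `SequenceName`, and `targetDir` through `DataUtil.getField`, whose implementation is not shown in this article. As a rough sketch of how such a lookup could work with `java.util.Properties`: the class name `ConfigSketch`, the `fromText` factory, and all sample values below are hypothetical and not taken from the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: a properties-backed lookup similar in spirit to DataUtil.getField.
final class ConfigSketch {

    private final Properties props = new Properties();

    // Parses "key=value" lines; a real tool would load a file from the classpath instead.
    static ConfigSketch fromText(String propertiesText) {
        ConfigSketch config = new ConfigSketch();
        try {
            config.props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    // Fails fast on a missing key instead of returning null.
    String getField(String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing configuration key: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        ConfigSketch config = ConfigSketch.fromText(
                "MapTableName=table_map\n"
                + "SequenceName=table_map_seq\n"
                + "targetDir=/tmp/sqoop-staging\n");
        System.out.println(config.getField("targetDir"));
    }
}
```

Failing fast on a missing key is a design choice of the sketch; it surfaces a bad configuration before any table is created.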
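I. MySQL operation class overview
The operation class mainly looks up the foreign-key referenced tables of a given table, checks whether related tables exist, and performs similar operations.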
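The project delegates the foreign-key lookup to `ConvertStatement.fromMySqlToOracle`, which is not shown in this article. One plausible way to collect the referenced tables is to scan the `SHOW CREATE TABLE` output for `REFERENCES` clauses. The sketch below assumes same-schema references and simple identifiers; the class `ForeignKeySketch` and the sample DDL are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: extracts parent table names from MySQL DDL text.
final class ForeignKeySketch {

    // Matches "REFERENCES `parent_table`" (backticks optional, case-insensitive).
    private static final Pattern REFERENCES =
            Pattern.compile("REFERENCES\\s+`?([A-Za-z0-9_]+)`?", Pattern.CASE_INSENSITIVE);

    // Returns each referenced table once, in order of first appearance.
    static List<String> referencedTables(String createTableSql) {
        List<String> tables = new ArrayList<>();
        Matcher matcher = REFERENCES.matcher(createTableSql);
        while (matcher.find()) {
            String table = matcher.group(1);
            if (!tables.contains(table)) {
                tables.add(table);
            }
        }
        return tables;
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE `orders` (\n"
                + "  `id` int(11) NOT NULL,\n"
                + "  `customer_id` int(11) NOT NULL,\n"
                + "  PRIMARY KEY (`id`),\n"
                + "  CONSTRAINT `fk_customer` FOREIGN KEY (`customer_id`) REFERENCES `customer` (`id`)\n"
                + ") ENGINE=InnoDB";
        System.out.println(referencedTables(ddl));
    }
}
```

An alternative that avoids text parsing is the standard JDBC call `DatabaseMetaData.getImportedKeys`, which reports the parent table in its `PKTABLE_NAME` column.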
II. The following class uses JDBC to operate on MySQL
    package com.ctg.odp.collect.dbloader.importToOracle;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class OperateMysql {

        private static final Log LOG = LogFactory.getLog(OperateMysql.class);
        private MysqlInfo mysqlInfo;
        private static String DRVIER = "com.mysql.jdbc.Driver";
        private String URL = null;
        // Database connection
        Connection connection = null;
        // Prepared statement; generally preferred over a plain Statement
        PreparedStatement pstm = null;
        // Result set
        ResultSet rs = null;

        public OperateMysql(MysqlInfo mysqlInfo) {
            this.mysqlInfo = mysqlInfo;
            URL = "jdbc:mysql://" + mysqlInfo.getMysqlHost() + ":3306/" + mysqlInfo.getMysqlDatabase()
                    + "?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull";
            LOG.info("****Connecting to MySQL......****\n");
            connection = getConnection();
        }

        public ResultSet selectFromMysql(String table, int limitBgin, int limitEnd) {
            String sql = "select * from " + table + " limit " + limitBgin + "," + limitEnd;
            try {
                // Run the paged query
                System.out.println("Query: " + sql);
                pstm = connection.prepareStatement(sql);
                rs = pstm.executeQuery();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return rs;
        }

        public Connection getConnection() {
            try {
                Class.forName(DRVIER);
                connection = DriverManager.getConnection(URL, mysqlInfo.getMysqlUserName(),
                        mysqlInfo.getMysqlPassWord());
                LOG.info("****Connected to MySQL database****\n" + mysqlInfo.getMysqlDatabase());
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("class not find !", e);
            } catch (SQLException e) {
                throw new RuntimeException("get connection error!", e);
            }
            return connection;
        }

        /**
         * Release resources
         */
        public void releaseResource() {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        // Read the table's column metadata
        public List<String> getTableInfo() {
            List<String> colums = new ArrayList<String>();
            try {
                ResultSet colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
                ResultSet primarySet = connection.getMetaData().getPrimaryKeys(null, "%", mysqlInfo.getMysqlTable());
                // A table may have no primary key, so guard the lookup
                String primaryKey = primarySet.next() ? primarySet.getString(4) : null;
                System.out.println("key=" + primaryKey);
                while (colSet.next()) {
                    String COLUMN_NAME = colSet.getString("COLUMN_NAME");
                    String TYPE_NAME = colSet.getString("TYPE_NAME");
                    String COLUMN_SIZE = colSet.getString("COLUMN_SIZE");
                    String isAutoIncrement = colSet.getString("IS_AUTOINCREMENT");
                    String iS_NULLABLE = colSet.getString("IS_NULLABLE");
                    StringBuilder col = new StringBuilder(COLUMN_NAME + " ");
                    // Set the column size and map the type
                    if ("INT".equals(TYPE_NAME)) {
                        COLUMN_SIZE = "11";
                        TYPE_NAME = "NUMBER";
                    }
                    if ("BIGINT".equals(TYPE_NAME)) {
                        COLUMN_SIZE = "20";
                        TYPE_NAME = "NUMBER";
                    }
                    if ("VARCHAR".equals(TYPE_NAME)) {
                        TYPE_NAME = "VARCHAR2";
                    }
                    // Append the column type and size
                    if ("DATETIME".equals(TYPE_NAME)) {
                        TYPE_NAME = "TIMESTAMP";
                        col.append(TYPE_NAME);
                    } else {
                        col.append(TYPE_NAME).append("(").append(COLUMN_SIZE).append(")");
                    }
                    // Mark the primary key
                    if (COLUMN_NAME.equals(primaryKey)) {
                        col.append(" PRIMARY KEY");
                    }
                    // Note: AUTO_INCREMENT is MySQL syntax and is not accepted by Oracle
                    if ("YES".equals(isAutoIncrement)) {
                        col.append(" AUTO_INCREMENT");
                    }
                    if ("NO".equals(iS_NULLABLE)) {
                        col.append(" NOT NULL");
                    }
                    colums.add(col.toString());
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return colums;
        }

        // Get the column positions of integer fields
        public List<Integer> getIntFileds() {
            List<Integer> result = new ArrayList<Integer>();
            ResultSet colSet = null;
            try {
                colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
                while (colSet.next()) {
                    String TYPE_NAME = colSet.getString("TYPE_NAME");
                    if ("INT".equals(TYPE_NAME) || "BIGINT".equals(TYPE_NAME)) {
                        result.add(colSet.getInt("ORDINAL_POSITION"));
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return result;
        }

        // Get the column positions of DATETIME fields
        public List<Integer> getDateFileds() {
            List<Integer> result = new ArrayList<Integer>();
            ResultSet colSet = null;
            try {
                colSet = connection.getMetaData().getColumns(null, "%", mysqlInfo.getMysqlTable(), "%");
                while (colSet.next()) {
                    String TYPE_NAME = colSet.getString("TYPE_NAME");
                    if ("DATETIME".equals(TYPE_NAME)) {
                        result.add(colSet.getInt("ORDINAL_POSITION"));
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return result;
        }

        // Count the rows in the table
        public Integer getTableCount() {
            Integer rows = 0;
            String sql = "SELECT COUNT(*) FROM " + mysqlInfo.getMysqlTable();
            try {
                pstm = connection.prepareStatement(sql);
                ResultSet resultSet = pstm.executeQuery();
                while (resultSet.next()) {
                    rows = resultSet.getInt(1);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return rows;
        }

        // Build the CREATE TABLE statement to run on Oracle
        public String getCreateTableSQLOnOracle(List<String> referenceTables) {
            String createSQL = null;
            // Fetch the MySQL CREATE TABLE statement
            String getCreateSQL = "SHOW CREATE TABLE " + mysqlInfo.getMysqlTable();
            try {
                pstm = connection.prepareStatement(getCreateSQL);
                ResultSet result = pstm.executeQuery();
                while (result.next()) {
                    createSQL = result.getString(result.getMetaData().getColumnName(2));
                }
                createSQL = ConvertStatement.fromMySqlToOracle(createSQL, referenceTables);
            } catch (SQLException e1) {
                e1.printStackTrace();
                return null;
            }
            return createSQL;
        }

        // Build the free-form query handed to Sqoop
        public String getSelectQuery() {
            String selectSQL = null;
            String tableName = mysqlInfo.getMysqlTable();
            String showColQuery = "SHOW COLUMNS FROM " + tableName;
            try {
                pstm = connection.prepareStatement(showColQuery);
                ResultSet result = pstm.executeQuery();
                ArrayList<String> cols = new ArrayList<String>();
                while (result.next()) {
                    String field = result.getString("Field");
                    String isNull = result.getString("Null");
                    if (isNull.toUpperCase().equals("NO")) {
                        // Oracle treats '' as NULL, so replace empty strings in
                        // NOT NULL columns with a single space
                        cols.add("IF(" + field + "='',' '," + field + ")");
                    } else {
                        cols.add(field);
                    }
                }
                selectSQL = "SELECT " + StringUtils.join(cols, ",") + " FROM " + tableName + " WHERE $CONDITIONS";
            } catch (SQLException e1) {
                e1.printStackTrace();
                return null;
            }
            return selectSQL;
        }

        // List all tables in the database
        public List<String> getAllTables() {
            List<String> tables = new ArrayList<String>();
            try {
                String[] types = { "TABLE" };
                ResultSet result = connection.getMetaData().getTables(null, null, null, types);
                while (result.next()) {
                    tables.add(result.getString(3));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return tables;
        }

        public MysqlInfo getMysqlInfo() {
            return mysqlInfo;
        }

        public void setMysqlInfo(MysqlInfo mysqlInfo) {
            this.mysqlInfo = mysqlInfo;
        }
    }
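The type conversions buried in `getTableInfo` above can be read as a small lookup. The sketch below restates those rules as a standalone function for clarity; the class `TypeMappingSketch` is hypothetical, and it covers only the four types the article's code handles (anything else passes through unchanged with its size, as in the original).

```java
import java.util.Locale;

// Illustrative only: the MySQL-to-Oracle type rules used in getTableInfo.
final class TypeMappingSketch {

    // Maps a MySQL type name and column size to an Oracle column type.
    static String toOracleType(String mysqlType, int columnSize) {
        String type = mysqlType.toUpperCase(Locale.ROOT);
        switch (type) {
            case "INT":
                return "NUMBER(11)";
            case "BIGINT":
                return "NUMBER(20)";
            case "VARCHAR":
                return "VARCHAR2(" + columnSize + ")";
            case "DATETIME":
                return "TIMESTAMP";
            default:
                // Unmapped types keep their name and size.
                return type + "(" + columnSize + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(toOracleType("int", 10));
        System.out.println(toOracleType("varchar", 64));
        System.out.println(toOracleType("datetime", 19));
    }
}
```

Types such as TEXT, DECIMAL, or BLOB would need their own rules; the pass-through default is unlikely to yield valid Oracle DDL for them.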
I. This class uses JDBC to implement the basic Oracle operations
II. The following class operates on Oracle
    package com.ctg.odp.collect.dbloader.importToOracle;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class OperateOracle {

        private static final Log LOG = LogFactory.getLog(OperateOracle.class);
        private OracleInfo oracleInfo;
        private static String DRVIER = "oracle.jdbc.driver.OracleDriver";
        private String URL = null;
        private String mapTable = null;
        private String sequenceName = null;
        // Database connection
        Connection connection = null;
        // Prepared statement; generally preferred over a plain Statement
        PreparedStatement pstm = null;
        // Result set
        ResultSet rs = null;

        public OperateOracle(OracleInfo oracleInfo) {
            this(oracleInfo, null, null);
        }

        public OperateOracle(OracleInfo oracleInfo, String mapTable, String sequenceName) {
            this.mapTable = mapTable;
            this.sequenceName = sequenceName;
            this.oracleInfo = oracleInfo;
            URL = "jdbc:oracle:thin:@" + oracleInfo.getOracleHost() + ":1521:" + oracleInfo.getOracleDatabase();
            LOG.info("****Connecting to Oracle****\n");
            connection = getConnection();
        }

        // Check whether a table exists
        public Boolean isExistTable(String table) {
            String checkSql = "select count(*) from USER_OBJECTS where OBJECT_NAME = ?";
            LOG.info("****Checking whether table " + table + " already exists****\n");
            try {
                pstm = connection.prepareStatement(checkSql);
                pstm.setString(1, table.toUpperCase());
                ResultSet result = pstm.executeQuery();
                result.next();
                if (result.getInt(1) == 1) {
                    LOG.info("****Table " + table + " already exists****\n");
                    return true;
                } else {
                    return false;
                }
            } catch (SQLException e1) {
                e1.printStackTrace();
                return false;
            }
        }

        // Create a table from the given name and CREATE TABLE script
        public boolean createTable(String resultTable, String createTableSQLOnOracle) {
            if (isExistTable(resultTable)) {
                return false;
            } else {
                // Create the table in Oracle
                createTableSQLOnOracle = createTableSQLOnOracle.replace(oracleInfo.getOracleTable(), resultTable);
                createTableSQLOnOracle = createTableSQLOnOracle.replace(oracleInfo.getOracleTable().toUpperCase(),
                        resultTable);
                LOG.info("****Creating table with statement****\n" + createTableSQLOnOracle);
                try {
                    // The script may hold several statements separated by ';'
                    String[] SQLArray = createTableSQLOnOracle.split(";");
                    for (String sql : SQLArray) {
                        sql = sql.trim();
                        if (!"".equals(sql)) {
                            pstm = connection.prepareStatement(sql);
                            pstm.executeUpdate();
                        }
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                    return false;
                }
                oracleInfo.setOracleTable(resultTable);
                LOG.info("****Table created****\n" + resultTable);
                return true;
            }
        }

        // Delete a record from the mapping table
        public boolean deleteDataFromMap(String oldTable) {
            String deleteSQL = "delete from " + mapTable + " where old_table=?";
            try {
                pstm = connection.prepareStatement(deleteSQL);
                pstm.setString(1, oldTable);
                int result = pstm.executeUpdate();
                return result == 1;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        // Add a record to the mapping table
        public boolean addDataToMap(String oldTable, String newTable) {
            String insertSQL = "insert into " + mapTable + " values(" + sequenceName + ".NEXTVAL,?,?)";
            try {
                pstm = connection.prepareStatement(insertSQL);
                pstm.setString(1, oldTable);
                pstm.setString(2, newTable);
                int result = pstm.executeUpdate();
                return result == 1;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        // Check whether a sequence exists
        public Boolean isExistSequence(String sequence) {
            LOG.info("****Checking whether sequence " + sequence + " exists****");
            String checkSQL = "select count(*) from user_sequences where sequence_name = ?";
            try {
                pstm = connection.prepareStatement(checkSQL);
                pstm.setString(1, sequence.toUpperCase());
                ResultSet result = pstm.executeQuery();
                result.next();
                return result.getInt(1) == 1;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        // Create the mapping table and its sequence
        public Boolean createMapTable() {
            Boolean result = false;
            String createTabkleSQL = "CREATE TABLE " + mapTable
                    + "(id NUMBER PRIMARY KEY,old_table VARCHAR2(100),new_table VARCHAR2(100) )";
            String SequenceSQL = "CREATE SEQUENCE " + sequenceName
                    + " INCREMENT BY 1 START WITH 1 MINVALUE 1 NOMAXVALUE NOCYCLE NOCACHE";
            if (!isExistTable(mapTable)) {
                try {
                    pstm = connection.prepareStatement(createTabkleSQL);
                    int exeResultA = pstm.executeUpdate();
                    if (exeResultA == 0) {
                        LOG.info("****Mapping table created****\n" + mapTable);
                        result = true;
                        if (!isExistSequence(sequenceName)) {
                            pstm = connection.prepareStatement(SequenceSQL);
                            int exeResultB = pstm.executeUpdate();
                            if (exeResultB == 0) {
                                LOG.info("****Sequence created****\n" + sequenceName);
                            }
                        } else {
                            LOG.info("****Sequence " + sequenceName + " already exists****\n");
                        }
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }

        public Connection getConnection() {
            try {
                Class.forName(DRVIER);
                System.out.println(URL);
                connection = DriverManager.getConnection(URL, oracleInfo.getOracleUserName(),
                        oracleInfo.getOraclePassWord());
                LOG.info("****Connected to Oracle database****\n" + oracleInfo.getOracleDatabase());
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("class not find !", e);
            } catch (SQLException e) {
                throw new RuntimeException("get connection error!", e);
            }
            return connection;
        }

        /**
         * Release resources
         */
        public void releaseResource() {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        public OracleInfo getOracleInfo() {
            return oracleInfo;
        }

        public void setOracleInfo(OracleInfo oracleInfo) {
            this.oracleInfo = oracleInfo;
        }

        public String getMapTable() {
            return mapTable;
        }

        public void setMapTable(String mapTable) {
            this.mapTable = mapTable;
        }

        public String getSequenceName() {
            return sequenceName;
        }

        public void setSequenceName(String sequenceName) {
            this.sequenceName = sequenceName;
        }
    }
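`createTable` above runs a converted script by splitting it on `;` and executing each piece separately, which matters because a statement sent through Oracle's JDBC driver should not carry a trailing semicolon. The splitting step on its own could look like the sketch below; the class `StatementSplitSketch` and the sample script are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: naive ';' splitting of a DDL script, as done in createTable.
final class StatementSplitSketch {

    // Splits on ';', trims each piece, and drops empty pieces.
    static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String sql = part.trim();
            if (!sql.isEmpty()) {
                statements.add(sql);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE demo (id NUMBER(11) NOT NULL);\n"
                + "CREATE INDEX demo_idx ON demo (id);\n";
        for (String sql : splitStatements(script)) {
            System.out.println(sql);
        }
    }
}
```

This approach breaks if a semicolon appears inside a string literal or a column comment, so it is only safe when the generated DDL is known not to contain one.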
I. Main function:
Move the data from MySQL to HDFS, producing text files
II. The code is as follows
    package com.ctg.odp.collect.dbloader.sqoop;

    import java.io.IOException;

    import org.apache.sqoop.tool.ImportAllTablesTool;
    import org.apache.sqoop.tool.ImportTool;

    import com.cloudera.sqoop.SqoopOptions;
    import com.ctg.odp.collect.dbloader.importToOracle.MysqlInfo;

    public class ImportToHDFS {

        private MysqlInfo mysqlInfo;
        private String hdfsTargetDir;
        private String SQLString;

        public ImportToHDFS(MysqlInfo mysqlInfo) {
            // this(mysqlInfo, "/apps/odp/data/dbloadTest");
        }

        public ImportToHDFS(MysqlInfo mysqlInfo, String hdfsTargetDir, String SQLString) {
            this.mysqlInfo = mysqlInfo;
            this.hdfsTargetDir = hdfsTargetDir;
            this.SQLString = SQLString;
        }

        @SuppressWarnings("deprecation")
        public SqoopOptions getSqoopOptions() throws IOException {
            String targetDir = hdfsTargetDir;
            SqoopOptions options = new SqoopOptions();
            ImportTool importTool = new ImportTool();
            options.setActiveSqoopTool(importTool);
            String connecString = "jdbc:mysql://" + mysqlInfo.getMysqlHost() + ":3306/"
                    + mysqlInfo.getMysqlDatabase();
            options.setConnectString(connecString);
            options.setUsername(mysqlInfo.getMysqlUserName());
            options.setPassword(mysqlInfo.getMysqlPassWord());
            // options.setTableName(mysqlInfo.getMysqlTable());
            options.setSqlQuery(SQLString);
            options.setTargetDir(targetDir);
            options.setNumMappers(4);
            options.setDriverClassName("com.mysql.jdbc.Driver");
            options.setNullNonStringValue("");
            options.setDeleteMode(true);
            options.setSplitByCol("1");
            options.setFieldsTerminatedBy('^');
            return options;
        }

        // Import the data of the specified table
        public int importData() throws IOException {
            @SuppressWarnings("deprecation")
            SqoopOptions options = getSqoopOptions();
            ImportTool importTablesTool = new ImportTool();
            int result = importTablesTool.run(options);
            return result;
        }

        // Import the data of all tables
        public int importAllTableData() throws IOException {
            @SuppressWarnings("deprecation")
            SqoopOptions options = getSqoopOptions();
            ImportAllTablesTool importAllTablesTool = new ImportAllTablesTool();
            int result = importAllTablesTool.run(options);
            return result;
        }
    }
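The query built by `getSelectQuery` ends with `WHERE $CONDITIONS`. In a free-form query import, Sqoop replaces that placeholder with a different range predicate in each map task, which is how the work is divided among the mappers. The sketch below only illustrates the idea with evenly sized integer ranges; it is not Sqoop's actual splitter, and the class `SplitConditionsSketch`, the column `id`, and the key range are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: how a $CONDITIONS placeholder can be expanded per mapper.
final class SplitConditionsSketch {

    // Produces at most numMappers queries, each covering a contiguous key range.
    static List<String> splitQueries(String query, String splitColumn,
                                     long min, long max, int numMappers) {
        if (numMappers <= 0 || max < min) {
            throw new IllegalArgumentException("Need numMappers > 0 and max >= min");
        }
        List<String> queries = new ArrayList<>();
        long total = max - min + 1;
        long size = (total + numMappers - 1) / numMappers; // ceiling division
        for (long lo = min; lo <= max; lo += size) {
            long hi = Math.min(lo + size - 1, max);
            String condition = splitColumn + " >= " + lo + " AND " + splitColumn + " <= " + hi;
            queries.add(query.replace("$CONDITIONS", condition));
        }
        return queries;
    }

    public static void main(String[] args) {
        String query = "SELECT id, name FROM demo WHERE $CONDITIONS";
        for (String q : splitQueries(query, "id", 1, 100, 4)) {
            System.out.println(q);
        }
    }
}
```

In a real import the boundaries come from the minimum and maximum of the split column, so a skewed key distribution leads to uneven mapper workloads.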
I. Main function: move the data from HDFS to Oracle
II. The code is as follows
    package com.ctg.odp.collect.dbloader.sqoop;

    import java.io.IOException;

    import org.apache.sqoop.tool.ExportTool;

    import com.cloudera.sqoop.SqoopOptions;
    import com.ctg.odp.collect.dbloader.importToOracle.OracleInfo;

    public class ExportToOracle {

        private OracleInfo oracleInfo;
        private String hdfsSourceDir;

        public ExportToOracle(OracleInfo oracleInfo) {
            this(oracleInfo, "/apps/odp/data/dbload");
        }

        public ExportToOracle(OracleInfo oracleInfo, String hdfsSourceDir) {
            this.oracleInfo = oracleInfo;
            this.hdfsSourceDir = hdfsSourceDir;
        }

        @SuppressWarnings("deprecation")
        public SqoopOptions getSqoopOptions() throws IOException {
            SqoopOptions options = new SqoopOptions();
            ExportTool exportTool = new ExportTool();
            options.setActiveSqoopTool(exportTool);
            // Build the URL from the configured host and database instead of a hard-coded address
            String connecString = "jdbc:oracle:thin:@" + oracleInfo.getOracleHost() + ":1521:"
                    + oracleInfo.getOracleDatabase();
            options.setConnectString(connecString);
            options.setUsername(oracleInfo.getOracleUserName());
            options.setPassword(oracleInfo.getOraclePassWord());
            options.setDirectMode(true);
            options.setNumMappers(4);
            options.setExportDir(hdfsSourceDir);
            options.setTableName(oracleInfo.getOracleTable().toUpperCase());
            String oracleManager = "org.apache.sqoop.manager.OracleManager";
            options.setConnManagerClassName(oracleManager);
            // Must match the delimiter used when importing into HDFS
            options.setInputFieldsTerminatedBy('^');
            return options;
        }

        public int exportData() throws IOException {
            @SuppressWarnings("deprecation")
            SqoopOptions options = getSqoopOptions();
            ExportTool exportTool = new ExportTool();
            int result = exportTool.run(options);
            return result;
        }
    }
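The export class connects with a SID-style thin-driver URL (`host:1521:database`). Oracle's thin driver also accepts a service-name form, and mixing the two up is a common cause of connection failures. The helper below is a small sketch of both formats; the class `OracleUrlSketch`, the host name, and the database names are hypothetical.

```java
// Illustrative only: the two common Oracle thin-driver URL formats.
final class OracleUrlSketch {

    // SID form, as used in this article: jdbc:oracle:thin:@host:port:SID
    static String sidUrl(String host, int port, String sid) {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    // Service-name form: jdbc:oracle:thin:@//host:port/service
    static String serviceUrl(String host, int port, String serviceName) {
        return "jdbc:oracle:thin:@//" + host + ":" + port + "/" + serviceName;
    }

    public static void main(String[] args) {
        System.out.println(sidUrl("db.example.com", 1521, "orcl"));
        System.out.println(serviceUrl("db.example.com", 1521, "orcl.example.com"));
    }
}
```

Which form applies depends on how the target database is registered with its listener; the article's code assumes the SID form on port 1521.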
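1: MySQL service: prepare a table to use for the import. This example uses the table dbload_task_instance_run_result.
2: Oracle service: start the Oracle service.
3: Start the Hadoop services.
Because the main class already integrates table creation, the import into HDFS, and the export to Oracle, you only need to run the program as a Java Application. The results can then be inspected.
1: On Hadoop you can view the contents of the target file, as follows: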
    1^46^增量參照列名稱^Integer^1234
    1^91^qw^Integer^12
    1^92^3^Integer^3
    2^1^id^int^0
    2^2^creattime^timestamp^2016-03-23 00:00:0.0
    3^5^id^int^0
    3^6^creattime^timestamp^2016-03-23 00:00:12.0
    3^69^增量採集^Integer^2
    42^51^creattime^timestamp^2016-03-23 00:00:12.0
    47^64^qw^Date^21
    47^65^wqwq^Integer^NaN
    47^66^name^Integer^2
    47^70^re^Integer^23
    47^71^testParamsItem003^Date^1471190400000
    51^62^id^Integer^NaN
    58^75^id^Integer^12789499
    58^76^creattime^Date^1458662405000
    60^78^id^Integer^12789499
    60^80^id^Integer^12789499
    63^81^id^Integer^0
    65^82^mdn^Integer^1
    73^89^task_item_id^Integer^92
    74^93^task_item_id^Integer^95
    82^95^增量採集列名稱^Integer^12
    88^98^task_id^Integer^93
    91^103^col^Integer^112
In the output above, the character ^ is the field delimiter set in the program (the default is a comma).
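When inspecting these files by hand or in a quick check, note that `^` is a regex metacharacter, so it has to be quoted before being passed to `String.split`. A minimal sketch, using one record from the output above; the class `DelimitedLineSketch` is hypothetical.

```java
import java.util.regex.Pattern;

// Illustrative only: splitting one '^'-delimited record from the HDFS output.
final class DelimitedLineSketch {

    // Pattern.quote makes '^' a literal; the -1 limit keeps trailing empty fields.
    static String[] parse(String line) {
        return line.split(Pattern.quote("^"), -1);
    }

    public static void main(String[] args) {
        String[] fields = parse("58^76^creattime^Date^1458662405000");
        for (String field : fields) {
            System.out.println(field);
        }
    }
}
```

This simple split assumes the data itself never contains `^`; if it can, an enclosing or escape character would have to be configured on both the import and the export side.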
2: Check the result:
The corresponding table can also be seen in Oracle.
JackerWang, Guangzhou, on an autumn afternoon in 2017 (October 26)