// Custom JdbcDialect for Oracle: overrides the Spark-type -> Oracle-column-type mapping
// that Spark consults when writing a DataFrame to an Oracle table over JDBC.
val OracleDialect = new JdbcDialect {
  // Claim only genuine Oracle JDBC URLs (e.g. "jdbc:oracle:thin:@host:1521:SID").
  // A plain `url.contains("oracle")` would also hijack unrelated URLs whose host,
  // database name or parameters merely contain the word "oracle".
  override def canHandle(url: String): Boolean =
    url.toLowerCase(java.util.Locale.ROOT).startsWith("jdbc:oracle")

  // getJDBCType is used when writing to a JDBC table.
  // JdbcType(columnTypeDefinition, java.sql.Types code).
  // Returning None defers to Spark's default mapping for that type.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType    => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case BooleanType   => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType   => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    // Long.MaxValue (9223372036854775807) has 19 digits; NUMBER(16) would reject large longs.
    case LongType      => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
    // NOTE(review): NUMBER(16,4) keeps only 4 decimal places and 12 integer digits,
    // so doubles/floats outside that range are rounded or rejected — confirm acceptable.
    case DoubleType    => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case FloatType     => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case ShortType     => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType      => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType    => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    // NOTE(review): Oracle DATE has no fractional seconds, so sub-second precision of
    // timestamps is presumably lost here — confirm acceptable (TIMESTAMP would keep it).
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType      => Some(JdbcType("DATE", java.sql.Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    // Only the unlimited decimal type is mapped (to scale 2); other decimals fall through to None.
    case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,2)", java.sql.Types.NUMERIC))
    case _             => None
  }
}
// Register the dialect so Spark uses it for every URL that canHandle accepts.
JdbcDialects.registerDialect(OracleDialect)

// Credentials passed to the JDBC driver.
val connectProperties = new java.util.Properties()
connectProperties.put("user", username)
connectProperties.put("password", password)
// Loading the driver class is enough for it to register itself with DriverManager;
// instantiating it via the deprecated Class#newInstance is unnecessary.
Class.forName(driver)

// Write the results back to Oracle.
// Note: the target table "MR" must already exist before writing.
JdbcUtils.saveTable(mr_case_df, oracleDriverUrl, "MR", connectProperties)
// Read the Hive table and insert every row into Oracle's RES_CELL table, one JDBC
// connection per Spark partition (opened on the executor, not the driver).
val test_df = hiveContext.sql("select * from test")
test_df.foreachPartition(rows => {
  // Rows are sent in batches of this size to bound executor memory.
  val batchSize = 1000
  // Skip empty partitions instead of opening a connection that inserts nothing.
  if (rows.hasNext) {
    Class.forName(driver)
    val connection: Connection = DriverManager.getConnection(oracleDriverUrl, username, password)
    try {
      // No trailing ';' — Oracle JDBC rejects it inside a statement (ORA-00911).
      // NOTE(review): "Latiude" looks like a typo of "Latitude"; kept verbatim because it
      // must match the actual RES_CELL column name — confirm against the table schema.
      val prepareStatement: PreparedStatement =
        connection.prepareStatement("insert into RES_CELL(City,Latiude,longitude) values(?,?,?)")
      try {
        var pending = 0
        rows.foreach(row => {
          prepareStatement.setString(1, row.getAs[String]("city"))
          prepareStatement.setString(2, row.getAs[String]("latitude"))
          prepareStatement.setString(3, row.getAs[String]("longitude"))
          prepareStatement.addBatch()
          pending += 1
          if (pending >= batchSize) {
            prepareStatement.executeBatch()
            pending = 0
          }
        })
        // Flush the final, partially filled batch.
        if (pending > 0) prepareStatement.executeBatch()
      } finally {
        prepareStatement.close()
      }
    } finally {
      // Always release the connection, even if an insert fails.
      connection.close()
    }
  }
})
使用 SQL*Loader 從 Spark 任務提交節點讀取文件，導入到 Oracle。
爲何這樣操作？原因：直接從 Spark 中讀取 Hive 中的數據，再通過網絡 IO 連接到集羣外的 Oracle 服務器，是 Spark 集羣不樂意做的事情，對 Spark 寶貴的網絡 IO 來說開銷特別大。