This example shows how to read data from MySQL into Spark using Java.
```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.JdbcRDD;

import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

public class Main {

    private static final org.apache.log4j.Logger LOGGER =
            org.apache.log4j.Logger.getLogger(Main.class);

    private static final JavaSparkContext sc =
            new JavaSparkContext(new SparkConf().setAppName("SparkJdbc").setMaster("local[*]"));

    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.3.50:3306/9dfp";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PWD = "123456";

    public static void main(String[] args) {
        DbConnection dbConnection =
                new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);

        // Load data from MySQL. The "0 >= ? and 0 <= ?" condition is a trick:
        // with bounds [0, 10] split into 2 partitions, only the first partition's
        // bounds satisfy it, so the whole table is read exactly once instead of
        // being split by a key column.
        JdbcRDD<JSONObject> jdbcRDD = new JdbcRDD<>(
                sc.sc(),
                dbConnection,
                "select * from b_fo where 0 >= ? and 0 <= ?",
                0,   // lower bound, bound to the first "?"
                10,  // upper bound, bound to the second "?"
                2,   // number of partitions
                new MapResult(),
                ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));

        // Convert the underlying Scala RDD to a JavaRDD
        JavaRDD<JSONObject> javaRDD =
                JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(JSONObject.class));

        // Print every row (each row has been mapped to a JSONObject)
        javaRDD.foreach(new VoidFunction<JSONObject>() {
            @Override
            public void call(JSONObject jsonObject) throws Exception {
                System.out.println(jsonObject);
            }
        });
    }

    // Zero-argument function that opens a JDBC connection on each executor
    static class DbConnection extends AbstractFunction0<Connection> implements Serializable {
        private final String driverClassName;
        private final String connectionUrl;
        private final String userName;
        private final String password;

        public DbConnection(String driverClassName, String connectionUrl,
                            String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        @Override
        public Connection apply() {
            try {
                Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                LOGGER.error("Failed to load driver class", e);
            }

            Properties properties = new Properties();
            properties.setProperty("user", userName);
            properties.setProperty("password", password);

            Connection connection = null;
            try {
                connection = DriverManager.getConnection(connectionUrl, properties);
            } catch (SQLException e) {
                LOGGER.error("Connection failed", e);
            }
            return connection;
        }
    }

    // Maps each JDBC ResultSet row to a JSONObject keyed by column label
    static class MapResult extends AbstractFunction1<ResultSet, JSONObject> implements Serializable {
        @Override
        public JSONObject apply(ResultSet resultSet) {
            JSONObject jsonObj = new JSONObject();
            try {
                ResultSetMetaData metaData = resultSet.getMetaData();
                int columnCount = metaData.getColumnCount();
                // Iterate over every column in the row
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnLabel(i);
                    String value = resultSet.getString(columnName);
                    jsonObj.put(columnName, value);
                }
            } catch (SQLException e) {
                LOGGER.error("Failed to read row", e);
            }
            return jsonObj;
        }
    }
}
```
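For comparison, newer Spark versions can read the same table through the DataFrame JDBC source, which replaces the hand-written connection and row-mapping functions above. The sketch below is a minimal, untested example assuming Spark 2.x or later (with `spark-sql` on the classpath); the class name `JdbcDataFrameExample` is made up here, and the connection details are reused from the code above.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JdbcDataFrameExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcDataFrame")
                .master("local[*]")
                .getOrCreate();

        // Read the b_fo table through the built-in JDBC data source
        Dataset<Row> df = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://192.168.3.50:3306/9dfp")
                .option("driver", "com.mysql.jdbc.Driver")
                .option("dbtable", "b_fo")
                .option("user", "root")
                .option("password", "123456")
                .load();

        // Print the first rows; column names and types come from table metadata
        df.show();

        spark.stop();
    }
}
```

If the read needs to be parallelized, the same source accepts `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options, which play the same role as the bound parameters passed to `JdbcRDD` above.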
Reference link:
http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/