This example uses Spark's DataFrame API to pull data out of MySQL. The DataFrame abstract class no longer exists as of Spark SQL 2.0.1; it has been replaced by Dataset&lt;Row&gt;, which is what you now work with when parsing rows queried from a relational database.
```java
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class SparkDataFrame {

    private static final Logger logger = LoggerFactory.getLogger(SparkDataFrame.class);

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcDs")
                .master("local[*]")
                .getOrCreate();

        // One predicate per partition; the partitioning is done by hand here,
        // but the predicates could just as well come from a config file.
        String[] predicates = new String[] {
                "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
                "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'",
                "insertdate between '2017-03-16 16:37:05' and '2017-03-17 09:52:28'",
                "insertdate between '2017-03-17 16:37:05' and '2017-03-18 09:52:28'",
                "insertdate between '2017-03-19 16:37:05' and '2017-03-20 09:52:28'"
        };

        String url = "jdbc:mysql://192.168.3.50:3306/database";
        String table = "t_electroniceinfowhole";

        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("dbtable", table);     // optional, the table is also passed as a parameter
        connectionProperties.setProperty("user", "root");       // DB user
        connectionProperties.setProperty("password", "123456"); // DB password

        // Load the MySQL query result as a Dataset<Row>
        Dataset<Row> jdbcDF = spark.read().jdbc(url, table, predicates, connectionProperties);

        jdbcDF.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                // Walk the schema and copy every column into a JSON object
                Iterator<StructField> it = row.schema().iterator();
                JSONObject jsonObject = new JSONObject();
                while (it.hasNext()) {
                    StructField field = it.next();
                    int index = row.fieldIndex(field.name());
                    jsonObject.put(field.name(), row.get(index));
                }
                logger.info(jsonObject.toJSONString());
                // TODO: process the JSON record
            }
        });

        spark.stop();
    }
}
```
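As a side note, if the loop over the schema above only exists to turn each Row into JSON, Spark can do that conversion itself. A minimal sketch, continuing inside main() with the same jdbcDF and logger as above:

```java
// Sketch: let Spark serialize each Row to a JSON string; toJSON() returns a Dataset<String>.
Dataset<String> json = jdbcDF.toJSON();
json.foreach((ForeachFunction<String>) line -> logger.info(line));
```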
In the source there are two jdbc() overloads you can call: one lets you pass in a String[] of predicates yourself, one predicate per partition; the other partitions by lowerBound and upperBound, but that variant requires the relational table to have a column of type int or long (the columnName parameter) to partition on. Without such a column, that overload cannot split the read into partitions.
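For comparison, here is a minimal sketch of the column-based overload. The column name "id", the bounds, and the partition count are assumptions for illustration; the table from the example above may not actually have such a column:

```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJdbcColumnPartition {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkJdbcColumnPartition")
                .master("local[*]")
                .getOrCreate();

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");

        // Spark splits the [lowerBound, upperBound] range of the numeric column
        // into numPartitions strides and issues one query per stride.
        Dataset<Row> df = spark.read().jdbc(
                "jdbc:mysql://192.168.3.50:3306/database",
                "t_electroniceinfowhole",
                "id",        // columnName: assumed numeric (int/long) column
                1L,          // lowerBound (assumed)
                1000000L,    // upperBound (assumed)
                4,           // numPartitions
                props);

        System.out.println("partitions = " + df.rdd().getNumPartitions());
        spark.stop();
    }
}
```

Note that lowerBound and upperBound only control how the range is split into strides; they do not filter rows, so values outside the range still end up in the first or last partition.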
The source of the two jdbc() overloads (as decompiled from Spark SQL):
```java
// The overload that takes a columnName: partitions are built from value ranges of that column
public Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound,
                         long upperBound, int numPartitions, Properties connectionProperties) {
    JDBCPartitioningInfo partitioning =
            new JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions);
    Partition[] parts = org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation..MODULE$
            .columnPartition(partitioning);
    return this.jdbc(url, table, parts, connectionProperties);
}

// The more flexible overload: you define the partition ranges yourself via predicates
public Dataset<Row> jdbc(String url, String table, String[] predicates,
                         Properties connectionProperties) {
    Partition[] parts = (Partition[]).MODULE$.refArrayOps(
            (Object[]).MODULE$.refArrayOps((Object[]) predicates)
                    .zipWithIndex(scala.Array..MODULE$.canBuildFrom(
                            scala.reflect.ClassTag..MODULE$.apply(Tuple2.class))))
            .map(new Serializable(this) {
                public static final long serialVersionUID = 0L;

                public final Partition apply(Tuple2<String, Object> x0$1) {
                    if (x0$1 != null) {
                        // Each predicate string, paired with its index, becomes one JDBCPartition
                        String part = (String) x0$1._1();
                        int i = x0$1._2$mcI$sp();
                        JDBCPartition var5 = new JDBCPartition(part, i);
                        return var5;
                    } else {
                        throw new MatchError(x0$1);
                    }
                }
            }, scala.Array..MODULE$.canBuildFrom(scala.reflect.ClassTag..MODULE$.apply(Partition.class)));
    return this.jdbc(url, table, parts, connectionProperties);
}
```
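Reading the predicates overload: each string in the array is zipped with its index, wrapped into a JDBCPartition, and at execution time ends up as the WHERE clause of that partition's query. A rough illustration of what the predicates from the first example turn into (the projected column list and the exact SQL Spark generates may differ):

```java
// Illustration only: partition i later runs something like
//   SELECT <columns> FROM t_electroniceinfowhole WHERE <predicates[i]>
String[] predicates = {
        "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
        "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'"
};
for (int i = 0; i < predicates.length; i++) {
    System.out.println("partition " + i + " -> ... WHERE " + predicates[i]);
}
```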