Querying data from MySQL with a Spark DataFrame

This example uses Spark's DataFrame API to fetch data from MySQL. The DataFrame abstract class no longer exists in Spark SQL 2.0.1; in its place, Dataset<Row> is used to work with the rows queried from the relational database.

 

import com.alibaba.fastjson.JSONObject;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructField;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class SparkDataFrame {

    private static final Logger logger = LoggerFactory.getLogger(SparkDataFrame.class);

    // In Spark 2.x, SparkSession is the single entry point and replaces SQLContext
    private static final SparkSession spark = SparkSession.builder()
            .appName("SparkJdbcDs")
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        // One predicate per partition (manual partitioning); the ranges could also be kept in a configuration file
        String[] predicates = new String[] {
                "insertdate between '2017-03-11 16:37:05' and '2017-03-14 09:52:28'",
                "insertdate between '2017-03-15 16:37:05' and '2017-03-16 09:52:28'",
                "insertdate between '2017-03-16 16:37:05' and '2017-03-17 09:52:28'",
                "insertdate between '2017-03-17 16:37:05' and '2017-03-18 09:52:28'",
                "insertdate between '2017-03-19 16:37:05' and '2017-03-20 09:52:28'" };


        String url = "jdbc:mysql://192.168.3.50:3306/database";
        String table = "t_electroniceinfowhole";
        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("dbtable", table);    // table name (also passed to jdbc() below)
        connectionProperties.setProperty("user", "root");      // MySQL user
        connectionProperties.setProperty("password", "123456");// MySQL password

        // Load the MySQL query result as a Dataset<Row>, one partition per predicate
        Dataset<Row> jdbcDF = spark.read().jdbc(url, table, predicates, connectionProperties);

        jdbcDF.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                // Walk the row's schema and copy every column value into a JSON object
                Iterator<StructField> it = row.schema().iterator();
                JSONObject jsonObject = new JSONObject();
                while (it.hasNext()) {
                    StructField i = it.next();
                    int index = row.fieldIndex(i.name());
                    Object o = row.get(index);
                    jsonObject.put(i.name(), o);
                }
                logger.info(jsonObject.toJSONString());
                //TODO 
            }
        });
    }
}
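As a side note, if each row is only needed as a JSON string, Spark can do the serialization itself: Dataset exposes a toJSON() method that returns a Dataset<String>. A minimal sketch, reusing the jdbcDF built in main() above:

        // toJSON() serializes every Row into one JSON document per element
        Dataset<String> json = jdbcDF.toJSON();
        json.show(5, false); // print the first five JSON strings without truncation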

 

There are two jdbc() overloads in the source that can be called: one takes a String[] of predicates you supply yourself, one per partition; the other partitions on lowerBound and upperBound, but it requires the relational table to have a column of type int or long (columnName) to partition on, so it cannot be used when no such column exists. A call sketch follows below.
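For comparison, here is a minimal sketch of a call to the column-based overload. It reuses spark, url, table and connectionProperties from the example above and assumes the table has a numeric column, here called id, which is purely illustrative:

        // Hypothetical: partition the read on a numeric column named "id"
        Dataset<Row> byColumn = spark.read().jdbc(
                url,
                table,
                "id",                  // columnName: an int/long column (assumed to exist)
                1L,                    // lowerBound of the id range
                1000000L,              // upperBound of the id range
                4,                     // numPartitions: the range is split into 4 strides
                connectionProperties);

Note that lowerBound and upperBound only decide how the column range is split across partitions; rows outside the range are still read, they simply all land in the first or last partition.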

The two jdbc() overloads in DataFrameReader, shown here as Java decompiled from the Scala source:

// One of the parameters is columnName; the read is partitioned on the range of that column's values
public Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, Properties connectionProperties) {
    JDBCPartitioningInfo partitioning = new JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions);
    Partition[] parts = org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation..MODULE$.columnPartition(partitioning);
    return this.jdbc(url, table, parts, connectionProperties);
}

// This overload is more flexible: the partition ranges can be defined yourself via predicates
public Dataset<Row> jdbc(String url, String table, String[] predicates, Properties connectionProperties) {
    Partition[] parts = (Partition[]).MODULE$.refArrayOps((Object[]).MODULE$.refArrayOps((Object[])predicates).zipWithIndex(scala.Array..MODULE$.canBuildFrom(scala.reflect.ClassTag..MODULE$.apply(Tuple2.class)))).map(new Serializable(this) {
        public static final long serialVersionUID = 0L;

        public final Partition apply(Tuple2<String, Object> x0$1) {
            if(x0$1 != null) {
                String part = (String)x0$1._1();
                int i = x0$1._2$mcI$sp();
                JDBCPartition var5 = new JDBCPartition(part, i);
                return var5;
            } else {
                throw new MatchError(x0$1);
            }
        }
    }, scala.Array..MODULE$.canBuildFrom(scala.reflect.ClassTag..MODULE$.apply(Partition.class)));
    return this.jdbc(url, table, parts, connectionProperties);
}
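Stripped of the decompiler noise, the predicate overload simply pairs each predicate string with its index and wraps it in a JDBCPartition; that partition's whereClause later becomes the WHERE clause of the query issued for that partition (roughly SELECT ... FROM table WHERE <predicate>). A plain-Java illustration of the same mapping (for understanding only; JDBCPartition is internal to Spark, so application code should just pass the String[] and let Spark build the partitions):

        // Illustration of what the overload does with the predicates array
        Partition[] parts = new Partition[predicates.length];
        for (int i = 0; i < predicates.length; i++) {
            parts[i] = new JDBCPartition(predicates[i], i); // whereClause = predicate, idx = i
        }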