Spark SQL: loading specific fields from MongoDB and parsing the loaded JSON

If you need to read an entire MongoDB collection, the mongo-spark connector is recommended; it is simpler and more convenient.

My own requirement was to read only specific columns from MongoDB, because the full collection is too large.

The loaded JSON data then needs to be parsed; the parsing library used here is Alibaba's fastjson.
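Before the full Spark program, here is a minimal, standalone sketch of the fastjson calls that the commented-out section further down relies on. The document shape (a _src array whose first element has a site field) is an assumption based on the fields referenced in this article, not a guaranteed schema.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class FastjsonDemo {
    public static void main(String[] args) {
        // Hypothetical document JSON; the real shape depends on your collection.
        String data = "{\"_record_id\":\"1\",\"_src\":[{\"site\":\"example\"}]}";
        JSONObject doc = JSON.parseObject(data);       // parse the whole document
        JSONArray src = doc.getJSONArray("_src");      // take the _src array
        JSONObject first = src.getJSONObject(0);       // its first element
        System.out.println(first.getString("site"));   // prints "example"
    }
}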

package spark_read;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;

// Additional imports needed if the commented-out MongoSpark/fastjson block below is enabled:
// import org.bson.Document;
// import com.mongodb.spark.MongoSpark;
// import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
// import com.alibaba.fastjson.JSON;
// import com.alibaba.fastjson.JSONArray;
// import com.alibaba.fastjson.JSONObject;

public class Read_Mongo {

    public static void main(String[] args) {
        JavaSparkContext jsc = createJavaSparkContext(args);
        String uri = getMongoClientURI(args);

        SQLContext sqlContext = new SQLContext(jsc);

        // Load only the fields we actually need, which reduces memory consumption;
        // handling the data with Spark SQL keeps resource usage down and the code concise.
        String list = "_record_id,_in_time,_src";
        String[] strings = list.split(",");

        // Split the desired field list, create a StructField per field,
        // then build the StructType (the DataFrame schema).
        List<StructField> structFields = new ArrayList<StructField>();
        for (String s : strings) {
            structFields.add(DataTypes.createStructField(s, DataTypes.StringType, true));
        }
        StructType schema = DataTypes.createStructType(structFields);
        // Store the reader options in a map.
        Map<String, String> map = new HashMap<String, String>();
        map.put("uri", uri);
        map.put("database", " ");      // fill in your database name
        map.put("collection", " ");    // fill in your collection name

        // Read the specified columns of the specified MongoDB collection using the custom
        // schema and options above, then register the result as a temporary table so it can
        // be queried further (or converted to an RDD if needed).
        Dataset<Row> load = sqlContext.read().schema(schema).format("com.mongodb.spark.sql").options(map).load();
        load.registerTempTable("mongo");
        Dataset<Row> result = sqlContext.sql("select * from mongo");

        // Print each row; this is where per-row parsing (e.g. with fastjson) could be done.
        result.toJavaRDD().foreach(new VoidFunction<Row>() {
            private static final long serialVersionUID = 1L;
            public void call(Row row) throws Exception {
                System.out.println(row.toString());
            }
        });


        // Alternative: load the whole MongoDB collection at once and use Alibaba's fastjson
        // to convert each document into the desired format (requires the commented-out imports above).
        // JavaMongoRDD<Document> mongoRDD = MongoSpark.load(jsc);
        // mongoRDD.foreach(new VoidFunction<Document>() {
        //     private static final long serialVersionUID = 1L;
        //     public void call(Document document) throws Exception {
        //         String data = document.toJson();
        //         JSONObject jsonObject = JSON.parseObject(data);
        //         JSONArray src = jsonObject.getJSONArray("_src");
        //         JSONObject src_obj = (JSONObject) src.get(0);
        //         System.out.println(src_obj.getString("site"));
        //     }
        // });
    }

    /**
     * Create the Spark context and configure the MongoDB input/output URIs.
     */
    private static JavaSparkContext createJavaSparkContext(final String[] args) {
        String uri = getMongoClientURI(args);
        //dropDatabase(uri);
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("MongoSparkConnectorTest")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.output.uri", uri);

        return new JavaSparkContext(conf);
    }

    /**
     * Drop the existing MongoDB database (unused here; kept for convenience).
     */
    private static void dropDatabase(final String connectionString) {
        MongoClientURI uri = new MongoClientURI(connectionString);
        new MongoClient(uri).dropDatabase(uri.getDatabase());
    }

    /**
     * Get the MongoDB connection URI.
     */
    private static String getMongoClientURI(final String[] args) {
        String uri;
        if (args.length == 0) {
            uri = "mongodb://ip:27017"; // default
        } else {
            uri = args[0];
        }
        return uri;
    }

}
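The foreach in main only prints the raw rows. A minimal sketch of what the fastjson parsing step could look like inside that loop is shown below; it assumes the _src column comes back as a JSON-array string when read with the StringType schema above, which may differ from the actual shape in your collection, and it also requires the fastjson imports listed near the top.

        // Sketch only: parse the _src column of each row with fastjson.
        result.toJavaRDD().foreach(new VoidFunction<Row>() {
            private static final long serialVersionUID = 1L;
            public void call(Row row) throws Exception {
                String src = row.getAs("_src");             // assumed JSON-array string
                if (src == null) {
                    return;                                  // skip documents without _src
                }
                JSONArray array = JSON.parseArray(src);      // fastjson: parse the array
                if (!array.isEmpty()) {
                    JSONObject first = array.getJSONObject(0);
                    System.out.println(first.getString("site"));
                }
            }
        });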
