代碼如下（Java）：
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.MongoInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.function.Function; import org.bson.BSONObject; import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.text.SimpleDateFormat; import java.util.*; public class SparkBatchSync { public static void main(String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); String today = sdf.format(new Date()); String name = UUID.randomUUID().toString(); JavaSparkContext sc = new JavaSparkContext( "spark://192.168.1.1:7999", "mongoSparkTest" ); Configuration mongodbConfig = new Configuration(); mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); //只取出3條數據 mongodbConfig.setInt("mongo.input.limit", 3); //key字段不要 mongodbConfig.set("mongo.input.fields", "{key:0}"); //配置相應的mongos庫表 mongodbConfig.set("mongo.input.uri", "mongodb://192.168.1.1:27017/testdb.collection"); JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD( mongodbConfig, // Configuration MongoInputFormat.class, // InputFormat: read from a live cluster. 
Object.class, // Key class BSONObject.class // Value class ); //這個方法是將bson數據中的_id字段提取出來,返回一個subbson對象的rdd JavaRDD<BSONObject> resultRDD = documents.map( new Function<Tuple2<Object, BSONObject>, BSONObject>() { @Override public BSONObject call(Tuple2<Object, BSONObject> v1) throws Exception { Object id = v1._2().get("_id"); JSONObject o = JSON.parseObject(id.toString()); BSONObject subbson = new BasicDBObject(); subbson.putAll(o); return subbson; } } ); //將rdd存儲到hdfs中 resultRDD.repartition(1).saveAsTextFile("hdfs://192.168.1.1:9000/user/testdb/collection/" + today + "-" + name); //將rdd存儲到hdfs,該方法出現了亂碼 // resultRDD.repartition(1).saveAsNewAPIHadoopFile( // "hdfs://192.168.1.1:9000/user/testdb/collection/" + today + "-" + name, // Object.class, // BSONObject.class, // BSONFileOutputFormat.class, // new Configuration() // ); } }
該代碼經測試可用，如果有寫得不對的地方，請指出來。

標籤：mongodb