spark mongo 批處理

代碼以下: java


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.bson.BSONObject;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;

public class SparkBatchSync {

    /**
     * Spark batch job: reads up to 3 documents from a MongoDB collection via
     * the mongo-hadoop connector, extracts the {@code _id} field of each
     * document, and saves the results to a uniquely named HDFS directory
     * ({@code yyyyMMdd-<uuid>}).
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Unique output directory suffix: yyyyMMdd-<random UUID>.
        // BASIC_ISO_DATE formats as yyyyMMdd, matching the original
        // SimpleDateFormat("yyyyMMdd") output (thread-safe, java.time).
        String today = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
        String name = UUID.randomUUID().toString();

        JavaSparkContext sc = new JavaSparkContext(
                "spark://192.168.1.1:7999",
                "mongoSparkTest"
        );
        try {
            Configuration mongodbConfig = new Configuration();
            mongodbConfig.set("mongo.job.input.format",
                    "com.mongodb.hadoop.MongoInputFormat");
            // Read at most 3 documents.
            mongodbConfig.setInt("mongo.input.limit", 3);
            // Projection: exclude the "key" field from the input documents.
            mongodbConfig.set("mongo.input.fields",
                    "{key:0}");
            // Target mongos database/collection.
            mongodbConfig.set("mongo.input.uri",
                    "mongodb://192.168.1.1:27017/testdb.collection");

            JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                    mongodbConfig,            // Configuration
                    MongoInputFormat.class,   // InputFormat: read from a live cluster.
                    Object.class,             // Key class
                    BSONObject.class          // Value class
            );

            // Extract the _id field from each BSON document and wrap it in a
            // fresh BSONObject.
            // NOTE(review): this assumes _id.toString() yields parseable JSON,
            // i.e. _id is a sub-document — a plain ObjectId would make
            // JSON.parseObject throw. Confirm against the collection's schema.
            JavaRDD<BSONObject> resultRDD = documents.map(
                new Function<Tuple2<Object, BSONObject>, BSONObject>() {
                    @Override
                    public BSONObject call(Tuple2<Object, BSONObject> v1) throws Exception {
                        Object id = v1._2().get("_id");
                        JSONObject o = JSON.parseObject(id.toString());
                        BSONObject subbson = new BasicDBObject();
                        subbson.putAll(o);
                        return subbson;
                    }
                }
            );

            // Coalesce to a single partition so HDFS receives one output file.
            // (saveAsNewAPIHadoopFile with BSONFileOutputFormat was tried but
            // produced garbled output, hence plain text output here.)
            resultRDD.repartition(1).saveAsTextFile(
                    "hdfs://192.168.1.1:9000/user/testdb/collection/" + today + "-" + name);
        } finally {
            // Always release the Spark context — the original leaked it.
            sc.stop();
        }
    }
}

 

 

該代碼經測試可用，如果有寫得不對的地方請指出來。

標籤：mongodb

相關文章
相關標籤/搜索