Spark根據Key多目錄多文件輸出

1.前言

有時候你會遇到這樣的需求,根據數據中的某一個字段進行輸出,將相同key的數據輸出到一個文件夾下的一個文件中。java

2.實現方式

2.1 如何實現

使用saveAsHadoopFile對數據進行輸出。apache

1. 保證同一個Key的數據在同一個分區api

2. 自定義MultipleTextOutputFormat類oop

2.2 代碼

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Created on 16/6/3 13:22
 *
 * @author Daniel
 */
public class SparkMultipleTextOutput {

    public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> {

        public String generateFileNameForKeyValue(String key, String value,
                                                     String name) {
            //輸出格式 /ouput/key/key.csv
            return key + "/" + key+".csv";
        }

    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local[2]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //加載文件
        JavaRDD<String> rdd1 = sc.textFile("/Users/Daniel/test/test_file.csv");

        //將data轉化爲K,V
        JavaPairRDD<String,String> rdd2 = rdd1.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) throws Exception {

                String[] data = s.split(",");
                String key = data[0];
                String value = data[1];
                return new Tuple2<String, String>(key, value);
            }
            //這裏是關鍵點,只要保證同一個Key的數據在同一個分區便可
            //上邊的要求通常有2種方式實現
            //第一種 .repartition(1); 測試功能的時候能夠使用,現網本身看着辦吧.
            //第二種 .partitionBy(); 保證同一個key到一個分區.
        }).partitionBy(new HashPartitioner(2));

        //將JavaPairRDD類型的RDD輸出.
        rdd2.saveAsHadoopFile("/Users/Daniel/test2", String.class, String.class, RDDMultipleTextOutputFormat.class);
    }

}

3. 測試

 測試數據以下圖:測試

測試結果以下圖:spa

不足的地方就是輸出的KV 不少狀況下你們都是隻需求V,這個我稍後會二次開發一下。而後再發博客哈scala

相關文章
相關標籤/搜索