有時候你會遇到這樣的需求,根據數據中的某一個字段進行輸出,將相同key的數據輸出到一個文件夾下的一個文件中。java
使用saveAsHadoopFile對數據進行輸出。apache
1. 保證同一個Key的數據在同一個分區api
2. 自定義MultipleTextOutputFormat類oop
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; /** * Created on 16/6/3 13:22 * * @author Daniel */ public class SparkMultipleTextOutput { public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<String, String> { public String generateFileNameForKeyValue(String key, String value, String name) { //輸出格式 /ouput/key/key.csv return key + "/" + key+".csv"; } } public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Test").setMaster("local[2]"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); JavaSparkContext sc = new JavaSparkContext(conf); //加載文件 JavaRDD<String> rdd1 = sc.textFile("/Users/Daniel/test/test_file.csv"); //將data轉化爲K,V JavaPairRDD<String,String> rdd2 = rdd1.mapToPair(new PairFunction<String, String, String>() { public Tuple2<String, String> call(String s) throws Exception { String[] data = s.split(","); String key = data[0]; String value = data[1]; return new Tuple2<String, String>(key, value); } //這裏是關鍵點,只要保證同一個Key的數據在同一個分區便可 //上邊的要求通常有2種方式實現 //第一種 .repartition(1); 測試功能的時候能夠使用,現網本身看着辦吧. //第二種 .partitionBy(); 保證同一個key到一個分區. }).partitionBy(new HashPartitioner(2)); //將JavaPairRDD類型的RDD輸出. rdd2.saveAsHadoopFile("/Users/Daniel/test2", String.class, String.class, RDDMultipleTextOutputFormat.class); } }
測試數據以下圖:測試
測試結果以下圖:spa
不足的地方就是輸出的KV 不少狀況下你們都是隻需求V,這個我稍後會二次開發一下。而後再發博客哈scala