最近項目中,使用Spark作離線計算,結果須要輸出一份結果到文件中保存,而且須要按Key來放置不一樣的目錄。由於spark經過saveAsTextFile()方法默認輸出是以part-0000的形式。java
經過搜索,很輕易的就能搜索到使用saveAsHadoopFile()方法能夠將文件輸出到自定義文件目錄。網上大部分都是scala的寫法,java的具體操做以下:json
//首先,構造出一個PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx
//使用saveAsHadoopFile方法輸出到目標目錄下,方法參數分別爲(目標目錄,key的class類型,value的class類型,輸出format類)
javaPairRDD.saveAsHadoopFile("D:\\Test",String.class,JSONObject.class,RDDMultipleTextOutputFormat2.class);
//自定義一個RDDMultipleTextOutputFormat2繼承MultipleTextOutputFormat
public static class RDDMultipleTextOutputFormat2 extends MultipleTextOutputFormat<String, JSONObject> {
@Override
public String generateFileNameForKeyValue(String key, JSONObject value,
String name) {
String object_type = value.getString("object_type");
String object_id = value.getString("object_id");
return object_type + "/" + object_id+".json";
}
}
複製代碼
最後輸出的結果就是按"D:\Test\object_type\object_id.json
來區分保存。bash
美滋滋的打開按咱們要求輸出目錄的輸出文件。結果卻發現,輸出文件中,並非僅僅將value寫入了文件中,同時把key也寫了進去。可是咱們的文件格式是不須要key寫入文件的。ide
//輸出文件內容形式以下
key value
複製代碼
遇事不決百度Google,經過搜索引擎,能夠找到咱們經過設置rdd的key爲NullWritable,使得輸出文件中不包含key,網上一樣大多數是scala的,下面是java的具體操做:oop
/首先,構造出一個PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx
//將PariRDD轉爲<NullWritable,T>的形式
JavaPairRDD<NullWritable, JSONObject> nullKeyJavaPairRDD = javaPairRDD.mapToPair(tuple2 -> {
return new Tuple2(NullWritable.get(),tuple2._2);
});
//接下來的操做和上面同樣
nullKeyJavaPairRDD.saveAsHadoopFile("D:\\Test",NullWritable.class,JSONObject.class,RDDMultipleTextOutputFormat.class);
public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<NullWritable, JSONObject> {
@Override
public String generateFileNameForKeyValue(NullWritable key, JSONObject value,
String name) {
String object_type = value.getString("object_type");
String object_id = value.getString("object_id");
return object_type + "/" + object_id+".json";
}
}
複製代碼
如上方法確實是能夠解決問題,可是若是咱們須要的輸出目錄與key有關係,想將key使用在自定義目錄中,這就辦不到了。因此這個解決方法仍是有缺陷的,僅僅能知足輸出目錄只與value有關係的狀況。ui
這裏想到另外一個解決思路,往文件寫內容老是須要一個類的,咱們找到這個類,重寫它,把key的輸出去掉,不就能夠了。this
因而咱們跟進咱們繼承的MultipleTextOutputFormat
類中搜索引擎
public class MultipleTextOutputFormat<K, V>
extends MultipleOutputFormat<K, V> {
private TextOutputFormat<K, V> theTextOutputFormat = null;
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
String name, Progressable arg3) throws IOException {
if (theTextOutputFormat == null) {
theTextOutputFormat = new TextOutputFormat<K, V>();
}
return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
}
}
複製代碼
並無發現有相關的方法,咱們繼續跟進父類MultipleOutputFormat
,在這個類中,咱們發現了一個write方法:spa
public void write(K key, V value) throws IOException {
// get the file name based on the key
String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
// get the file name based on the input file name
String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
// get the actual key
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
if (rw == null) {
// if we don't have the record writer yet for the final path, create // one // and add it to the cache rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable); this.recordWriters.put(finalPath, rw); } rw.write(actualKey, actualValue); }; 複製代碼
感受真相就在眼前了,咱們繼續跟進rw.write(actualKey, actualValue);
方法,經過斷點咱們能夠知道他進入的是TextOutPutFormat #write()
方法:scala
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
複製代碼
這段代碼就很簡單了,只要key不是null或者NullWritable類,他就會往文件裏輸出。這也解釋了爲何上面方法一中將key轉換爲NullWritable類就不會輸出到文件中了。
瞭解到這裏,咱們就很容易得出解決方案,咱們只要將傳入write()
方法中的key傳入null就能夠了。回到MultipleOutputFormat
類中,咱們看到傳入的key是由這個方法獲取的K actualKey = generateActualKey(key, value);
,繼續跟進:
protected K generateActualKey(K key, V value) {
return key;
}
複製代碼
接下很簡單了,咱們在自定義的format類中重寫這個方法,改成返回null便可。
public static class RDDMultipleTextOutputFormat3 extends MultipleTextOutputFormat<String, JSONObject> {
@Override
public String generateFileNameForKeyValue(String key, JSONObject value,
String name) {
String object_id = value.getString("object_id");
return key + "/" + object_id+".json";
}
@Override
public String generateActualKey(String key, JSONObject value) {
return null;
}
}
複製代碼
其實此次踩坑仍是挺簡單的,循序漸進的一路跟進就找到了,其中還有很多點是能夠延伸的,好比saveAsHadoopFile方法的的底層實現,和傳統的saveAsTextFile方法的異同,我在查資料的過程當中也發現了不少延伸的知識點,不過這些應該就是另外一篇博客了。