package cn.edu.zucc.hdfs;java
import java.io.FileInputStream;apache
import java.io.IOException; import org.apache.hadoop.conf.Configuration;app
import org.apache.hadoop.fs.FSDataOutputStream;函數
import org.apache.hadoop.fs.FileSystem;oop
import org.apache.hadoop.fs.Path;blog
public class CopyFromLocalFile {hadoop
/** * 判斷路徑是否存在 */rem
public static boolean test(Configuration conf, String path) {get
try (FileSystem fs = FileSystem.get(conf))it
{
return fs.exists(new Path(path));
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
/**
* 複製文件到指定路徑 若路徑已存在,則進行覆蓋
*/
public static void copyFromLocalFile(Configuration conf,
String localFilePath, String remoteFilePath) {
Path localPath = new Path(localFilePath);
Path remotePath = new Path(remoteFilePath);
try (FileSystem fs = FileSystem.get(conf)) {
/* fs.copyFromLocalFile 第一個參數表示是否刪除源文件,第二個參數表示是否覆蓋 */
fs.copyFromLocalFile(false, true, localPath, remotePath);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 追加文件內容
*/
public static void appendToFile(Configuration conf, String localFilePath,
String remoteFilePath) {
Path remotePath = new Path(remoteFilePath);
try (FileSystem fs = FileSystem.get(conf);
FileInputStream in = new FileInputStream(localFilePath);) {
FSDataOutputStream out = fs.append(remotePath);
byte[] data = new byte[1024];
int read = -1;
while ((read = in.read(data)) > 0) {
out.write(data, 0, read);
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 主函數
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String localFilePath = "/usr/local/hadoop/text.txt"; // 本地路徑
String remoteFilePath = "/user/tiny/text.txt"; // HDFS路徑
// String choice = "append"; // 若文件存在則追加到文件末尾
String choice = "overwrite"; // 若文件存在則覆蓋
try {
/* 判斷文件是否存在 */
boolean fileExists = false;
if (CopyFromLocalFile.test(conf, remoteFilePath)) {
fileExists = true;
System.out.println(remoteFilePath + " 已存在.");
} else {
System.out.println(remoteFilePath + " 不存在.");
}
/* 進行處理 */
if (!fileExists) { // 文件不存在,則上傳
CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
remoteFilePath);
System.out.println(localFilePath + " 已上傳至 " + remoteFilePath);
} else if (choice.equals("overwrite")) { // 選擇覆蓋
CopyFromLocalFile.copyFromLocalFile(conf, localFilePath,
remoteFilePath);
System.out.println(localFilePath + " 已覆蓋 " + remoteFilePath);
} else if (choice.equals("append")) { // 選擇追加
CopyFromLocalFile.appendToFile(conf, localFilePath,
remoteFilePath);
System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}