import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Operate on HDFS files through streams.
 * This also makes it possible to read data starting from a given offset.
 */
public class HdfsStreamAccess {

    FileSystem fs = null;
    Configuration conf = null;

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        // Obtain a client instance for file system operations.
        // fs = FileSystem.get(conf);
        // The URI and user identity can also be passed in directly:
        fs = FileSystem.get(new URI("hdfs://node2:8020"), conf, "root");
    }

    @After
    public void cleanup() throws IOException {
        // Release the client connection after each test.
        if (fs != null) {
            fs.close();
        }
    }

    /**
     * Upload a file to HDFS through a stream.
     * Closing the output stream (here via try-with-resources) is required
     * to flush the data to HDFS.
     */
    @Test
    public void testUpload() throws Exception {
        try (FSDataOutputStream outputStream = fs.create(new Path("/aaa"), true);
             FileInputStream inputStream = new FileInputStream("d:/bbb")) {
            IOUtils.copy(inputStream, outputStream);
        }
    }

    /**
     * Fetch data from HDFS through a stream.
     */
    @Test
    public void testDownLoad() throws Exception {
        try (FSDataInputStream inputStream = fs.open(new Path("/aaa"));
             FileOutputStream outputStream = new FileOutputStream("d:/ccc")) {
            IOUtils.copy(inputStream, outputStream);
        }
    }

    /**
     * Seek to an offset, then read from there to the end of the file.
     */
    @Test
    public void testRandomAccess() throws Exception {
        try (FSDataInputStream inputStream = fs.open(new Path("/aaa"));
             FileOutputStream outputStream = new FileOutputStream("d:/ddd")) {
            inputStream.seek(12);
            IOUtils.copy(inputStream, outputStream);
        }
    }

    /**
     * Display the contents of a file on HDFS.
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testCat() throws IllegalArgumentException, IOException {
        try (FSDataInputStream in = fs.open(new Path("/aaa"))) {
            IOUtils.copy(in, System.out);
            // Hadoop's own helper works as well:
            // org.apache.hadoop.io.IOUtils.copyBytes(in, System.out, 1024);
        }
    }
}
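The class Javadoc mentions reading a specified offset range, but testRandomAccess copies from the offset to the end of the file. A bounded-range read is a small extension: seek to the offset, then copy only a fixed number of bytes. Below is a minimal sketch of such a test method (it would go inside the class above); it assumes Commons IO 2.2+ for IOUtils.copyLarge, and the path "/aaa", the output file "d:/eee", and the offset/length values are placeholders.

    /**
     * Sketch: read only the byte range [offset, offset + length) from an HDFS file.
     * Assumes Commons IO 2.2+ (IOUtils.copyLarge with a length argument).
     */
    @Test
    public void testReadRange() throws Exception {
        long offset = 12;   // placeholder start position
        long length = 16;   // placeholder number of bytes to read
        try (FSDataInputStream in = fs.open(new Path("/aaa"));
             FileOutputStream out = new FileOutputStream("d:/eee")) {
            in.seek(offset);                        // position the stream at the start of the range
            IOUtils.copyLarge(in, out, 0, length);  // copy exactly `length` bytes, then stop
        }
    }

Using seek() on the FSDataInputStream positions the read without transferring data, which is cheaper than skipping bytes via copyLarge's inputOffset argument.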