在eclipse中調用JavaAPI實現HDFS中的相關操做java
一、建立一個java工程apache
二、右鍵工程,在屬性裏添加上hadoop解壓後的相關jar包(hadoop目錄下的jar包和lib目錄下的jar包)服務器
三、調用相關代碼,實現相關hdfs操做網絡
1 package hdfs; 2 3 import java.io.InputStream; 4 import java.net.URL; 5 6 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; 7 import org.apache.hadoop.io.IOUtils; 8 9 public class App1 { 10 /** 11 * 異常:unknown host: chaoren 本機沒有解析主機名chaoren 12 * 在C:\Windows\System32\drivers\etc\hosts文件中添加192.168.80.100 13 * chaoren(win10中要添加寫入權限才能寫入) 14 */ 15 static final String PATH = "hdfs://chaoren:9000/hello"; 16 17 public static void main(String[] args) throws Exception { 18 // 讓URL可以解析hdfs協議 19 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); 20 URL url = new URL(PATH); 21 InputStream in = url.openStream(); 22 /** 23 * @param in 24 * 輸入流 25 * @param out 26 * 輸出流 27 * @param buffSize 28 * 緩衝大小 29 * @param close 30 * 在傳輸結束後是否關閉流 31 */ 32 IOUtils.copyBytes(in, System.out, 1024, true);// 讀取文件hello中的內容 33 } 34 35 }
1 package hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileNotFoundException; 5 import java.io.IOException; 6 import java.net.URI; 7 import java.net.URISyntaxException; 8 9 import org.apache.hadoop.conf.Configuration; 10 import org.apache.hadoop.fs.FSDataInputStream; 11 import org.apache.hadoop.fs.FSDataOutputStream; 12 import org.apache.hadoop.fs.FileStatus; 13 import org.apache.hadoop.fs.FileSystem; 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.IOUtils; 16 17 public class App2 { 18 static final String PATH = "hdfs://chaoren:9000/"; 19 static final String DIR = "/d1"; 20 static final String FILE = "/d1/hello"; 21 22 public static void main(String[] args) throws Exception { 23 FileSystem fileSystem = getFileSystem(); 24 25 // 建立文件夾 hadoop fs -mkdir /d1 26 mkDir(fileSystem); 27 28 // 上傳文件 hadoop fs -put src des 29 putData(fileSystem); 30 31 // 下載文件 hadoop fs -get src des 32 getData(fileSystem); 33 34 // 瀏覽文件夾 hadoop fs -lsr path 35 list(fileSystem); 36 37 // 刪除文件夾 hadoop fs -rmr /d1 38 remove(fileSystem); 39 } 40 41 private static void remove(FileSystem fileSystem) throws IOException { 42 fileSystem.delete(new Path(DIR), true); 43 } 44 45 private static void list(FileSystem fileSystem) throws IOException { 46 FileStatus[] listStatus = fileSystem.listStatus(new Path("/")); 47 for (FileStatus fileStatus : listStatus) { 48 String isDir = fileStatus.isDir() ? "文件夾" : "文件"; 49 String permission = fileStatus.getPermission().toString(); 50 int replication = fileStatus.getReplication(); 51 long len = fileStatus.getLen(); 52 String path = fileStatus.getPath().toString(); 53 System.out.println(isDir + "\t" + permission + "\t" + replication 54 + "\t" + len + "\t" + path); 55 } 56 } 57 58 private static void getData(FileSystem fileSystem) throws IOException { 59 FSDataInputStream inputStream = fileSystem.open(new Path(FILE)); 60 IOUtils.copyBytes(inputStream, System.out, 1024, true); 61 } 62 63 private static void putData(FileSystem fileSystem) throws IOException, 64 FileNotFoundException { 65 FSDataOutputStream out = fileSystem.create(new Path(FILE)); 66 FileInputStream in = new FileInputStream("C:/Users/ahu_lichang/cp.txt");// 斜槓方向跟Windows下是相反的 67 IOUtils.copyBytes(in, out, 1024, true); 68 } 69 70 private static void mkDir(FileSystem fileSystem) throws IOException { 71 fileSystem.mkdirs(new Path(DIR)); 72 } 73 74 private static FileSystem getFileSystem() throws IOException, 75 URISyntaxException { 76 FileSystem fileSystem = FileSystem.get(new URI(PATH), 77 new Configuration()); 78 return fileSystem; 79 } 80 81 }
RPC
1.1 RPC (remote procedure call)遠程過程調用.
遠程過程指的是否是同一個進程。
1.2 RPC至少有兩個過程。調用方(client),被調用方(server)。
1.3 client主動發起請求,調用指定ip和port的server中的方法,把調用結果返回給client。
1.4 RPC是hadoop構建的基礎。eclipse
示例:oop
1 package rpc; 2 3 import org.apache.hadoop.ipc.VersionedProtocol; 4 5 public interface MyBizable extends VersionedProtocol{ 6 long VERSION = 2345L; 7 public abstract String hello(String name); 8 }
1 package rpc; 2 3 import java.io.IOException; 4 5 public class MyBiz implements MyBizable{ 6 7 public long getProtocolVersion(String arg0, long arg1) throws IOException { 8 return VERSION; 9 } 10 11 public String hello(String name) { 12 System.out.println("方法被調用了(檢測方法是否是在服務器上被調用的?)"); 13 return "hello "+name; 14 } 15 16 }
1 package rpc; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.ipc.RPC; 5 import org.apache.hadoop.ipc.RPC.Server; 6 7 public class MyServer { 8 static final String ADDRESS = "localhost"; 9 static final int PORT = 12345; 10 public static void main(String[] args) throws Exception { 11 /** 12 * 構造一個RPC的服務端 13 * @param instance 這個實例中的方法會被調用 14 * @param bindAddress 綁定的地址是用於監聽鏈接的 15 * @param port 綁定的端口是用於監聽鏈接的 16 * @pparam conf 17 */ 18 Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration()); 19 server.start(); 20 } 21 22 }
1 package rpc; 2 3 import java.net.InetSocketAddress; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.ipc.RPC; 7 8 public class MyClient { 9 public static void main(String[] args) throws Exception { 10 /** 11 * 構造一個客戶端代理對象,該代理對象實現了命名的協議。代理對象會與指定地址的服務器通話 12 */ 13 MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class, 14 MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS, 15 MyServer.PORT), new Configuration()); 16 String result = proxy.hello("hadoop!!!"); 17 System.out.println("客戶端RPC後的結果:" + result); 18 // 關閉網絡鏈接 19 RPC.stopProxy(proxy); 20 } 21 }
經過例子得到的認識
2.1 RPC是一個遠程過程調用。
2.2 客戶端調用服務端的方法,意味着調用服務端的對象中的方法。
2.3 若是服務端的對象容許客戶端調用,那麼這個對象必須實現接口。
2.4 若是客戶端可以調用到服務端對象的方法,那麼這些方法必定位於對象的接口中。url