First, let's look at FileSystem (org.apache.hadoop.fs.FileSystem). It is an abstract class and the parent class of every file system implementation in Hadoop.
To download data from HDFS (Hadoop Distributed FileSystem), we need an instance of DistributedFileSystem. So how do we get one? It starts with this call:
```java
FileSystem fs = FileSystem.get(new Configuration());
```
FileSystem has three overloaded get() methods:
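```java
// 1. Get a FileSystem instance from the configuration
public static FileSystem get(Configuration conf) throws IOException
// 2. Get a FileSystem instance from the given FileSystem URI and the configuration
public static FileSystem get(URI uri, Configuration conf) throws IOException
// 3. Get a FileSystem instance from the given FileSystem URI, the configuration and a user name
public static FileSystem get(final URI uri, final Configuration conf, final String user)
        throws IOException, InterruptedException
```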
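The relationship between the three overloads can be sketched without Hadoop on the classpath. This is a minimal model built on several assumptions. A Map<String, String> stands in for Configuration. A String describing the result stands in for the returned FileSystem. A ThreadLocal stands in for running as another user, which Hadoop's third overload does with UserGroupInformation and doAs. Only the fs.defaultFS key and its file:/// default come from Hadoop. All class and method names are hypothetical.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical model of FileSystem's three get() overloads; not Hadoop code.
final class GetOverloadsSketch {
    // Stand-in for the current Hadoop user (UserGroupInformation in Hadoop).
    private static final ThreadLocal<String> CURRENT_USER =
            ThreadLocal.withInitial(() -> System.getProperty("user.name"));

    // Overload 1: resolve the default URI from fs.defaultFS (default file:///) and delegate.
    static String get(Map<String, String> conf) {
        URI defaultUri = URI.create(conf.getOrDefault("fs.defaultFS", "file:///"));
        return get(defaultUri, conf);
    }

    // Overload 2: does the real work; here it only describes who gets which file system.
    static String get(URI uri, Map<String, String> conf) {
        return CURRENT_USER.get() + "@" + uri.getScheme() + "://" + uri.getAuthority();
    }

    // Overload 3: run overload 2 as the given user, then restore the previous user.
    static String get(URI uri, Map<String, String> conf, String user) {
        String previous = CURRENT_USER.get();
        CURRENT_USER.set(user);
        try {
            return get(uri, conf);
        } finally {
            CURRENT_USER.set(previous);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("fs.defaultFS", "hdfs://node1:9000");
        System.out.println(get(URI.create("hdfs://node1:9000"), conf, "root")); // root@hdfs://node1:9000
        System.out.println(get(conf));
    }
}
```

The point of the model is that only one overload does the actual work. The other two just fill in the URI or the user and delegate to it.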
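FileSystem.get(Configuration conf) is called first. It takes the default file system URI from the configuration (the fs.defaultFS property, here hdfs://node1:9000) and passes it to the overload FileSystem.get(URI uri, Configuration conf):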
```java
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    // scheme is the concrete FileSystem URI scheme, e.g. file, hdfs, webhdfs, har
    String scheme = uri.getScheme();       // scheme = hdfs
    // authority is the NameNode host name and port
    String authority = uri.getAuthority(); // authority = node1:9000
    ...
    // disableCacheName = fs.hdfs.impl.disable.cache
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    // Read the configuration to check whether caching is disabled
    if (conf.getBoolean(disableCacheName, false)) { // if caching is disabled,
        return createFileSystem(uri, conf);        // create a new FileSystem instance directly
    }
    // Caching is enabled: look the instance up in FileSystem's static member CACHE first
    return CACHE.get(uri, conf);
}
```
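The decision at the end of get(URI, Configuration) can be reproduced with java.net.URI alone. In this hypothetical sketch, a Map<String, String> replaces Configuration. The method returns the name of the call that would be made next rather than a FileSystem. The property name fs.<scheme>.impl.disable.cache is the one built in the code above.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch of the cache decision in FileSystem.get(URI, Configuration).
final class CacheDecisionSketch {
    // Builds the per-scheme switch, e.g. "fs.hdfs.impl.disable.cache".
    static String disableCacheKey(URI uri) {
        return String.format("fs.%s.impl.disable.cache", uri.getScheme());
    }

    // Returns which call get() would make next.
    static String nextCall(URI uri, Map<String, String> conf) {
        // Roughly conf.getBoolean(key, false): a missing key leaves caching enabled.
        boolean disabled = Boolean.parseBoolean(conf.getOrDefault(disableCacheKey(uri), "false"));
        return disabled ? "createFileSystem" : "CACHE.get";
    }

    public static void main(String[] args) {
        URI uri = URI.create("hdfs://node1:9000");
        System.out.println(uri.getScheme());          // hdfs
        System.out.println(uri.getAuthority());       // node1:9000
        System.out.println(disableCacheKey(uri));     // fs.hdfs.impl.disable.cache
        System.out.println(nextCall(uri, Map.of()));  // CACHE.get
        System.out.println(nextCall(uri, Map.of("fs.hdfs.impl.disable.cache", "true"))); // createFileSystem
    }
}
```

Turning the cache off this way gives each caller its own instance. That matters because a cached FileSystem is shared: calling close() on it closes it for every other holder of the same cache key.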
Next comes FileSystem$Cache.get(URI uri, Configuration conf) (Cache is a static nested class of FileSystem):
```java
FileSystem get(URI uri, Configuration conf) throws IOException {
    Key key = new Key(uri, conf); // key = (root (auth:SIMPLE))@hdfs://node1:9000
    return getInternal(uri, conf, key);
}
```
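The key printed above, (root (auth:SIMPLE))@hdfs://node1:9000, shows what Cache.Key is built from: the user plus the URI's scheme and authority, with the path ignored. Below is a minimal sketch of such a key. It is a simplification. Hadoop's Key compares UserGroupInformation objects (not plain user-name strings) and also carries a unique field. The class name here is hypothetical.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

// Hypothetical stand-in for FileSystem.Cache.Key: equality is (scheme, authority, user).
final class CacheKeySketch {
    private final String scheme;
    private final String authority;
    private final String user; // Hadoop keeps a UserGroupInformation here

    CacheKeySketch(URI uri, String user) {
        // Lower-case so that HDFS://NODE1:9000 and hdfs://node1:9000 share one entry.
        this.scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
        this.authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CacheKeySketch)) return false;
        CacheKeySketch k = (CacheKeySketch) o;
        return scheme.equals(k.scheme) && authority.equals(k.authority) && user.equals(k.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, user);
    }

    @Override
    public String toString() {
        // Same shape as the key shown in the comment above: (user)@scheme://authority
        return "(" + user + ")@" + scheme + "://" + authority;
    }

    public static void main(String[] args) {
        CacheKeySketch a = new CacheKeySketch(URI.create("hdfs://node1:9000"), "root (auth:SIMPLE)");
        CacheKeySketch b = new CacheKeySketch(URI.create("HDFS://NODE1:9000/user/root"), "root (auth:SIMPLE)");
        System.out.println(a);           // (root (auth:SIMPLE))@hdfs://node1:9000
        System.out.println(a.equals(b)); // true
    }
}
```

Because the user is part of the key, accessing the same cluster URI as two different users yields two separately cached instances.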
This in turn calls FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key) (Key is itself a static nested class of Cache):
```java
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException {
    FileSystem fs;
    synchronized (this) {
        // map is Cache's member that caches FileSystem instances; its type is HashMap<Key, FileSystem>
        fs = map.get(key);
    }
    if (fs != null) { // a matching FileSystem instance was found in the cache map,
        return fs;    // so return it
    }
    // Otherwise, call FileSystem.createFileSystem(URI uri, Configuration conf) to create one
    fs = createFileSystem(uri, conf);
    /* Marker 1: waiting for createFileSystem() to return */
    synchronized (this) { // refetch the lock again
        /*
         * In a multithreaded environment another client (another thread) may have created
         * a DistributedFileSystem instance and cached it in map in the meantime.
         * In that case the instance this client just created is closed.
         * This is effectively a special form of the singleton pattern:
         * one key maps to one DistributedFileSystem instance.
         */
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
            fs.close();      // close the new file system
            return oldfs;    // return the old file system
        }
        // now insert the new file system into the map
        fs.key = key;
        map.put(key, fs);
        ...
        return fs;
    }
}
```
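The locking pattern in getInternal() can be tried out in isolation: check under the lock, create outside it, then re-check under the lock and close the loser. In this hypothetical sketch, Resource stands in for FileSystem and a Supplier stands in for createFileSystem(). It shows that concurrent callers all end up sharing one instance, while every surplus instance is closed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the check / create-outside-lock / re-check pattern of Cache.getInternal().
final class DoubleCheckCacheSketch {
    static final AtomicInteger CREATED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Stand-in for a FileSystem: expensive to create, must be closed when not kept.
    static final class Resource implements AutoCloseable {
        Resource() { CREATED.incrementAndGet(); }
        @Override public void close() { CLOSED.incrementAndGet(); }
    }

    private final Map<String, Resource> map = new HashMap<>();

    Resource get(String key, Supplier<Resource> factory) {
        Resource r;
        synchronized (this) {      // first check, under the lock
            r = map.get(key);
        }
        if (r != null) {
            return r;
        }
        r = factory.get();         // slow creation runs WITHOUT holding the lock
        synchronized (this) {      // re-acquire the lock and check again
            Resource old = map.get(key);
            if (old != null) {     // another thread cached one meanwhile:
                r.close();         //   discard ours
                return old;        //   and share theirs
            }
            map.put(key, r);
            return r;
        }
    }

    // Runs 32 concurrent lookups; returns how many created instances were NOT closed.
    static int survivors() {
        CREATED.set(0);
        CLOSED.set(0);
        DoubleCheckCacheSketch cache = new DoubleCheckCacheSketch();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<Resource>> results = new ArrayList<>();
            for (int i = 0; i < 32; i++) {
                results.add(pool.submit(() -> cache.get("hdfs://node1:9000", Resource::new)));
            }
            Resource shared = results.get(0).get();
            for (Future<Resource> f : results) {
                if (f.get() != shared) {
                    throw new IllegalStateException("callers received different instances");
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return CREATED.get() - CLOSED.get();
    }

    public static void main(String[] args) {
        System.out.println(survivors()); // 1
    }
}
```

Creating outside the lock keeps a slow connection setup from blocking lookups for other keys. The price is an occasional wasted instance, which is why the losing thread closes its copy.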
Back at Marker 1: FileSystem.createFileSystem(URI uri, Configuration conf) is called first:
```java
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    // Read the configuration to get the class object for the concrete URI scheme (hdfs)
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // clazz = org.apache.hadoop.hdfs.DistributedFileSystem
    ...
    // Instantiate a DistributedFileSystem via reflection
    FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
    // Initialize the DistributedFileSystem instance
    fs.initialize(uri, conf);
    return fs;
}
```
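createFileSystem() separates two steps. First it obtains a class for the scheme and instantiates it reflectively; then it calls initialize(). The hypothetical sketch below reproduces that shape with plain JDK reflection: Class.forName plus getDeclaredConstructor().newInstance(), in place of Hadoop's ReflectionUtils.newInstance. It only consults a fs.<scheme>.impl entry in a map. In Hadoop 2.x, getFileSystemClass() also falls back to implementations registered through java.util.ServiceLoader, which is left out here.

```java
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of createFileSystem(): scheme -> class -> instance -> initialize().
final class ReflectiveFsSketch {
    // Stand-in for the abstract FileSystem base class.
    abstract static class BaseFs {
        URI uri;
        void initialize(URI uri, Map<String, String> conf) { this.uri = uri; }
    }

    // Stand-in for DistributedFileSystem; reflection needs its no-arg constructor.
    static class DemoDistributedFs extends BaseFs { }

    static BaseFs createFileSystem(URI uri, Map<String, String> conf) throws Exception {
        String className = conf.get("fs." + uri.getScheme() + ".impl");
        if (className == null) {
            throw new IOException("No FileSystem for scheme: " + uri.getScheme());
        }
        Class<?> clazz = Class.forName(className);
        BaseFs fs = (BaseFs) clazz.getDeclaredConstructor().newInstance(); // step 1: construct
        fs.initialize(uri, conf);                                           // step 2: initialize
        return fs;
    }

    static String demo() {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.hdfs.impl", DemoDistributedFs.class.getName());
        try {
            BaseFs fs = createFileSystem(URI.create("hdfs://node1:9000"), conf);
            return fs.getClass().getSimpleName() + " " + fs.uri;
        } catch (Exception e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // DemoDistributedFs hdfs://node1:9000
    }
}
```

Constructing with no arguments and initializing afterwards lets one generic factory create any FileSystem subclass. The subclass-specific setup (for DistributedFileSystem, building the DFSClient) happens in initialize().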
Before calling DistributedFileSystem.initialize(URI uri, Configuration conf), let's take a look at the DistributedFileSystem class.
DistributedFileSystem is a concrete subclass of the abstract class FileSystem:
```java
public class DistributedFileSystem extends FileSystem {
    ...
    DFSClient dfs; // DistributedFileSystem holds a DFSClient member named dfs: its most important field!
    ...
}
```
Now DistributedFileSystem.initialize(URI uri, Configuration conf) is called:
```java
public void initialize(URI uri, Configuration conf) throws IOException {
    ...
    // Create a new DFSClient instance and let the member dfs reference it
    this.dfs = new DFSClient(uri, conf, statistics);
    /* Marker 2: waiting for new DFSClient() to return */
    ...
}
```
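The split between DistributedFileSystem and DFSClient is plain delegation. The FileSystem subclass is created with no arguments, builds its client in initialize(), and forwards operations to that client. Below is a hypothetical sketch of that shape; the class and method names are illustrative and are not Hadoop's API.

```java
import java.net.URI;

// Hypothetical sketch of the facade/delegate split between a FileSystem subclass and its client.
final class FacadeSketch {
    // Stand-in for DFSClient: the object that actually talks to the NameNode.
    static final class Client {
        private final String nameNode;
        Client(URI uri) { this.nameNode = uri.getAuthority(); }
        String getFileInfo(String path) { return "info(" + path + ") from " + nameNode; }
    }

    // Stand-in for DistributedFileSystem: constructed by reflection with no arguments,
    // so the client can only be built later, in initialize().
    static final class DemoDistributedFs {
        Client dfs; // the field that everything else is delegated to

        void initialize(URI uri) {
            this.dfs = new Client(uri);
        }

        String getFileStatus(String path) {
            return dfs.getFileInfo(path); // pure delegation
        }
    }

    static String demo() {
        DemoDistributedFs fs = new DemoDistributedFs();
        fs.initialize(URI.create("hdfs://node1:9000"));
        return fs.getFileStatus("/user/root");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // info(/user/root) from node1:9000
    }
}
```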
Before the DFSClient instance is created, let's look at the DFSClient class to see which member fields need to be assigned:
```java
public class DFSClient implements java.io.Closeable, RemotePeerFactory {
    ...
    final ClientProtocol namenode; // DFSClient holds a ClientProtocol member named namenode: an RPC proxy object
    /* The service used for delegation tokens */
    private Text dtService;
    ...
}
```
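The comment calls namenode "an RPC proxy object". Hadoop's RPC engines build such objects with the JDK's java.lang.reflect.Proxy. Every call on the interface is handed to an InvocationHandler, and that is where the request would be serialized and sent to the NameNode. In the hypothetical sketch below, DemoClientProtocol stands in for ClientProtocol. The handler records the call and returns a canned value instead of doing any network I/O.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a dynamic proxy that turns interface calls into "RPC requests".
final class RpcProxySketch {
    // Stand-in for ClientProtocol: the caller only ever sees this interface.
    interface DemoClientProtocol {
        long getPreferredBlockSize(String src);
    }

    // Each call is recorded as "address <- method[args]"; a real engine would send it instead.
    static final List<String> SENT = new ArrayList<>();

    static DemoClientProtocol createProxy(String address) {
        InvocationHandler invoker = (proxy, method, args) -> {
            SENT.add(address + " <- " + method.getName() + Arrays.toString(args));
            return 128L * 1024 * 1024; // pretend reply from the server
        };
        return (DemoClientProtocol) Proxy.newProxyInstance(
                DemoClientProtocol.class.getClassLoader(),
                new Class<?>[] {DemoClientProtocol.class},
                invoker);
    }

    static String demo() {
        SENT.clear();
        DemoClientProtocol namenode = createProxy("node1:9000");
        long size = namenode.getPreferredBlockSize("/user/root/a.txt");
        return size + " " + SENT.get(0);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 134217728 node1:9000 <- getPreferredBlockSize[/user/root/a.txt]
    }
}
```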
Back at Marker 2: the DFSClient constructor DFSClient(URI nameNodeUri, Configuration conf, FileSystem$Statistics statistics) is called. It delegates to the overloaded constructor DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics), passing null for rpcNamenode:
```java
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
        Configuration conf, FileSystem.Statistics stats) throws IOException {
    ...
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
    if (numResponseToDrop > 0) { // numResponseToDrop = 0
        // This case is used for testing.
        LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
            + " is set to " + numResponseToDrop
            + ", this hacked client will proactively drop responses");
        proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
            nameNodeUri, ClientProtocol.class, numResponseToDrop);
    }
    if (proxyInfo != null) { // proxyInfo = null
        this.dtService = proxyInfo.getDelegationTokenService();
        this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) { // rpcNamenode = null
        // This case is used for testing.
        Preconditions.checkArgument(nameNodeUri == null);
        this.namenode = rpcNamenode;
        dtService = null;
    } else { // the first two branches only apply in tests; this else block is what matters
        ...
        /*
         * Create a NameNodeProxies.ProxyAndInfo<ClientProtocol> object and let proxyInfo reference it.
         * Doesn't createProxy(conf, nameNodeUri, ClientProtocol.class) look a lot like
         * RPC.getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf)?
         * It does! This means createProxy() must call into the RPC classes internally:
         *   conf: a Configuration in both methods
         *   nameNodeUri = hdfs://node1:9000: exactly the host name and port of the InetSocketAddress addr
         *   ClientProtocol.class: in both, the class object of the RPC protocol interface
         * ClientProtocol is used by user code via DistributedFileSystem class to communicate
         * with the NameNode
         */
        proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
        /* Marker 3: waiting for createProxy() to return */
        this.dtService = proxyInfo.getDelegationTokenService();
        this.namenode = proxyInfo.getProxy();
    }
    ...
}
```
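The comment points out that hdfs://node1:9000 carries the host name and port of an InetSocketAddress. On the non-HA path Hadoop makes that conversion with NameNode.getAddress(nameNodeUri). The sketch below is not that method, only a JDK-only approximation. The fallback port 8020 is an assumption based on the Hadoop 2.x default NameNode RPC port.

```java
import java.net.InetSocketAddress;
import java.net.URI;

// Hypothetical sketch: turn hdfs://host[:port] into the address an RPC proxy connects to.
final class NameNodeAddressSketch {
    // Assumption: Hadoop 2.x default NameNode RPC port, used when the URI has no port.
    static final int DEFAULT_PORT = 8020;

    static InetSocketAddress toAddress(URI nameNodeUri) {
        if (!"hdfs".equalsIgnoreCase(nameNodeUri.getScheme()) || nameNodeUri.getHost() == null) {
            throw new IllegalArgumentException("Invalid HDFS URI: " + nameNodeUri);
        }
        int port = nameNodeUri.getPort() == -1 ? DEFAULT_PORT : nameNodeUri.getPort();
        // createUnresolved skips DNS; a real client resolves the host before connecting.
        return InetSocketAddress.createUnresolved(nameNodeUri.getHost(), port);
    }

    public static void main(String[] args) {
        InetSocketAddress a = toAddress(URI.create("hdfs://node1:9000"));
        System.out.println(a.getHostString() + ":" + a.getPort()); // node1:9000
        InetSocketAddress b = toAddress(URI.create("hdfs://node1"));
        System.out.println(b.getHostString() + ":" + b.getPort()); // node1:8020
    }
}
```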
Back at Marker 3: NameNodeProxies.createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) is called:
```java
/**
 * Creates the namenode proxy with the passed protocol. This will handle
 * creation of either HA- or non-HA-enabled proxy objects, depending upon
 * if the provided URI is a configured logical URI.
 **/
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
        URI nameNodeUri, Class<T> xface) throws IOException {
    // Look up the HA configuration of the actual Hadoop deployment
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
        getFailoverProxyProviderClass(conf, nameNodeUri, xface);
    if (failoverProxyProviderClass == null) { // non-HA: here, a pseudo-distributed Hadoop setup
        // Non-HA case: create a non-HA NameNode proxy
        return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
            UserGroupInformation.getCurrentUser(), true);
    } else { // HA
        // HA case
        FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
            .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, nameNodeUri);
        Conf config = new Conf(conf);
        T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
            RetryPolicies.failoverOnNetworkException(
                RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
                config.maxRetryAttempts, config.failoverSleepBaseMillis,
                config.failoverSleepMaxMillis));
        Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
        // Return proxyInfo, an object wrapping proxy and dtService
        return new ProxyAndInfo<T>(proxy, dtService);
    }
}
```
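The HA branch hides several NameNode proxies behind one interface and retries with failover. Below is a much-simplified, hypothetical sketch of that idea using java.lang.reflect.Proxy. The handler tracks the active target. When a call fails with an IOException, it switches to the next target and retries, up to maxFailovers times. Hadoop's RetryPolicies.failoverOnNetworkException is more discriminating. It looks at the exception type and at whether the method may safely be retried, and it sleeps between failovers (config.failoverSleepBaseMillis and config.failoverSleepMaxMillis above). None of that is modeled here.

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an HA client proxy: fail over to the next NameNode on IOException.
final class FailoverProxySketch {
    // Stand-in for a NameNode protocol interface.
    interface DemoProtocol {
        String ping() throws IOException;
    }

    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> xface, List<T> targets, int maxFailovers) {
        AtomicInteger current = new AtomicInteger(); // index of the NameNode currently in use
        return (T) Proxy.newProxyInstance(xface.getClassLoader(), new Class<?>[] {xface},
                (proxy, method, args) -> {
                    for (int failovers = 0; ; failovers++) {
                        try {
                            return method.invoke(targets.get(current.get()), args);
                        } catch (InvocationTargetException e) {
                            Throwable cause = e.getCause();
                            if (!(cause instanceof IOException) || failovers >= maxFailovers) {
                                throw cause; // not a failover case, or out of attempts
                            }
                            current.set((current.get() + 1) % targets.size()); // fail over
                        }
                    }
                });
    }

    static String call(List<DemoProtocol> nameNodes, int maxFailovers) {
        DemoProtocol client = create(DemoProtocol.class, nameNodes, maxFailovers);
        try {
            return client.ping();
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        DemoProtocol standby = () -> { throw new IOException("nn1 is standby"); };
        DemoProtocol active = () -> "nn2 is active";
        System.out.println(call(List.of(standby, active), 2));  // nn2 is active
        System.out.println(call(List.of(standby, standby), 2)); // failed: nn1 is standby
    }
}
```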
Since this is a non-HA setup, NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries) is called next:
```java
public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf,
        InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi,
        boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr); // dtService = 192.168.8.101:9000
    T proxy;
    if (xface == ClientProtocol.class) { // xface = ClientProtocol.class
        // Create the NameNode proxy object
        proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);
        /* Marker 4: waiting for createNNProxyWithClientProtocol() to return */
    } else if (...) { // branches for the other protocols are elided
        ...
    }
    // Wrap proxy and dtService in a ProxyAndInfo object and return it
    return new ProxyAndInfo<T>(proxy, dtService);
}
```
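What createNonHAProxy() hands back is just a pair: the proxy plus a delegation token service string. The sketch below assumes the default behaviour of SecurityUtil.buildTokenService, where that string is the NameNode's IP address and port. This matches the 192.168.8.101:9000 shown above. ProxyAndInfo and buildTokenService here are hypothetical stand-ins, not the Hadoop classes.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch of the (proxy, dtService) pair returned by createNonHAProxy().
final class ProxyAndInfoSketch {
    // Minimal stand-in for NameNodeProxies.ProxyAndInfo<T>: an immutable pair.
    static final class ProxyAndInfo<T> {
        private final T proxy;
        private final String dtService; // Hadoop stores this as a Text

        ProxyAndInfo(T proxy, String dtService) {
            this.proxy = proxy;
            this.dtService = dtService;
        }

        T getProxy() { return proxy; }
        String getDelegationTokenService() { return dtService; }
    }

    // Assumption: mirrors the default "ip:port" form of SecurityUtil.buildTokenService.
    static String buildTokenService(InetSocketAddress addr) {
        if (addr.isUnresolved()) {
            throw new IllegalArgumentException("Unresolved address: " + addr);
        }
        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
    }

    static String demo() {
        // A literal IP address needs no DNS lookup.
        InetSocketAddress nnAddr = new InetSocketAddress("192.168.8.101", 9000);
        ProxyAndInfo<String> info = new ProxyAndInfo<>("proxy-for-ClientProtocol", buildTokenService(nnAddr));
        return info.getProxy() + " @ " + info.getDelegationTokenService();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // proxy-for-ClientProtocol @ 192.168.8.101:9000
    }
}
```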