在spark中經過hdfs的java接口併發寫文件出現了數據丟失的問題,一頓操做後發現原來是FileSystem的緩存機制。補一課先java
FileSystem.get(config)是如何建立一個hadoop的FileSystem。
分爲3個步驟。
1. 初始化全部支持的FileSystem(沒有實例話,只是緩存類)
2. 經過uri的scheme拿到相應FileSystem
3. 緩存機制(若是不關閉的話,默認是開啓)
下面詳細分析一下各步驟流程
1. 初始化
經過java提供的ServiceLoader來錄入全部可能的FileSystem,就像這樣
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class); for (FileSystem fs : serviceLoader) { SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); }
待初始化的類經過配置文件聲明,配置能夠在hadoop-hdfs.jar裏找到
捎帶一嘴,java提供的ServiceLoader有點像乞丐版spring的依賴反轉。spring
2.scheme
經過對Uri的解析來判斷建立一個什麼FileSystem,
例如
hdfs://master:9200/test的scheme就是hdfs。
而後經過scheme和已經緩存好的FileSystem映射,找到須要實例化的類。緩存
例如scheme是hdfs,那麼就會建立一個DistributedFileSystem。
3. 緩存
FileSystem類中有一個Cache內部類,用於緩存已經被實例化的FileSystem。注意這個跟鏈接池仍是有區別的,Cache中的緩存只是一個map,能夠被多個線程拿到。這就會有一個問題,當你多線程同時get FileSystem的時候,可能返回的是同一個對象。因此切記,在多線程場景中,不要隨意調用FileSystem.close,你關的鏈接可能會影響到其餘正在使用的線程。多線程
注意: 當你在其餘框架上拿fileSystem對象須要額外注意,例如在spark上進行 FileSystem.get(),若是你想自定義某些配置,設置hdfs的副本數(dfs.replication) 之類,你必須在configuration中關閉FileSystem的緩存機制,也就是設置併發
configuration.set("fs.hdfs.impl.disable.cache","true")
這很重要,由於你不肯定spark是否在你以前建立了一個FileSystem,而你獲得的可能不是你想要的。框架
參考資料oop
// 遇到的相同問題spa