Have you run into this? Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException

This exception hit me while writing files to HDFS from a Structured Streaming ForeachWriter.

The root cause: HDFS, being a distributed file system, supports concurrent reads but does not support concurrent writes to the same file. To enforce single-writer semantics, HDFS uses a time-limited lock mechanism, the HDFS lease (tracked per lease holder).
I learned this from the article at http://blog.csdn.net/weixin_44252761/article/details/89517393

In big-data computing, multithreading and distributed parallelism speed up processing nicely. In big-data storage, however, distributed file systems have an inherent weakness in handling concurrent write requests. This tension cannot be eliminated for now, only mitigated.

How to mitigate it? Credit to the Spark developers for an approach that is both simple and practical: you cannot write one file concurrently, but you can write multiple files concurrently. As long as the program treats those multiple files as one logical file, there is no difference between writing one and writing many.

Following that idea, I changed my code. The full code is too long to walk through, but the fix comes down to a single line:
change val hdfsWritePath = new Path(path) to val hdfsWritePath = new Path(path + "/" + partitionId).
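The idea can be sketched without Hadoop at all. Below is a minimal, hypothetical helper (names are illustrative, not from the original code) that derives a per-partition path, so no two partitions ever target the same file:

```scala
object PartitionPathDemo {
  // Hypothetical helper mirroring the one-line fix: append the
  // partition id so each writer task gets its own file.
  def partitionPath(basePath: String, partitionId: Long): String =
    s"$basePath/$partitionId"

  def main(args: Array[String]): Unit = {
    // Three concurrent partitions now target three distinct files,
    // so no two tasks ever compete for the same HDFS lease.
    val paths = (0L to 2L).map(partitionPath("/data/success", _))
    println(paths.mkString("\n"))
  }
}
```

Because each streaming partition is processed by at most one task at a time, this naming scheme alone removes the write contention.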

For those interested in the fuller picture, the original code looked like this:

import java.io.{BufferedWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.sql.{ForeachWriter, Row}

inputStream match {
    case Some(is) =>
        is.writeStream
                .foreach(new ForeachWriter[Row]() {
                    var successBufferedWriter: Option[BufferedWriter] = None

                    def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {
                        val configuration: Configuration = new Configuration()
                        configuration.set("fs.defaultFS", hdfsAddr)

                        val fileSystem: FileSystem = FileSystem.get(configuration)
                        // Every partition appends to the same file -- this is the bug.
                        val hdfsWritePath = new Path(path)

                        val fsDataOutputStream: FSDataOutputStream =
                            if (fileSystem.exists(hdfsWritePath))
                                fileSystem.append(hdfsWritePath)
                            else
                                fileSystem.create(hdfsWritePath)

                        Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))
                    }

                    override def open(partitionId: Long, version: Long): Boolean = {
                        successBufferedWriter =
                                if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)
                                else successBufferedWriter
                        true
                    }

                    override def process(value: Row): Unit = {
                        successBufferedWriter.foreach { w =>
                            w.write(value.mkString(","))
                            w.newLine()
                        }
                    }

                    override def close(errorOrNull: Throwable): Unit = {
                        // Guard against a writer that never opened instead of calling .get blindly.
                        successBufferedWriter.foreach { w =>
                            w.flush()
                            w.close()
                        }
                    }
                })
                .start()
                .awaitTermination()
    case None => // nothing to write
}

The code above looks fine at first glance, yet it causes the exception in the title. The fixed version:

inputStream match {
    case Some(is) =>
        is.writeStream
                .foreach(new ForeachWriter[Row]() {
                    var successBufferedWriter: Option[BufferedWriter] = None

                    def openHdfs(path: String, partitionId: Long, version: Long): Option[BufferedWriter] = {
                        val configuration: Configuration = new Configuration()
                        configuration.set("fs.defaultFS", hdfsAddr)

                        val fileSystem: FileSystem = FileSystem.get(configuration)
                        // The fix: each partition gets its own file, so no two
                        // tasks ever hold a lease on the same HDFS path.
                        val hdfsWritePath = new Path(path + "/" + partitionId)

                        val fsDataOutputStream: FSDataOutputStream =
                            if (fileSystem.exists(hdfsWritePath))
                                fileSystem.append(hdfsWritePath)
                            else
                                fileSystem.create(hdfsWritePath)

                        Some(new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)))
                    }

                    override def open(partitionId: Long, version: Long): Boolean = {
                        successBufferedWriter =
                                if (successBufferedWriter.isEmpty) openHdfs(successPath, partitionId, version)
                                else successBufferedWriter
                        true
                    }

                    override def process(value: Row): Unit = {
                        successBufferedWriter.foreach { w =>
                            w.write(value.mkString(","))
                            w.newLine()
                        }
                    }

                    override def close(errorOrNull: Throwable): Unit = {
                        successBufferedWriter.foreach { w =>
                            w.flush()
                            w.close()
                        }
                    }
                })
                .start()
                .awaitTermination()
    case None => // nothing to write
}

And just like that (though it actually cost me a day) the problem, which you may well run into too, was solved. When reading, simply point the read path at successPath and all the per-partition files are picked up together. Sharing it here.
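Reading the per-partition files back as one logical file can be sketched with plain local files as a stand-in for HDFS (the directory layout and contents here are hypothetical):

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object ReadPartsDemo {
  // Concatenate the lines of every per-partition file under `dir`,
  // in file-name order -- the same logical content a single shared
  // file would have held.
  def readAllParts(dir: Path): Seq[String] =
    Files.list(dir).iterator().asScala.toSeq
      .sortBy(_.getFileName.toString)
      .flatMap(p => Files.readAllLines(p).asScala)

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("successPath")
    // Two partitions wrote files named after their partition ids.
    Files.write(dir.resolve("0"), java.util.Arrays.asList("a,1", "b,2"))
    Files.write(dir.resolve("1"), java.util.Arrays.asList("c,3"))
    println(readAllParts(dir).mkString("\n"))
  }
}
```

In the real setup, tools like spark.read pointed at the successPath directory do this aggregation for you.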

If you have any questions or spot any shortcomings, please get in touch so we can improve together.

The end~
