做者:林武康,花名知瑕, 阿里巴巴計算平臺事業部EMR團隊的高級開發工程師,Apache HUE Contributor, 參與了多個開源項目的研發工做,對於分佈式系統設計應用有較豐富的經驗,目前主要專一於EMR數據開發相關的產品的研發工做。html
在Apache Spark 2.4中引入了一個新的內置數據源, 圖像數據源.用戶能夠經過DataFrame API加載指定目錄的中圖像文件,生成一個DataFrame對象.經過該DataFrame對象,用戶能夠對圖像數據進行簡單的處理,而後使用MLlib進行特定的訓練和分類計算.
本文將介紹圖像數據源的實現細節和使用方法.java
先經過一個例子來簡單的瞭解下圖像數據源使用方法. 本例設定有一組圖像文件存放在阿里雲的OSS上, 須要對這組圖像加水印,並壓縮存儲到parquet文件中. 廢話不說,先上代碼:算法
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]") val spark = SparkSession.builder() .config(conf) .getOrCreate() val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir") imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data") .map(row => { val origin = row.getAs[String]("origin") val width = row.getAs[Int]("width") val height = row.getAs[Int]("height") val mode = row.getAs[Int]("mode") val nChannels = row.getAs[Int]("nChannels") val data = row.getAs[Array[Byte]]("data") Row(Row(origin, height, width, nChannels, mode, markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR"))) }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir") } def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = { val image = new BufferedImage(width, height, imageType) val raster = image.getData.asInstanceOf[WritableRaster] val pixels = data.map(_.toInt) raster.setPixels(0, 0, width, height, pixels) image.setData(raster) val buffImg = new BufferedImage(width, height, imageType) val g = buffImg.createGraphics g.drawImage(image, 0, 0, null) g.setColor(Color.red) g.setFont(new Font("宋體", Font.BOLD, 30)) g.drawString(text, width/2, height/2) g.dispose() val buffer = new ByteArrayOutputStream ImageIO.write(buffImg, "JPG", buffer) buffer.toByteArray }
從生成的parquet文件中抽取一條圖像二進制數據,保存爲本地jpg,效果以下:apache
你可能注意到兩個圖像到顏色並不相同,這是由於Spark的圖像數據將圖像解碼爲BGR順序的數據,而示例程序在保存的時候,沒有處理這個變換,致使顏色出現了反差.框架
下面咱們深刻到spark源碼中來看一下實現細節.Apache Spark內置圖像數據源的實現代碼在spark-mllib這個模塊中.主要包括兩個類:分佈式
其中,ImageSchema定義了圖像文件加載爲DataFrame的Row的格式和解碼方法.ImageFileFormat提供了面向存儲層的讀寫接口.ide
一個圖像文件被加載爲DataFrame後,對應的以下:函數
StructField("origin", StringType, true) :: StructField("height", IntegerType, false) :: StructField("width", IntegerType, false) :: StructField("nChannels", IntegerType, false) :: // OpenCV-compatible type: CV_8UC3 in most cases StructField("mode", IntegerType, false) :: // Bytes in OpenCV-compatible order: row-wise BGR in most cases StructField("data", BinaryType, false) :: Nil) val imageFields: Array[String] = columnSchema.fieldNames val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
若是將該DataFrame打印出來,能夠獲得以下形式的表:oop
|image.origin |image.width|image.height|image.nChannels|image.mode|image.data | +--------------------+-----------+------------+---------------+----------+-------------------+ |oss://.../dir/1.jpg |600 |343 |3 |16 |55 45 21 56 ... | +--------------------+-----------+------------+---------------+----------+-------------------+
其中:性能
提示: 關於圖像的基礎支持,能夠參考以下文檔: Image file reading and writing
圖像文件經過ImageFileFormat加載爲一個Row對象.
// 爲了簡化說明起見,代碼有刪減和改動 private[image] class ImageFileFormat extends FileFormat with DataSourceRegister { ...... override def prepareWrite( sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { throw new UnsupportedOperationException("Write is not supported for image data source") } override protected def buildReader( sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { ...... (file: PartitionedFile) => { ...... val path = new Path(origin) val stream = fs.open(path) val bytes = ByteStreams.toByteArray(stream) val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解碼 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin))) ...... val converter = RowEncoder(requiredSchema) filteredResult.map(row => converter.toRow(row)) ...... } } } }
從上能夠看出:
下面來看一下具體的解碼過程:
// 爲了簡化說明起見,代碼有刪減和改動 private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = { // 使用ImageIO加載原始圖像數據 val img = ImageIO.read(new ByteArrayInputStream(bytes)) if (img != null) { // 獲取圖像的基本屬性 val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY val hasAlpha = img.getColorModel.hasAlpha val height = img.getHeight val width = img.getWidth // ImageIO::ImageType -> OpenCV Type val (nChannels, mode) = if (isGray) { (1, ocvTypes("CV_8UC1")) } else if (hasAlpha) { (4, ocvTypes("CV_8UC4")) } else { (3, ocvTypes("CV_8UC3")) } // 解碼 val imageSize = height * width * nChannels // 用於存儲解碼後的像素矩陣 val decoded = Array.ofDim[Byte](imageSize) if (isGray) { // 處理單通道圖像 ... } else { // 處理多通道圖像 var offset = 0 for (h <- 0 until height) { for (w <- 0 until width) { val color = new Color(img.getRGB(w, h), hasAlpha) // 解碼後的通道順序爲BGR(A) decoded(offset) = color.getBlue.toByte decoded(offset + 1) = color.getGreen.toByte decoded(offset + 2) = color.getRed.toByte if (hasAlpha) { decoded(offset + 3) = color.getAlpha.toByte } offset += nChannels } } } // 轉換爲一行數據 Some(Row(Row(origin, height, width, nChannels, mode, decoded))) } }
從上能夠看出:
從上分析能夠看出,當前圖像數據源並不支持對處理後的像素矩陣進行編碼並保存爲指定格式的圖像文件.
當前版本Apache Spark並無提供面向圖像數據的UDF,圖像數據的處理須要藉助ImageIO庫或其餘更專業的CV庫.
當前Apache Spark的內置圖像數據源能夠較爲方便的加載圖像文件進行分析.不過,當前的實現還十分簡陋,性能和資源消耗應該都不會太樂觀.而且,當前版本僅提供了圖像數據的加載能力,並無提供經常使用處理算法的封裝和實現,也不能很好的支持更爲專業的CV垂直領域的分析業務.固然,這和圖像數據源在Spark中的定位有關(將圖像數據做爲輸入用於訓練DL模型,這類任務對圖像的處理自己要求並很少).若是但願使用Spark框架完成更實際的圖像處理任務,還有不少工做要作,好比:
等等諸如此類的工做,限於篇幅,這裏就不展開了.
好了,再多說一句,如今Spark已經支持處理圖像數據了(雖然支持有限),那麼,視頻流數據還會遠嗎?
本文爲雲棲社區原創內容,未經容許不得轉載。