Spark內置圖像數據源初探

做者:林武康,花名知瑕, 阿里巴巴計算平臺事業部EMR團隊的高級開發工程師,Apache HUE Contributor, 參與了多個開源項目的研發工做,對於分佈式系統設計應用有較豐富的經驗,目前主要專一於EMR數據開發相關的產品的研發工做。html

概述

在Apache Spark 2.4中引入了一個新的內置數據源, 圖像數據源.用戶能夠經過DataFrame API加載指定目錄的中圖像文件,生成一個DataFrame對象.經過該DataFrame對象,用戶能夠對圖像數據進行簡單的處理,而後使用MLlib進行特定的訓練和分類計算.
本文將介紹圖像數據源的實現細節和使用方法.java

簡單使用

先經過一個例子來簡單的瞭解下圖像數據源使用方法. 本例設定有一組圖像文件存放在阿里雲的OSS上, 須要對這組圖像加水印,並壓縮存儲到parquet文件中. 廢話不說,先上代碼:算法

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")
    imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
        .map(row => {
          val origin = row.getAs[String]("origin")
          val width = row.getAs[Int]("width")
          val height = row.getAs[Int]("height")
          val mode = row.getAs[Int]("mode")
          val nChannels = row.getAs[Int]("nChannels")
          val data = row.getAs[Array[Byte]]("data")
          Row(Row(origin, height, width, nChannels, mode,
            markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))
        }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")
  }

  def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {
    val image = new BufferedImage(width, height, imageType)
    val raster = image.getData.asInstanceOf[WritableRaster]
    val pixels = data.map(_.toInt)
    raster.setPixels(0, 0, width, height, pixels)
    image.setData(raster)
    val buffImg = new BufferedImage(width, height, imageType)
    val g = buffImg.createGraphics
    g.drawImage(image, 0, 0, null)
    g.setColor(Color.red)
    g.setFont(new Font("宋體", Font.BOLD, 30))
    g.drawString(text, width/2, height/2)
    g.dispose()
    val buffer = new ByteArrayOutputStream
    ImageIO.write(buffImg, "JPG", buffer)
    buffer.toByteArray
  }

從生成的parquet文件中抽取一條圖像二進制數據,保存爲本地jpg,效果以下:apache

圖1 左圖爲原始圖像,右圖爲處理後的圖像

你可能注意到兩個圖像到顏色並不相同,這是由於Spark的圖像數據將圖像解碼爲BGR順序的數據,而示例程序在保存的時候,沒有處理這個變換,致使顏色出現了反差.框架

實現初窺

下面咱們深刻到spark源碼中來看一下實現細節.Apache Spark內置圖像數據源的實現代碼在spark-mllib這個模塊中.主要包括兩個類:分佈式

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema定義了圖像文件加載爲DataFrame的Row的格式和解碼方法.ImageFileFormat提供了面向存儲層的讀寫接口.ide

格式定義

一個圖像文件被加載爲DataFrame後,對應的以下:函數

StructField("origin", StringType, true) ::
    StructField("height", IntegerType, false) ::
    StructField("width", IntegerType, false) ::
    StructField("nChannels", IntegerType, false) ::
    // OpenCV-compatible type: CV_8UC3 in most cases
    StructField("mode", IntegerType, false) ::
    // Bytes in OpenCV-compatible order: row-wise BGR in most cases
    StructField("data", BinaryType, false) :: Nil)

  val imageFields: Array[String] = columnSchema.fieldNames
  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

若是將該DataFrame打印出來,能夠獲得以下形式的表:oop

|image.origin        |image.width|image.height|image.nChannels|image.mode|image.data         |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600        |343         |3              |16        |55 45 21 56  ...   |
+--------------------+-----------+------------+---------------+----------+-------------------+

其中:性能

  • origin: 原始圖像文件的路徑
  • width: 圖像的寬度,單位像素
  • height: 圖像的高度,單位像素
  • nChannels: 圖像的通道數, 如常見的RGB位圖爲通道數爲3
  • mode: 像素矩陣(data)中元素的數值類型和通道順序, 與OpenCV的類型兼容
  • data: 解碼後的像素矩陣

提示: 關於圖像的基礎支持,能夠參考以下文檔: Image file reading and writing

加載和解碼

圖像文件經過ImageFileFormat加載爲一個Row對象.

// 爲了簡化說明起見,代碼有刪減和改動
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
  ......

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException("Write is not supported for image data source")
  }

  override protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {    
    ......
    (file: PartitionedFile) => {
    ......
        val path = new Path(origin)
        val stream = fs.open(path)
        val bytes = ByteStreams.toByteArray(stream)
        val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解碼 
        val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
     ......
          val converter = RowEncoder(requiredSchema)
          filteredResult.map(row => converter.toRow(row))
     ......
      }
    }
  }
}

從上能夠看出:

  • 當前的圖像數據源實現並不支持保存操做;
  • 圖像數據的解碼工做在ImageSchema中完成.

下面來看一下具體的解碼過程:

// 爲了簡化說明起見,代碼有刪減和改動
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
        // 使用ImageIO加載原始圖像數據
    val img = ImageIO.read(new ByteArrayInputStream(bytes))
    if (img != null) {
      // 獲取圖像的基本屬性
      val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
      val hasAlpha = img.getColorModel.hasAlpha
      val height = img.getHeight
      val width = img.getWidth
      // ImageIO::ImageType -> OpenCV Type
      val (nChannels, mode) = if (isGray) {
        (1, ocvTypes("CV_8UC1"))
      } else if (hasAlpha) {
        (4, ocvTypes("CV_8UC4"))
      } else {
        (3, ocvTypes("CV_8UC3"))
      }
            // 解碼
      val imageSize = height * width * nChannels
      // 用於存儲解碼後的像素矩陣
      val decoded = Array.ofDim[Byte](imageSize)
      if (isGray) {
        // 處理單通道圖像
        ...
      } else {
        // 處理多通道圖像
        var offset = 0
        for (h <- 0 until height) {
          for (w <- 0 until width) {
            val color = new Color(img.getRGB(w, h), hasAlpha)
            // 解碼後的通道順序爲BGR(A)
            decoded(offset) = color.getBlue.toByte
            decoded(offset + 1) = color.getGreen.toByte
            decoded(offset + 2) = color.getRed.toByte
            if (hasAlpha) {
              decoded(offset + 3) = color.getAlpha.toByte
            }
            offset += nChannels
          }
        }
      }
      // 轉換爲一行數據
      Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
    }
  }

從上能夠看出:

  • 本數據源在實現上使用javax的ImageIO庫實現各種格式的圖像文件的解碼.ImageIO雖然是一個十分強大和專業的java圖像處理庫,可是和更專業的CV庫(如OpenCV)比起來,性能上和功能上差距仍是很是大的;
  • 解碼後的圖像通道順序和像素數值類型是固定的, 順序固定爲BGR(A), 像素數值類型爲8U;
  • 最多支持4個通道,所以像多光譜遙感圖像這類可能包含數十個波段信息的圖像就沒法支持了;
  • 解碼後輸出的信息僅包含基本的長寬、通道數和模式等字段,若是須要獲取更爲詳細元數據,如exif,GPS座標等就心有餘而力不足了;
  • 數據源在生成DataFrame時執行了圖像的解碼操做,而且解碼後的數據存儲在Java堆內內存中.這在實際項目應該是一個比較粗放的實現方式,會佔用大量的資源,包括內存和帶寬(若是發生shuffle的話,能夠考慮參考同一個圖像文件保存爲BMP和JPG的大小差異).

編碼和存儲

從上分析能夠看出,當前圖像數據源並不支持對處理後的像素矩陣進行編碼並保存爲指定格式的圖像文件.

圖像處理能力

當前版本Apache Spark並無提供面向圖像數據的UDF,圖像數據的處理須要藉助ImageIO庫或其餘更專業的CV庫.

小結

當前Apache Spark的內置圖像數據源能夠較爲方便的加載圖像文件進行分析.不過,當前的實現還十分簡陋,性能和資源消耗應該都不會太樂觀.而且,當前版本僅提供了圖像數據的加載能力,並無提供經常使用處理算法的封裝和實現,也不能很好的支持更爲專業的CV垂直領域的分析業務.固然,這和圖像數據源在Spark中的定位有關(將圖像數據做爲輸入用於訓練DL模型,這類任務對圖像的處理自己要求並很少).若是但願使用Spark框架完成更實際的圖像處理任務,還有不少工做要作,好比:

  • 支持更加豐富的元數據模型
  • 使用更專業的編解碼庫和更靈活編解碼流程控制
  • 封裝面向CV的算子和函數
  • 更高效的內存管理
  • 支持GPU

等等諸如此類的工做,限於篇幅,這裏就不展開了.
好了,再多說一句,如今Spark已經支持處理圖像數據了(雖然支持有限),那麼,視頻流數據還會遠嗎?



本文做者:開源大數據EMR

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索