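Author: Lin Wukang (alias Zhixia), senior development engineer on the EMR team of Alibaba's Computing Platform Business Unit and an Apache HUE contributor. He has worked on several open-source projects, has broad experience designing and applying distributed systems, and currently focuses on EMR data-development products.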
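Apache Spark 2.4 introduced a new built-in data source: the image data source. Through the DataFrame API, users can load the image files in a given directory into a DataFrame, do some simple processing on the image data, and then use MLlib for model training and classification.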
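Let's start with an example of how the image data source is used. Suppose a set of image files is stored on Alibaba Cloud OSS, and we need to add a watermark to each image and store the results, compressed, in Parquet files. Without further ado, here is the code: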
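import java.awt.{Color, Font}
import java.awt.image.{BufferedImage, DataBufferByte}
import java.io.ByteArrayOutputStream
import javax.imageio.ImageIO

import org.apache.spark.SparkConf
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]")
  val spark = SparkSession.builder().config(conf).getOrCreate()

  // dropInvalid skips files that ImageIO cannot decode (they would have width/height -1)
  val imageDF = spark.read.format("image")
    .option("dropInvalid", true)
    .load("oss://<bucket>/path/to/src/dir")

  imageDF.select("image.origin", "image.width", "image.height",
      "image.nChannels", "image.mode", "image.data")
    .filter("nChannels = 3") // markWithText below only handles 3-channel BGR pixels
    .map { row =>
      val origin = row.getAs[String]("origin")
      val width = row.getAs[Int]("width")
      val height = row.getAs[Int]("height")
      val mode = row.getAs[Int]("mode")
      val nChannels = row.getAs[Int]("nChannels")
      val data = row.getAs[Array[Byte]]("data")
      // Same nested layout as the source schema, but "data" now holds JPEG bytes
      Row(Row(origin, height, width, nChannels, mode, markWithText(width, height, data, "EMR")))
    }(RowEncoder(ImageSchema.imageSchema)) // Dataset[Row].map needs an explicit encoder
    .write.format("parquet").save("oss://<bucket>/path/to/dst/dir")
}

def markWithText(width: Int, height: Int, data: Array[Byte], text: String): Array[Byte] = {
  val image = new BufferedImage(width, height, BufferedImage.TYPE_3BYTE_BGR)
  // TYPE_3BYTE_BGR stores its bytes as B, G, R, the same order as the data source,
  // so copy them directly (Raster.setPixels would expect R, G, B band order)
  val pixels = image.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
  System.arraycopy(data, 0, pixels, 0, data.length)

  val g = image.createGraphics()
  g.setColor(Color.red)
  g.setFont(new Font("SimSun", Font.BOLD, 30)) // SimSun (宋体), if installed
  g.drawString(text, width / 2, height / 2)
  g.dispose()

  // Re-encode as JPEG so the stored bytes are compressed
  val buffer = new ByteArrayOutputStream
  ImageIO.write(image, "JPG", buffer)
  buffer.toByteArray
}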
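The example passes row-wise BGR bytes from the data source into a TYPE_3BYTE_BGR image. The sketch below shows one way to do that without swapping red and blue. It copies the bytes straight into the image's backing byte array, which stores each pixel as B, G, R. By contrast, Raster.setPixels takes samples in band order, and for this image type that order is R, G, B. BgrBuffer.fromBgr is a hypothetical helper, not part of Spark, and it assumes 3-channel input.

```scala
import java.awt.image.{BufferedImage, DataBufferByte}

// Hypothetical helper (not a Spark API): wraps row-wise BGR bytes, as produced by the
// image data source for 3-channel images, in a BufferedImage without reordering channels.
object BgrBuffer {
  def fromBgr(width: Int, height: Int, data: Array[Byte]): BufferedImage = {
    require(data.length == width * height * 3, "expected 3 bytes per pixel")
    val image = new BufferedImage(width, height, BufferedImage.TYPE_3BYTE_BGR)
    // getData returns the live backing array, stored as B, G, R per pixel, row by row
    val backing = image.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
    System.arraycopy(data, 0, backing, 0, data.length)
    image
  }
}
```

For example, `BgrBuffer.fromBgr(2, 1, Array[Byte](0, 0, -1, -1, 0, 0))` yields a red pixel followed by a blue one. The byte -1 is the signed form of 255.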
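Now let's dig into the Spark source code for the implementation details. The built-in image data source lives in the spark-mllib module. It consists mainly of two classes:

- ImageSchema (org.apache.spark.ml.image) defines the schema of an image row and the decoding logic.
- ImageFileFormat (org.apache.spark.ml.source.image) is the FileFormat implementation that reads image files.

ImageSchema describes each image as a struct with the following fields: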
import org.apache.spark.sql.types._

val columnSchema = StructType(
  StructField("origin", StringType, true) ::
  StructField("height", IntegerType, false) ::
  StructField("width", IntegerType, false) ::
  StructField("nChannels", IntegerType, false) ::
  // OpenCV-compatible type: CV_8UC3 in most cases
  StructField("mode", IntegerType, false) ::
  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
  StructField("data", BinaryType, false) :: Nil)

val imageFields: Array[String] = columnSchema.fieldNames

val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
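A DataFrame loaded with this source looks like this (the data column shows only the first few bytes):

+--------------------+-----------+------------+---------------+----------+-------------------+
|image.origin        |image.width|image.height|image.nChannels|image.mode|image.data         |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600        |343         |3              |16        |55 45 21 56 ...    |
+--------------------+-----------+------------+---------------+----------+-------------------+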
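In the sample row, mode 16 is OpenCV's CV_8UC3 (8-bit unsigned, 3 channels). The data column holds height × width × nChannels bytes, stored row by row, with each pixel's channels interleaved in BGR order. The sketch below shows how to locate one pixel in that array. PixelAccess.pixelAt is a hypothetical helper, not a Spark API, and it assumes exactly this layout.

```scala
// Hypothetical helper (not a Spark API): reads one pixel from the "data" column, assuming
// row-wise storage with channels interleaved per pixel (BGR, or BGRA when nChannels = 4).
object PixelAccess {
  /** Unsigned channel values of pixel (x, y) in storage order, e.g. Seq(B, G, R). */
  def pixelAt(data: Array[Byte], width: Int, nChannels: Int, x: Int, y: Int): Seq[Int] = {
    val offset = (y * width + x) * nChannels // full rows precede row y
    // JVM bytes are signed, so mask to recover values in 0..255
    (0 until nChannels).map(c => data(offset + c) & 0xFF)
  }
}
```

For a 2×2, 3-channel image, pixel (1, 1) starts at offset (1 × 2 + 1) × 3 = 9.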
Tip: for background on basic image support, see the document Image file reading and writing.
ImageFileFormat implements Spark's FileFormat interface. It is read-only: prepareWrite always throws. buildReader reads each file fully into memory and hands the bytes to ImageSchema.decode:

// Simplified for clarity: parts of the code are omitted or modified
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
  ......
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException("Write is not supported for image data source")
  }

  override protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    ......
    (file: PartitionedFile) => {
      ......
      val path = new Path(origin)
      val stream = fs.open(path)
      val bytes = ByteStreams.toByteArray(stream)
      val resultOpt = ImageSchema.decode(origin, bytes) // <-- decode
      val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
      ......
      val converter = RowEncoder(requiredSchema)
      filteredResult.map(row => converter.toRow(row))
      ......
    }
  }
}
ImageSchema.decode uses ImageIO to decode the raw bytes. It then repacks the pixels into an OpenCV-compatible BGR(A) byte array:

// Simplified for clarity: parts of the code are omitted or modified
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
  // Load the raw image with ImageIO
  val img = ImageIO.read(new ByteArrayInputStream(bytes))
  if (img == null) {
    None // ImageIO could not decode the bytes
  } else {
    // Basic image properties
    val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
    val hasAlpha = img.getColorModel.hasAlpha
    val height = img.getHeight
    val width = img.getWidth
    // ImageIO image type -> OpenCV type (CV_8UC1 = 0, CV_8UC3 = 16, CV_8UC4 = 24)
    val (nChannels, mode) = if (isGray) {
      (1, ocvTypes("CV_8UC1"))
    } else if (hasAlpha) {
      (4, ocvTypes("CV_8UC4"))
    } else {
      (3, ocvTypes("CV_8UC3"))
    }
    // Decode
    val imageSize = height * width * nChannels
    // Holds the decoded pixel matrix
    val decoded = Array.ofDim[Byte](imageSize)
    if (isGray) {
      // Single-channel images
      ...
    } else {
      // Multi-channel images
      var offset = 0
      for (h <- 0 until height) {
        for (w <- 0 until width) {
          val color = new Color(img.getRGB(w, h), hasAlpha)
          // Decoded channel order is BGR(A)
          decoded(offset) = color.getBlue.toByte
          decoded(offset + 1) = color.getGreen.toByte
          decoded(offset + 2) = color.getRed.toByte
          if (hasAlpha) {
            decoded(offset + 3) = color.getAlpha.toByte
          }
          offset += nChannels
        }
      }
    }
    // Wrap the result as a single row
    Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
  }
}
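To try the channel and mode selection and the BGR(A) packing loop outside Spark, here is a standalone re-creation using only the JDK. It is a sketch, not Spark's API. The object and method names are hypothetical, the type codes are copied from the comment in the excerpt, and the grayscale path is deliberately left out, just as it is elided in the excerpt.

```scala
import java.awt.Color
import java.awt.color.ColorSpace
import java.awt.image.BufferedImage

// Standalone sketch (hypothetical names, not Spark's API) of the logic in ImageSchema.decode.
object DecodeSketch {
  // OpenCV type codes for 8-bit unsigned images with 1, 3 and 4 channels
  val CV_8UC1 = 0
  val CV_8UC3 = 16
  val CV_8UC4 = 24

  /** Chooses (nChannels, mode) from the image's color model. */
  def channelsAndMode(img: BufferedImage): (Int, Int) = {
    val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
    if (isGray) (1, CV_8UC1)
    else if (img.getColorModel.hasAlpha) (4, CV_8UC4)
    else (3, CV_8UC3)
  }

  /** Packs a color image into row-wise BGR(A) bytes; gray images are rejected. */
  def packColor(img: BufferedImage): Array[Byte] = {
    val (nChannels, _) = channelsAndMode(img)
    require(nChannels != 1, "gray images are not handled in this sketch")
    val hasAlpha = nChannels == 4
    val out = new Array[Byte](img.getHeight * img.getWidth * nChannels)
    var offset = 0
    // h is the outer loop, so pixels are written row by row
    for (h <- 0 until img.getHeight; w <- 0 until img.getWidth) {
      val c = new Color(img.getRGB(w, h), hasAlpha)
      out(offset) = c.getBlue.toByte
      out(offset + 1) = c.getGreen.toByte
      out(offset + 2) = c.getRed.toByte
      if (hasAlpha) out(offset + 3) = c.getAlpha.toByte
      offset += nChannels
    }
    out
  }
}
```

For example, a 2×1 TYPE_INT_RGB image holding a red pixel and then a blue pixel packs to the unsigned bytes 0, 0, 255, 255, 0, 0.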
The current version of Apache Spark provides no UDFs for image data. Processing images requires ImageIO or a more specialized computer vision (CV) library.
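Without built-in image UDFs, you have to write per-pixel processing yourself. As a minimal sketch, the function below converts the data source's BGR(A) bytes to single-channel 8-bit gray. It assumes 3- or 4-channel input in that layout. The object and function names are hypothetical, and the choice of ITU-R BT.601 luma weights is an assumption of this sketch.

```scala
// Hypothetical user-defined function (not provided by Spark): converts row-wise BGR(A)
// bytes to one gray byte per pixel using BT.601 luma weights.
object GrayScale {
  def bgrToGray(data: Array[Byte], nChannels: Int): Array[Byte] = {
    require(nChannels == 3 || nChannels == 4, "expects BGR or BGRA input")
    val pixels = data.length / nChannels
    Array.tabulate[Byte](pixels) { i =>
      val o = i * nChannels
      // Mask to unsigned values; alpha (if present) is ignored
      val b = data(o) & 0xFF
      val g = data(o + 1) & 0xFF
      val r = data(o + 2) & 0xFF
      math.round(0.299 * r + 0.587 * g + 0.114 * b).toByte
    }
  }
}
```

One way to apply it to a DataFrame is to wrap it with org.apache.spark.sql.functions.udf, for instance `udf((data: Array[Byte], n: Int) => GrayScale.bgrToGray(data, n))`, and call it on the image.data and image.nChannels columns. Filter out single-channel images first, since the function rejects them.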
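Spark's built-in image data source makes it fairly easy to load image files for analysis. However, the implementation is still quite rudimentary. Each file is read fully into memory and then decoded pixel by pixel through getRGB, so neither performance nor resource consumption is likely to be good. The current version also only loads image data. It neither wraps nor implements common image-processing algorithms, and it is a poor fit for more specialized CV analysis. This follows from the role the image data source plays in Spark: it feeds image data as input for training deep learning (DL) models, a task that demands little image processing in itself. Using Spark for more realistic image-processing tasks would take considerably more work, for example: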