Spark内置图像数据源初探
概述
? ? 在Apache Spark 2.4中引入了一個新的內(nèi)置數(shù)據(jù)源, 圖像數(shù)據(jù)源.用戶可以通過DataFrame API加載指定目錄的中圖像文件,生成一個DataFrame對象.通過該DataFrame對象,用戶可以對圖像數(shù)據(jù)進行簡單的處理,然后使用MLlib進行特定的訓(xùn)練和分類計算.
? ? 本文將介紹圖像數(shù)據(jù)源的實現(xiàn)細節(jié)和使用方法.
簡單使用
? ? 先通過一個例子來簡單的了解下圖像數(shù)據(jù)源使用方法. 本例設(shè)定有一組圖像文件存放在阿里云的OSS上, 需要對這組圖像加水印,并壓縮存儲到parquet文件中. 廢話不說,先上代碼:
// 為了突出重點,代碼簡化圖像格式相關(guān)的處理邏輯def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data").map(row => {val origin = row.getAs[String]("origin")val width = row.getAs[Int]("width")val height = row.getAs[Int]("height")val mode = row.getAs[Int]("mode")val nChannels = row.getAs[Int]("nChannels")val data = row.getAs[Array[Byte]]("data")Row(Row(origin, height, width, nChannels, mode,markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))}).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")}def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {val image = new BufferedImage(width, height, imageType)val raster = image.getData.asInstanceOf[WritableRaster]val pixels = data.map(_.toInt)raster.setPixels(0, 0, width, height, pixels)image.setData(raster)val buffImg = new BufferedImage(width, height, imageType)val g = buffImg.createGraphicsg.drawImage(image, 0, 0, null)g.setColor(Color.red)g.setFont(new Font("宋體", Font.BOLD, 30))g.drawString(text, width/2, height/2)g.dispose()val buffer = new ByteArrayOutputStreamImageIO.write(buffImg, "JPG", buffer)buffer.toByteArray}從生成的parquet文件中抽取一條圖像二進制數(shù)據(jù),保存為本地jpg,效果如下:
圖1 左圖為原始圖像,右圖為處理后的圖像
你可能注意到兩個圖像到顏色并不相同,這是因為Spark的圖像數(shù)據(jù)將圖像解碼為BGR順序的數(shù)據(jù),而示例程序在保存的時候,沒有處理這個變換,導(dǎo)致顏色出現(xiàn)了反差.
實現(xiàn)初窺
下面我們深入到spark源碼中來看一下實現(xiàn)細節(jié).Apache Spark內(nèi)置圖像數(shù)據(jù)源的實現(xiàn)代碼在spark-mllib這個模塊中.主要包括兩個類:
- org.apache.spark.ml.image.ImageSchema
- org.apache.spark.ml.source.image.ImageFileFormat
其中,ImageSchema定義了圖像文件加載為DataFrame的Row的格式和解碼方法.ImageFileFormat提供了面向存儲層的讀寫接口.
格式定義
一個圖像文件被加載為DataFrame后,對應(yīng)的如下:
val columnSchema = StructType(StructField("origin", StringType, true) ::StructField("height", IntegerType, false) ::StructField("width", IntegerType, false) ::StructField("nChannels", IntegerType, false) ::// OpenCV-compatible type: CV_8UC3 in most casesStructField("mode", IntegerType, false) ::// Bytes in OpenCV-compatible order: row-wise BGR in most casesStructField("data", BinaryType, false) :: Nil)val imageFields: Array[String] = columnSchema.fieldNamesval imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)如果將該DataFrame打印出來,可以得到如下形式的表:
+--------------------+-----------+------------+---------------+----------+-------------------+ |image.origin |image.width|image.height|image.nChannels|image.mode|image.data | +--------------------+-----------+------------+---------------+----------+-------------------+ |oss://.../dir/1.jpg |600 |343 |3 |16 |55 45 21 56 ... | +--------------------+-----------+------------+---------------+----------+-------------------+其中:
- origin: 原始圖像文件的路徑
- width: 圖像的寬度,單位像素
- height: 圖像的高度,單位像素
- nChannels: 圖像的通道數(shù), 如常見的RGB位圖為通道數(shù)為3
- mode: 像素矩陣(data)中元素的數(shù)值類型和通道順序, 與OpenCV的類型兼容
- data: 解碼后的像素矩陣
提示: 關(guān)于圖像的基礎(chǔ)支持,可以參考如下文檔:?Image file reading and writing
加載和解碼
圖像文件通過ImageFileFormat加載為一個Row對象.
// 文件: ImageFileFormat.scala // 為了簡化說明起見,代碼有刪減和改動 private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {......override def prepareWrite(sparkSession: SparkSession,job: Job,options: Map[String, String],dataSchema: StructType): OutputWriterFactory = {throw new UnsupportedOperationException("Write is not supported for image data source")}override protected def buildReader(sparkSession: SparkSession,dataSchema: StructType,partitionSchema: StructType,requiredSchema: StructType,filters: Seq[Filter],options: Map[String, String],hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { ......(file: PartitionedFile) => {......val path = new Path(origin)val stream = fs.open(path)val bytes = ByteStreams.toByteArray(stream)val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解碼 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))......val converter = RowEncoder(requiredSchema)filteredResult.map(row => converter.toRow(row))......}}} }從上可以看出:
- 當(dāng)前的圖像數(shù)據(jù)源實現(xiàn)并不支持保存操作;
- 圖像數(shù)據(jù)的解碼工作在ImageSchema中完成.
下面來看一下具體的解碼過程:
// 文件: ImageSchema.scala // 為了簡化說明起見,代碼有刪減和改動 private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {// 使用ImageIO加載原始圖像數(shù)據(jù)val img = ImageIO.read(new ByteArrayInputStream(bytes))if (img != null) {// 獲取圖像的基本屬性val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAYval hasAlpha = img.getColorModel.hasAlphaval height = img.getHeightval width = img.getWidth// ImageIO::ImageType -> OpenCV Typeval (nChannels, mode) = if (isGray) {(1, ocvTypes("CV_8UC1"))} else if (hasAlpha) {(4, ocvTypes("CV_8UC4"))} else {(3, ocvTypes("CV_8UC3"))}// 解碼val imageSize = height * width * nChannels// 用于存儲解碼后的像素矩陣val decoded = Array.ofDim[Byte](imageSize)if (isGray) {// 處理單通道圖像...} else {// 處理多通道圖像var offset = 0for (h <- 0 until height) {for (w <- 0 until width) {val color = new Color(img.getRGB(w, h), hasAlpha)// 解碼后的通道順序為BGR(A)decoded(offset) = color.getBlue.toBytedecoded(offset + 1) = color.getGreen.toBytedecoded(offset + 2) = color.getRed.toByteif (hasAlpha) {decoded(offset + 3) = color.getAlpha.toByte}offset += nChannels}}}// 轉(zhuǎn)換為一行數(shù)據(jù)Some(Row(Row(origin, height, width, nChannels, mode, decoded)))}}從上可以看出:
- 本數(shù)據(jù)源在實現(xiàn)上使用javax的ImageIO庫實現(xiàn)各類格式的圖像文件的解碼.ImageIO雖然是一個十分強大和專業(yè)的java圖像處理庫,但是和更專業(yè)的CV庫(如OpenCV)比起來,性能上和功能上差距還是非常大的;
- 解碼后的圖像通道順序和像素數(shù)值類型是固定的, 順序固定為BGR(A), 像素數(shù)值類型為8U;
- 最多支持4個通道,因此像多光譜遙感圖像這類可能包含數(shù)十個波段信息的圖像就無法支持了;
- 解碼后輸出的信息僅包含基本的長寬、通道數(shù)和模式等字段,如果需要獲取更為詳細元數(shù)據(jù),如exif,GPS坐標等就愛莫能助了;
- 數(shù)據(jù)源在生成DataFrame時執(zhí)行了圖像的解碼操作,并且解碼后的數(shù)據(jù)存儲在Java堆內(nèi)內(nèi)存中.這在實際項目應(yīng)該是一個比較粗放的實現(xiàn)方式,會占用大量的資源,包括內(nèi)存和帶寬(如果發(fā)生shuffle的話,可以考慮參考同一個圖像文件保存為BMP和JPG的大小差別).
編碼和存儲
從上分析可以看出,當(dāng)前圖像數(shù)據(jù)源并不支持對處理后的像素矩陣進行編碼并保存為指定格式的圖像文件.
圖像處理能力
當(dāng)前版本Apache Spark并沒有提供面向圖像數(shù)據(jù)的UDF,圖像數(shù)據(jù)的處理需要借助ImageIO庫或其他更專業(yè)的CV庫.
小結(jié)
當(dāng)前Apache Spark的內(nèi)置圖像數(shù)據(jù)源可以較為方便的加載圖像文件進行分析.不過,當(dāng)前的實現(xiàn)還十分簡陋,性能和資源消耗應(yīng)該都不會太樂觀.并且,當(dāng)前版本僅提供了圖像數(shù)據(jù)的加載能力,并沒有提供常用處理算法的封裝和實現(xiàn),也不能很好的支持更為專業(yè)的CV垂直領(lǐng)域的分析業(yè)務(wù).當(dāng)然,這和圖像數(shù)據(jù)源在Spark中的定位有關(guān)(將圖像數(shù)據(jù)作為輸入用于訓(xùn)練DL模型,這類任務(wù)對圖像的處理本身要求并不多).如果希望使用Spark框架完成更實際的圖像處理任務(wù),還有很多工作要做,比如:
- 支持更加豐富的元數(shù)據(jù)模型
- 使用更專業(yè)的編解碼庫和更靈活編解碼流程控制
- 封裝面向CV的算子和函數(shù)
- 更高效的內(nèi)存管理
- 支持GPU
等等諸如此類的工作,限于篇幅,這里就不展開了.
好了,再多說一句,現(xiàn)在Spark已經(jīng)支持處理圖像數(shù)據(jù)了(雖然支持有限),那么,視頻流數(shù)據(jù)還會遠嗎?
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Spark内置图像数据源初探的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 云原生应用 Kubernetes 监控与
- 下一篇: 使用EMR-Kafka Connect进