日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark内置图像数据源初探

發布時間:2024/8/23 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark内置图像数据源初探 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述

? ? 在Apache Spark 2.4中引入了一個新的內置數據源, 圖像數據源.用戶可以通過DataFrame API加載指定目錄的中圖像文件,生成一個DataFrame對象.通過該DataFrame對象,用戶可以對圖像數據進行簡單的處理,然后使用MLlib進行特定的訓練和分類計算.
? ? 本文將介紹圖像數據源的實現細節和使用方法.

簡單使用

? ? 先通過一個例子來簡單的了解下圖像數據源使用方法. 本例設定有一組圖像文件存放在阿里云的OSS上, 需要對這組圖像加水印,并壓縮存儲到parquet文件中. 廢話不說,先上代碼:

// 為了突出重點,代碼簡化圖像格式相關的處理邏輯def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data").map(row => {val origin = row.getAs[String]("origin")val width = row.getAs[Int]("width")val height = row.getAs[Int]("height")val mode = row.getAs[Int]("mode")val nChannels = row.getAs[Int]("nChannels")val data = row.getAs[Array[Byte]]("data")Row(Row(origin, height, width, nChannels, mode,markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))}).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")}def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {val image = new BufferedImage(width, height, imageType)val raster = image.getData.asInstanceOf[WritableRaster]val pixels = data.map(_.toInt)raster.setPixels(0, 0, width, height, pixels)image.setData(raster)val buffImg = new BufferedImage(width, height, imageType)val g = buffImg.createGraphicsg.drawImage(image, 0, 0, null)g.setColor(Color.red)g.setFont(new Font("宋體", Font.BOLD, 30))g.drawString(text, width/2, height/2)g.dispose()val buffer = new ByteArrayOutputStreamImageIO.write(buffImg, "JPG", buffer)buffer.toByteArray}

從生成的parquet文件中抽取一條圖像二進制數據,保存為本地jpg,效果如下:

圖1 左圖為原始圖像,右圖為處理后的圖像

你可能注意到兩個圖像到顏色并不相同,這是因為Spark的圖像數據將圖像解碼為BGR順序的數據,而示例程序在保存的時候,沒有處理這個變換,導致顏色出現了反差.

實現初窺

下面我們深入到spark源碼中來看一下實現細節.Apache Spark內置圖像數據源的實現代碼在spark-mllib這個模塊中.主要包括兩個類:

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema定義了圖像文件加載為DataFrame的Row的格式和解碼方法.ImageFileFormat提供了面向存儲層的讀寫接口.

格式定義

一個圖像文件被加載為DataFrame后,對應的如下:

val columnSchema = StructType(StructField("origin", StringType, true) ::StructField("height", IntegerType, false) ::StructField("width", IntegerType, false) ::StructField("nChannels", IntegerType, false) ::// OpenCV-compatible type: CV_8UC3 in most casesStructField("mode", IntegerType, false) ::// Bytes in OpenCV-compatible order: row-wise BGR in most casesStructField("data", BinaryType, false) :: Nil)val imageFields: Array[String] = columnSchema.fieldNamesval imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

如果將該DataFrame打印出來,可以得到如下形式的表:

+--------------------+-----------+------------+---------------+----------+-------------------+ |image.origin |image.width|image.height|image.nChannels|image.mode|image.data | +--------------------+-----------+------------+---------------+----------+-------------------+ |oss://.../dir/1.jpg |600 |343 |3 |16 |55 45 21 56 ... | +--------------------+-----------+------------+---------------+----------+-------------------+

其中:

  • origin: 原始圖像文件的路徑
  • width: 圖像的寬度,單位像素
  • height: 圖像的高度,單位像素
  • nChannels: 圖像的通道數, 如常見的RGB位圖為通道數為3
  • mode: 像素矩陣(data)中元素的數值類型和通道順序, 與OpenCV的類型兼容
  • data: 解碼后的像素矩陣

提示: 關于圖像的基礎支持,可以參考如下文檔:?Image file reading and writing

加載和解碼

圖像文件通過ImageFileFormat加載為一個Row對象.

// 文件: ImageFileFormat.scala // 為了簡化說明起見,代碼有刪減和改動 private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {......override def prepareWrite(sparkSession: SparkSession,job: Job,options: Map[String, String],dataSchema: StructType): OutputWriterFactory = {throw new UnsupportedOperationException("Write is not supported for image data source")}override protected def buildReader(sparkSession: SparkSession,dataSchema: StructType,partitionSchema: StructType,requiredSchema: StructType,filters: Seq[Filter],options: Map[String, String],hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { ......(file: PartitionedFile) => {......val path = new Path(origin)val stream = fs.open(path)val bytes = ByteStreams.toByteArray(stream)val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解碼 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))......val converter = RowEncoder(requiredSchema)filteredResult.map(row => converter.toRow(row))......}}} }

從上可以看出:

  • 當前的圖像數據源實現并不支持保存操作;
  • 圖像數據的解碼工作在ImageSchema中完成.

下面來看一下具體的解碼過程:

// 文件: ImageSchema.scala // 為了簡化說明起見,代碼有刪減和改動 private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {// 使用ImageIO加載原始圖像數據val img = ImageIO.read(new ByteArrayInputStream(bytes))if (img != null) {// 獲取圖像的基本屬性val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAYval hasAlpha = img.getColorModel.hasAlphaval height = img.getHeightval width = img.getWidth// ImageIO::ImageType -> OpenCV Typeval (nChannels, mode) = if (isGray) {(1, ocvTypes("CV_8UC1"))} else if (hasAlpha) {(4, ocvTypes("CV_8UC4"))} else {(3, ocvTypes("CV_8UC3"))}// 解碼val imageSize = height * width * nChannels// 用于存儲解碼后的像素矩陣val decoded = Array.ofDim[Byte](imageSize)if (isGray) {// 處理單通道圖像...} else {// 處理多通道圖像var offset = 0for (h <- 0 until height) {for (w <- 0 until width) {val color = new Color(img.getRGB(w, h), hasAlpha)// 解碼后的通道順序為BGR(A)decoded(offset) = color.getBlue.toBytedecoded(offset + 1) = color.getGreen.toBytedecoded(offset + 2) = color.getRed.toByteif (hasAlpha) {decoded(offset + 3) = color.getAlpha.toByte}offset += nChannels}}}// 轉換為一行數據Some(Row(Row(origin, height, width, nChannels, mode, decoded)))}}

從上可以看出:

  • 本數據源在實現上使用javax的ImageIO庫實現各類格式的圖像文件的解碼.ImageIO雖然是一個十分強大和專業的java圖像處理庫,但是和更專業的CV庫(如OpenCV)比起來,性能上和功能上差距還是非常大的;
  • 解碼后的圖像通道順序和像素數值類型是固定的, 順序固定為BGR(A), 像素數值類型為8U;
  • 最多支持4個通道,因此像多光譜遙感圖像這類可能包含數十個波段信息的圖像就無法支持了;
  • 解碼后輸出的信息僅包含基本的長寬、通道數和模式等字段,如果需要獲取更為詳細元數據,如exif,GPS坐標等就愛莫能助了;
  • 數據源在生成DataFrame時執行了圖像的解碼操作,并且解碼后的數據存儲在Java堆內內存中.這在實際項目應該是一個比較粗放的實現方式,會占用大量的資源,包括內存和帶寬(如果發生shuffle的話,可以考慮參考同一個圖像文件保存為BMP和JPG的大小差別).

編碼和存儲

從上分析可以看出,當前圖像數據源并不支持對處理后的像素矩陣進行編碼并保存為指定格式的圖像文件.

圖像處理能力

當前版本Apache Spark并沒有提供面向圖像數據的UDF,圖像數據的處理需要借助ImageIO庫或其他更專業的CV庫.

小結

當前Apache Spark的內置圖像數據源可以較為方便的加載圖像文件進行分析.不過,當前的實現還十分簡陋,性能和資源消耗應該都不會太樂觀.并且,當前版本僅提供了圖像數據的加載能力,并沒有提供常用處理算法的封裝和實現,也不能很好的支持更為專業的CV垂直領域的分析業務.當然,這和圖像數據源在Spark中的定位有關(將圖像數據作為輸入用于訓練DL模型,這類任務對圖像的處理本身要求并不多).如果希望使用Spark框架完成更實際的圖像處理任務,還有很多工作要做,比如:

  • 支持更加豐富的元數據模型
  • 使用更專業的編解碼庫和更靈活編解碼流程控制
  • 封裝面向CV的算子和函數
  • 更高效的內存管理
  • 支持GPU

等等諸如此類的工作,限于篇幅,這里就不展開了.
好了,再多說一句,現在Spark已經支持處理圖像數據了(雖然支持有限),那么,視頻流數據還會遠嗎?


原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Spark内置图像数据源初探的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。