[Original] Uncle's Troubleshooting Notes (12): Why Spark saves text-type files (text, csv, json, etc.) to HDFS in compressed format...

發(fā)布時(shí)間:2025/3/15 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【原创】大叔问题定位分享(12)Spark保存文本类型文件(text、csv、json等)到hdfs时为什么是压缩格式的... 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

問(wèn)題重現(xiàn)

rdd.repartition(1).write.csv(outPath)

寫文件之后發(fā)現(xiàn)文件是壓縮過(guò)的

write時(shí)首先會(huì)獲取hadoopConf,然后從中獲取是否壓縮以及壓縮格式

org.apache.spark.sql.execution.datasources.DataSource

def write(

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
    throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator = conf.get(SEPERATOR, "\t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  ...

isCompressed is read from mapreduce.output.fileoutputformat.compress, and codecClass is read from mapreduce.output.fileoutputformat.compress.codec.
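
For clarity, here is a small Scala sketch of what those two lookups amount to against a plain Hadoop Configuration. describeCompression is a hypothetical helper for illustration; the key names and the GzipCodec fallback match the TextOutputFormat excerpt above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.GzipCodec

def describeCompression(conf: Configuration): String = {
  // getCompressOutput(job) reduces to this boolean lookup (default false)
  val isCompressed = conf.getBoolean("mapreduce.output.fileoutputformat.compress", false)
  if (!isCompressed) "uncompressed"
  else
    // getOutputCompressorClass(job, GzipCodec.class) reads the codec class name,
    // falling back to the caller-supplied default
    conf.get("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
}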

hadoopConf初始化過(guò)程為

org.apache.spark.sql.internal.SessionState

def newHadoopConf(): Configuration = {
  val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
  ...

org.apache.spark.SparkContext

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

org.apache.spark.deploy.SparkHadoopUtil

def newConfiguration(conf: SparkConf): Configuration = {
  val hadoopConf = new Configuration()
  appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
  hadoopConf
}

def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
  ...
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.substring("spark.hadoop.".length), value)
    }
  }
  ...
}
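A quick sketch that replays the prefix-stripping loop above in isolation, using an illustrative key (foo.bar):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val sparkConf = new SparkConf(loadDefaults = false).set("spark.hadoop.foo.bar", "baz")
val hadoopConf = new Configuration(false)
sparkConf.getAll.foreach { case (key, value) =>
  if (key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  }
}
assert(hadoopConf.get("foo.bar") == "baz") // the "spark.hadoop." prefix is gone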

hadoopConf默認(rèn)會(huì)從classpath中加載所有的hadoop相關(guān)配置文件,可以通過(guò)spark-shell來(lái)簡(jiǎn)單測(cè)試:

scala> val hc = spark.sparkContext.hadoopConfiguration

hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))

true

scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))

org.apache.hadoop.io.compress.DefaultCodec
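
Since SessionState.newHadoopConf() copies from sparkContext.hadoopConfiguration (see the initialization path above), flipping the flag on that same object at runtime should also disable compression for subsequent writes. This continuation of the spark-shell session is a sketch inferred from that code path, not output captured here:

scala> hc.set("mapreduce.output.fileoutputformat.compress", "false")

scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))
false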

綜上,只需要在創(chuàng)建SparkConf的時(shí)候設(shè)置spark.hadoop.mapreduce.output.fileoutputformat.compress=false即可不壓縮,

val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")

另外還可以通過(guò)option來(lái)控制

rdd.repartition(1).write.option("compression", "none").csv(outPath)
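
Depending on the Spark version, the csv writer's compression option accepts values such as none, bzip2, deflate, gzip, lz4, and snappy, and a per-write option set this way takes precedence over the Hadoop-level defaults for that particular write.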

轉(zhuǎn)載于:https://www.cnblogs.com/barneywill/p/10109568.html
