日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java spark读写hdfs_Spark读取HDFS数据输出到不同的文件

發布時間:2023/12/4 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java spark读写hdfs_Spark读取HDFS数据输出到不同的文件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近有一個需求是這樣的:原來的數據是存儲在MySQL,然后通過Sqoop將MySQL的數據抽取到了HDFS集群上,抽取到HDFS上的數據都是純數據,字段值之間以\t分隔,現在需要將這部分數據還原為json格式的,因為這樣做的原因:一來是更清楚具體字段的含義;二來是后期的數據通過kafka直接消費存儲到HDFS,存的就是json數據,所以為了所有存儲數據格式一致,需要將歷史數據進行轉換。所以只能通過MR或者Spark進行一次數據清洗轉換了。因為需要根據每條數據中的一個時間字段將數據存儲到不同的文件中。比如一條純數據如下:

1 2019-04-26 00:32:09.0 null true 1025890 10004515

那么需要根據第二個字段信息來將數據分別存儲到不同的文件夾,分為4個時段,格式為:

/2019/04/26/00-06.txt,/2019/04/26/06-12.txt,/2019/04/26/12-18.txt,/2019/04/26/18-00.txt,

直接上spark代碼:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

import org.apache.spark.sql.SparkSession

/**

* spark版本將數據輸出到不同文件

* create date:2019-07-16

* author:ly

*/

object OutputToMultiFileApp {

def main(args: Array[String]): Unit = {

val inputPath = args(0)

val outputPath = args(1)

//val inputPath = "D:\\bigdata_workspace\\gey\\3\\in.txt"

//val outputPath = "D:\\bigdata_workspace\\gey\\3\\out"

val spark = SparkSession.builder().appName("OutputToMultiFileApp").master("local[*]").getOrCreate()

val data =spark.sparkContext.textFile(inputPath).map(item => {

val splits = item.toString.split("\t")

val str = "{\"id\":\"" + splits(4) + "\",\"uid\":\"" + splits(5) + "\",\"createTime\":\"" + splits(1) + "\",\"epochs\":\"1\"}"

//將時間字段作為key,包裝后的json作為value

(splits(1),str)

})

/**按Key保存到不同文件*/

data.saveAsHadoopFile(outputPath,

classOf[String],

classOf[String],

classOf[MyMultipleTextOutputFormat]

)

spark.stop()

}

}

class MyMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

//1)文件名:根據key生成我們自己的路徑

override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={

//2019-04-26 16:32:09.0

val splits: Array[String] = key.toString.split(" ")

//2019-04-26

val ymd: String = splits(0)

//16:32:09.0

val hms: String = splits(1)

//[2019,04,26]

val arr1: Array[String] = ymd.split("-")

//[16,32,09]

val arr2: Array[String] = hms.split(":")

var temp: String = ""

val h6: Int = 6

val h12: Int = 12

val h18: Int = 18

val h24: Int = 24

val h: Int = arr2(0).toInt

if(h >= 0 && h <=6) temp = "00-06"

if(h > h6 && h <= h12) temp = "06-12"

if(h > h12 && h <= h18) temp = "12-18"

if(h > h18 && h < h24) temp = "18-00"

val paths = arr1(0) + "/" + arr1(1) + "/" + arr1(2) + "/" + temp + ".txt"

paths

}

//2)文件內容:默認同時輸出key和value。這里指定不輸出key。

override def generateActualKey(key: Any, value: Any): String = {

null

}

}

上述代碼直接在IDEA上運行,筆者是在win10上搞了一個比較小的文件測試,測試結果如下:

年份:

result1.png

月份:

result2.png

日期:

result3.png

最終數據:

result4.png

妥妥的成功了。。直接打包放到集群上運行。但是數據量大一些的話,好像會丟失數據,目前還不知道為啥。。。

歡迎大家留言討論

內容將同步到微信公眾號,歡迎關注微信公眾號:LearnBigData

qrcode.jpg

總結

以上是生活随笔為你收集整理的java spark读写hdfs_Spark读取HDFS数据输出到不同的文件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。