日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

發布時間:2025/3/11 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我正在使用Kafka Consumer API將所有數據從Kafka主題復制到Hive表 . 為此,我使用HDFS作為中間步驟 . 我使用唯一的組ID并將偏移重置為“最早”,以便從頭開始獲取所有數據,并在執行后忽略提交 . 然后我遍歷Kafka主題中的記錄,并將每條記錄保存到HDFS中的臨時文件中 . 然后我使用Spark從HDFS讀取數據,然后使用日期作為文件名將其保存到Parquet文件中 . 然后,我在Hive表中創建一個帶日期的分區,最后在Parquet中將文件作為分區加載到Hive中 .

正如您在下面的代碼中看到的,我使用了幾個中間步驟,這使得我的代碼遠非最佳 . 這是從Kafka主題復制所有數據的最佳推薦方法嗎?我做了一些研究,到目前為止,這是我設法開始工作的變通方法,但是,隨著記錄數量每天增加,我的執行時間達到了可容忍的極限(從2分鐘變為6分鐘到6分鐘)周) .

代碼在這里:

def start( lowerDate: String, upperDate: String )={

// Configurations for kafka consumer

val conf = ConfigFactory.parseResources("properties.conf")

val brokersip = conf.getString("enrichment.brokers.value")

val topics_in = conf.getString("enrichment.topics_in.value")

// Crea la sesion de Spark

val spark = SparkSession

.builder()

.master("yarn")

.appName("ParaTiUserXY")

.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val properties = new Properties

properties.put("key.deserializer", classOf[StringDeserializer])

properties.put("value.deserializer", classOf[StringDeserializer])

properties.put("bootstrap.servers", brokersip)

properties.put("auto.offset.reset", "earliest")

properties.put("group.id", "ParaTiUserXYZZ12345")

//Schema para transformar los valores del topico de Kafka a JSON

val my_schema = new StructType()

.add("longitudCliente", StringType)

.add("latitudCliente", StringType)

.add("dni", StringType)

.add("alias", StringType)

.add("segmentoCliente", StringType)

.add("timestampCliente", StringType)

.add("dateCliente", StringType)

.add("timeCliente", StringType)

.add("tokenCliente", StringType)

.add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)

consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )

val fs = {

val conf = new Configuration()

FileSystem.get(conf)

}

val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")

if( fs.exists(temp_path)){

fs.delete(temp_path, true)

}

while(true)

{

val records=consumer.poll(100)

for (record

val data = record.value.toString

//println(data)

val dataos: FSDataOutputStream = fs.create(temp_path)

val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))

bw.append(data)

bw.close

val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")

val fechaCliente = data_schema.select("dateCliente").first.getString(0)

if( fechaCliente < upperDate && fechaCliente >= lowerDate){

data_schema.select("longitudCliente", "latitudCliente","dni", "alias",

"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",

"tokenCliente", "telefonoCliente")

.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

}

else if( fechaCliente < lowerDate){

//

}

else if( fechaCliente >= upperDate){

break;

}

}

}

consumer.close()

}

總結

以上是生活随笔為你收集整理的java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 精品国产乱码久久久久久免费 | 色8久久| 91福利网址| 国产成人无码网站 | 91丨porny丨 | 三级精品视频 | 伊人7| 国产精品丝袜在线观看 | 青青草成人网 | 超碰伊人久久 | 欧美日韩国产第一页 | 美女日批在线观看 | 欧美怡红院视频一区二区三区 | 国产免费一区二区三区 | 久久精品欧美视频 | 久久国产成人精品国产成人亚洲 | 精品成人 | 欧美激情精品久久 | 青草视频在线免费观看 | 被各种性器调教到哭vk | 波多野结衣av一区二区全免费观看 | 国产精品久久久久久久久毛片 | 探花精品 | 国产成人一区二区三区视频 | 天天干夜夜 | 国产农村妇女毛片精品久久 | 五月婷婷网站 | 青青国产精品视频 | 激情婷婷综合 | 日韩v| 超碰国产97| 大白屁股一区二区视频 | 色骚网 | 亚洲欧美一区二区三区在线 | av色成人 | 深夜激情网站 | 亚洲资源在线观看 | 国产精品免费久久 | 国产亚洲一区二区不卡 | 亚洲精品乱码久久久久久日本蜜臀 | 色碰视频| 少妇特黄一区二区 | 亚洲永久精品国产 | 99re在线 | 欧美日韩免费在线视频 | www.日本在线观看 | 精品一区二区在线播放 | 一本色道久久88综合无码 | 成人涩涩视频 | 高潮又黄又刺激 | 久久撸视频 | 精品理论片 | 黄色网址在线播放 | 琪琪成人| 麻豆亚洲一区 | 成年人免费在线观看网站 | 依人在线 | 亚洲一级理论片 | 成人av番号网 | 一级黄色片免费播放 | 国产成人无码av | 夜夜爽妓女8888视频免费观看 | 欧美精品中文 | 亚洲日本香蕉视频 | 深夜视频在线看 | 日韩精品成人一区二区在线 | 夫妻自拍偷拍 | 视频在线播 | 伊人日韩| 精品无码人妻少妇久久久久久 | 99热日本 | 久久夜视频 | 99色网| 日日骑夜夜操 | 亚洲最大中文字幕 | 精品国产aⅴ | 日韩成人免费观看 | 综合亚洲网 | 久久视频这里只有精品 | 91av在线播放 | 免费观看高清在线 | 日韩国产91 | 久久国产片 | 高跟肉丝丝袜呻吟啪啪网站av | 古风h啪肉h文 | www.呦呦 | 欧美激情69| 秋霞一级视频 | 中文字幕第22页 | 国产成人免费 | 最近中文字幕一区二区 | 亚洲欧美另类综合 | 日本在线视频www | 成人免费毛片日本片视频 | 久久久久久九九九 | 色哟哟入口 | 亚洲一区和二区 | 欧美综合国产 | 一区二区精品 |