日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

sparkStreaming连接kafka整合hbase和redis

發布時間:2024/8/23 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparkStreaming连接kafka整合hbase和redis 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

sparkStreaming消費kafka數據,并將數據保存到redis和hbase當中去,實現實時

import org.apache.hadoop.hbase.client.{Admin, Connection} import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import redis.clients.jedis.Jedisobject StreamingKafka extends Logging{def main(args: Array[String]): Unit = {val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC),ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")val group:String = "gps_consum_group"val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> group,"auto.offset.reset" -> "latest",// earliest,latest,和none"enable.auto.commit" -> (false: java.lang.Boolean))val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()val context: SparkContext = sparkSession.sparkContextcontext.setLogLevel("WARN")// val streamingContext = new StreamingContext(conf,Seconds(5))//獲取streamingContextval streamingContext: StreamingContext = new StreamingContext(context,Seconds(1))val result: InputDStream[ConsumerRecord[String, String]] = HbaseTools.getStreamingContextFromHBase(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")result.foreachRDD(eachRdd =>{if(!eachRdd.isEmpty()){eachRdd.foreachPartition(eachPartition =>{val connection: Connection = HBaseUtil.getConnectionval jedis: Jedis = JedisUtil.getJedis//判斷表是否存在,如果不存在就進行創建val admin: Admin = connection.getAdminif(!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}if(!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))){val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))admin.createTable(htabgps)}eachPartition.foreach(record =>{//保存到HBase和redisval consumerRecords: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection,jedis, record)})JedisUtil.returnJedis(jedis)connection.close()})//更新offsetval offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges//result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //將offset提交到默認的kafka的topic里面去保存for(eachrange <- offsetRanges){val startOffset: Long = eachrange.fromOffset //起始offsetval endOffset: Long = eachrange.untilOffset //結束offsetval topic: String = eachrange.topicval partition: Int = eachrange.partitionHbaseTools.saveBatchOffset(group,topic,partition+"",endOffset)}}})streamingContext.start()streamingContext.awaitTermination()} }

總結

以上是生活随笔為你收集整理的sparkStreaming连接kafka整合hbase和redis的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 综合色av| 五月天久久 | 日韩精品一二三区 | 五月天激情国产综合婷婷婷 | 久久精品国产欧美亚洲人人爽 | 草莓视频18免费观看 | 可以免费看的黄色网址 | 亚洲AV午夜精品 | jizz日本18 | 色综合综合色 | 日韩激情久久 | 日韩欧美高清片 | 天天摸天天插 | 色亚洲成人 | 秋霞国产午夜精品免费视频 | 在线日本中文字幕 | 日韩网站免费观看高清 | 香蕉成人av | 波多野一区二区三区 | 亚洲三级电影 | 国产情侣一区二区 | 日本欧美精品 | 久久久老熟女一区二区三区91 | 欧美性猛交ⅹ乱大交3 | 玖玖精品国产 | 99re在线观看| 成人在线观看小视频 | 成人aⅴ视频 | 亚洲香蕉中文网 | 一级二级av | 日本特黄特色aaa大片免费 | 色四虎 | 亚洲一区二区av | av香港经典三级级 在线 | 亚洲一二三精品 | 91.xxx.高清在线| 久久亚洲精品小早川怜子 | 夜夜成人 | 九九人人 | 欧美成人xxx | 中文字幕在线亚洲 | 三级小视频在线观看 | 亚洲av久久久噜噜噜噜 | 插插插干干干 | 亚洲自拍偷拍综合 | 国产资源视频 | 国产精品精华液网站 | 午夜伦理福利视频 | 国产精品久久精品 | 亚洲成人精品在线播放 | 天天射夜夜撸 | 欧美真人性野外做爰 | 丝袜熟女一区二区 | 久久精品黄aa片一区二区三区 | 亚洲第一男人天堂 | 日本老妇性生活 | 麻豆av免费在线观看 | 欧美一区二区三区日韩 | 精品一区视频 | 爆操老女人 | 一区二区日韩电影 | 久久精品无码一区二区三区 | 国产精品美女久久久久 | 91精品又粗又猛又爽 | 强videoshd酒醉 | 影音先锋人妻啪啪av资源网站 | 中文字幕精品三区 | 黄色片成人 | 综合网激情 | 黄色网址在线免费看 | 日本少妇高潮 | 波多野结衣在线看 | 91精品久久人妻一区二区夜夜夜 | 国产精品三区在线观看 | 国产真实的和子乱拍在线观看 | 日本天堂网在线观看 | 久久久久久成人精品 | 粗了大了 整进去好爽视频 日本女优中文字幕 | 婷婷丁香久久 | 四虎最新网址在线观看 | 激情婷婷六月天 | 黄色欧美在线观看 | 婷婷777| 免费又黄又爽又色的视频 | 天堂成人av | 亚洲老老头同性老头交j | 国产精品日 | 国产网站大全 | 污视频网站免费 | 美女啪啪网站 | 女人18毛片水真多18精品 | 污视频免费在线观看网站 | 午夜成人免费视频 | av中文字幕亚洲 | 青草青在线视频 | 精品国产乱码久久久久久鸭王1 | a在线免费 | 国产激情二区 | 中文字幕永久 |