日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

發布時間:2025/5/22 数据库 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

MySQL創建存儲offset的表格

mysql> use test

mysql> create table hlw_offset(

topic varchar(32),

groupid varchar(50),

partitions int,

fromoffset bigint,

untiloffset bigint,

primary key(topic,groupid,partitions)

);

2.Maven依賴包

2.11.8

2.3.1

2.5.0

--------------------------------------------------

org.scala-lang

scala-library

${scala.version}

org.apache.spark

spark-core_2.11

${spark.version}

org.apache.spark

spark-sql_2.11

${spark.version}

org.apache.spark

spark-streaming_2.11

${spark.version}

org.apache.spark

spark-streaming-kafka-0-8_2.11

${spark.version}

mysql

mysql-connector-java

5.1.27

org.scalikejdbc

scalikejdbc_2.11

2.5.0

org.scalikejdbc

scalikejdbc-config_2.11

2.5.0

com.typesafe

config

1.3.0

org.apache.commons

commons-lang3

3.5

實現思路

1)StreamingContext

2)從kafka中獲取數據(從外部存儲獲取offset-->根據offset獲取kafka中的數據)

3)根據業務進行邏輯處理

4)將處理結果存到外部存儲中--保存offset

5)啟動程序,等待程序結束

代碼實現

1:SparkStreaming主體代碼如下

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scalikejdbc._

import scalikejdbc.config._

object JDBCOffsetApp {

def main(args: Array[String]): Unit = {

//創建SparkStreaming入口

val conf = new SparkConf().setMaster("local[2]").setAppName("JDBCOffsetApp")

val ssc = new StreamingContext(conf,Seconds(5))

//kafka消費主題

val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet

//kafka參數

//這里應用了自定義的ValueUtils工具類,來獲取application.conf里的參數,方便后期修改

val kafkaParams = Map[String,String](

"metadata.broker.list"->ValueUtils.getStringValue("metadata.broker.list"),

"auto.offset.reset"->ValueUtils.getStringValue("auto.offset.reset"),

"group.id"->ValueUtils.getStringValue("group.id")

)

//先使用scalikejdbc從MySQL數據庫中讀取offset信息

//+------------+------------------+------------+------------+-------------+

//| topic | groupid | partitions | fromoffset | untiloffset |

//+------------+------------------+------------+------------+-------------+

//MySQL表結構如上,將“topic”,“partitions”,“untiloffset”列讀取出來

//組成 fromOffsets: Map[TopicAndPartition, Long],后面createDirectStream用到

DBs.setup()

val fromOffset = DB.readOnly( implicit session => {

SQL("select * from hlw_offset").map(rs => {

(TopicAndPartition(rs.string("topic"),rs.int("partitions")),rs.long("untiloffset"))

}).list().apply()

}).toMap

//如果MySQL表中沒有offset信息,就從0開始消費;如果有,就從已經存在的offset開始消費

val messages = if (fromOffset.isEmpty) {

println("從頭開始消費...")

KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

} else {

println("從已存在記錄開始消費...")

val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(),mm.message())

KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffset,messageHandler)

}

messages.foreachRDD(rdd=>{

if(!rdd.isEmpty()){

//輸出rdd的數據量

println("數據統計記錄為:"+rdd.count())

//官方案例給出的獲得rdd offset信息的方法,offsetRanges是由一系列offsetRange組成的數組

// trait HasOffsetRanges {

// def offsetRanges: Array[OffsetRange]

// }

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

offsetRanges.foreach(x => {

//輸出每次消費的主題,分區,開始偏移量和結束偏移量

println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")

//將最新的偏移量信息保存到MySQL表中

DB.autoCommit( implicit session => {

SQL("replace into hlw_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")

.bind(x.topic,ValueUtils.getStringValue("group.id"),x.partition,x.fromOffset,x.untilOffset)

.update().apply()

})

})

}

})

ssc.start()

ssc.awaitTermination()

}

}

2:自定義的ValueUtils工具類如下

import com.typesafe.config.ConfigFactory

import org.apache.commons.lang3.StringUtils

object ValueUtils {

val load = ConfigFactory.load()

def getStringValue(key:String, defaultValue:String="") = {

val value = load.getString(key)

if(StringUtils.isNotEmpty(value)) {

value

} else {

defaultValue

}

}

}

3:application.conf內容如下

metadata.broker.list = "192.168.137.251:9092"

auto.offset.reset = "smallest"

group.id = "hlw_offset_group"

kafka.topics = "hlw_offset"

serializer.class = "kafka.serializer.StringEncoder"

request.required.acks = "1"

# JDBC settings

db.default.driver = "com.mysql.jdbc.Driver"

db.default.url="jdbc:mysql://hadoop000:3306/test"

db.default.user="root"

db.default.password="123456"

4:自定義kafka producer

import java.util.{Date, Properties}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducer {

def main(args: Array[String]): Unit = {

val properties = new Properties()

properties.put("serializer.class",ValueUtils.getStringValue("serializer.class"))

properties.put("metadata.broker.list",ValueUtils.getStringValue("metadata.broker.list"))

properties.put("request.required.acks",ValueUtils.getStringValue("request.required.acks"))

val producerConfig = new ProducerConfig(properties)

val producer = new Producer[String,String](producerConfig)

val topic = ValueUtils.getStringValue("kafka.topics")

//每次產生100條數據

var i = 0

for (i

測試

1:啟動kafka服務,并創建主題

[hadoop@hadoop000 bin]$ ./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.10.0.1/config/server.properties

[hadoop@hadoop000 bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181/kafka

[hadoop@hadoop000 bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic hlw_offset

2:測試前查看MySQL中offset表,剛開始是個空表

mysql> select * from hlw_offset;

Empty set (0.00 sec)

3:通過kafka producer產生500條數據

4:啟動SparkStreaming程序

//控制臺輸出結果:

從頭開始消費...

數據統計記錄為:500

---hlw_offset,0,0,500---

查看MySQL表,offset記錄成功

mysql> select * from hlw_offset;

+------------+------------------+------------+------------+-------------+

| topic | groupid | partitions | fromoffset | untiloffset |

+------------+------------------+------------+------------+-------------+

| hlw_offset | hlw_offset_group | 0 | 0 | 500 |

+------------+------------------+------------+------------+-------------+

5:關閉SparkStreaming程序,再使用kafka producer生產300條數據,再次啟動spark程序(如果spark從500開始消費,說明成功讀取了offset,做到了只讀取一次語義)

6:查看更新后的offset MySQL數據

mysql> select * from hlw_offset;

+------------+------------------+------------+------------+-------------+

| topic | groupid | partitions | fromoffset | untiloffset |

+------------+------------------+------------+------------+-------------+

| hlw_offset | hlw_offset_group | 0 | 500 | 800 |

+------------+------------------+------------+------------+-------------+

總結

以上是生活随笔為你收集整理的kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 成人综合色站 | 男女做爰猛烈刺激 | 一级黄色大片网站 | 涩天堂 | 伊人网在线观看 | 亚洲天堂日韩在线 | 亚洲精品国产精品乱码不99热 | 久久伊人在 | 妺妺窝人体色WWW精品 | 自拍偷拍福利 | 精品国产一区二区三区噜噜噜 | 99热首页| xxxx性视频 | 99思思| 久久久久国产精品一区二区 | 最好看的中文字幕国语电影mv | 亚洲精品一区二三区不卡 | www.亚洲一区二区三区 | 亚洲视频小说 | www黄色com| 色99999| 久久久久久网址 | 前任攻略在线观看免费完整版 | 四虎影视在线播放 | 一区二区三区中文字幕 | av在线官网 | 无码人妻丰满熟妇区bbbbxxxx | 成人颜色网站 | 成年男女免费视频网站 | 国产中文字幕一区二区三区 | zzji欧美大片 | 黄色国产视频网站 | 亚洲精品久久久中文字幕 | 欧美一区视频 | 色婷婷久久一区二区三区麻豆 | 日韩黄色av | 国产日b视频 | 日韩影院一区二区 | 午夜三区 | 蜜桃99视频一区二区三区 | 免费av网站在线观看 | 人妻一区二区在线 | 嫩草视频在线免费观看 | 青青草一区二区 | 翔田千里一区 | 中文字幕在线观看线人 | av 日韩 人妻 黑人 综合 无码 | 99av视频| 国产免费黄色 | 日本毛片在线观看 | 91免费福利 | 日韩三级a | 亚洲自啪 | 69社| 亚洲视频一二三四 | 伊人日韩| 女同一区二区三区 | 日皮视频网站 | 日韩精品免费电影 | 884aa四虎影成人精品一区 | 欧美色第一页 | 日本中文在线视频 | 波多野吉衣视频在线观看 | 青青青在线视频免费观看 | 中文字幕第11页 | 亚洲精品一区在线 | av色图| 99热播 | 牲欲强的熟妇农村老妇女视频 | 在线观看日本一区二区 | 激情文学8888| 太久av| 可以看的黄色网 | www香蕉视频| 西西人体44www大胆无码 | 粉嫩av渣男av蜜乳av | 先锋资源av| 中文字幕av不卡 | 喷潮在线观看 | 亚洲av女人18毛片水真多 | 精品久久视频 | 日日碰狠狠添天天爽 | 国产女同视频 | yy6080午夜 | 在线观看国产一区 | 福利国产在线 | 欧美日韩中文国产一区发布 | 日韩在线观看网址 | 亚洲清纯唯美 | 爱情岛亚洲首页论坛小巨 | 污视频网站免费 | 性色av免费观看 | 国产精品久久久久久吹潮 | 成人久久久久 | 极品少妇在线观看 | 久久久久久久久网站 | 成 人 黄 色 片 在线播放 | 91免费看网站 | 欧美日韩图片 |