日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

5.Flink对接Kafka入门

發(fā)布時(shí)間:2023/12/8 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 5.Flink对接Kafka入门 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Flink Connector Kafka

  • 1. Kafka
    • 1.1. [Kafka官網(wǎng)](http://kafka.apache.org/)
    • 1.2. Kafka 簡述
    • 1.3. Kafka特性
    • 1.4. kafka的應(yīng)用場景
    • 1.5. kafka-manager的部署
    • 1.6. `使用Kafka Connect導(dǎo)入/導(dǎo)出數(shù)據(jù)`
    • 1.7. [Kafka日志存儲(chǔ)原理](https://blog.csdn.net/shujuelin/article/details/80898624)
  • 2. Kafka與Flink的融合
    • 2.1. kafka連接flink流計(jì)算,實(shí)現(xiàn)flink消費(fèi)kafka的數(shù)據(jù)
    • 2.2. flink 讀取kafka并且自定義水印再將數(shù)據(jù)寫入kafka中
  • 3. Airbnb 是如何通過 balanced Kafka reader 來擴(kuò)展 Spark streaming 實(shí)時(shí)流處理能力的
  • 4. 寄語:海闊憑魚躍,天高任鳥飛

1. Kafka

1.1. Kafka官網(wǎng)

1.2. Kafka 簡述


  • Kafka 是一個(gè)分布式消息系統(tǒng):具有生產(chǎn)者、消費(fèi)者的功能。它提供了類似于JMS 的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS 規(guī)范的實(shí)現(xiàn)。

1.3. Kafka特性

  • 消息持久化:基于文件系統(tǒng)來存儲(chǔ)和緩存消息
  • 高吞吐量
  • 多客戶端支持:核心模塊用Scala 語言開發(fā),Kafka 提供了多種開發(fā)語言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等
  • 安全機(jī)制
    • 通過SSL 和SASL(Kerberos), SASL/PLA時(shí)驗(yàn)證機(jī)制支持生產(chǎn)者、消費(fèi)者與broker連接時(shí)的身份認(rèn)證;
    • 支持代理與ZooKeeper 連接身份驗(yàn)證
    • 通信時(shí)數(shù)據(jù)加密
    • 客戶端讀、寫權(quán)限認(rèn)證
    • Kafka 支持與外部其他認(rèn)證授權(quán)服務(wù)的集成
  • 數(shù)據(jù)備份
  • 輕量級(jí)
  • 消息壓縮

1.4. kafka的應(yīng)用場景

  • Kafka作為消息傳遞系統(tǒng)
  • Kafka 作為存儲(chǔ)系統(tǒng)
  • Kafka用做流處理
  • 消息,存儲(chǔ),流處理結(jié)合起來使用

1.5. kafka-manager的部署

Kafka Manager 由 yahoo 公司開發(fā),該工具可以方便查看集群 主題分布情況,同時(shí)支持對(duì) 多個(gè)集群的管理、分區(qū)平衡以及創(chuàng)建主題等操作。

  • Centos7安裝kafka-manager

  • 啟動(dòng)腳本

    • bin/cmak -Dconfig.file=conf/application.conf -java-home /usr/lib/jdk-11.0.6 -Dhttp.port=9008 &
  • 界面效果

  • 注意

1.6. 使用Kafka Connect導(dǎo)入/導(dǎo)出數(shù)據(jù)

  • 替代Flume——Kafka Connect
  • 集群模式
    • 注意: 在集群模式下,配置并不會(huì)在命令行傳進(jìn)去,而是需要REST API來創(chuàng)建,修改和銷毀連接器。
    • 通過一個(gè)示例了解kafka connect連接器
    • kafka connect簡介以及部署

1.7. Kafka日志存儲(chǔ)原理

Kafka的Message存儲(chǔ)采用了分區(qū)(partition),分段(LogSegment)和稀疏索引這幾個(gè)手段來達(dá)到了高效性

  • 查看分區(qū).index文件

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files kafka-logs/t2-2/00000000000000000000.index
  • 查看log文件

    /bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-1/00000000000000000000.log --print-data-log
  • 查看TimeIndex文件

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-2/00000000000000000000.timeindex --verify-index-only
  • 引入時(shí)間戳的作用

2. Kafka與Flink的融合

Flink 提供了專門的 Kafka 連接器,向 Kafka topic 中讀取或者寫入數(shù)據(jù)。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 機(jī)制,可提供 exactly-once 的處理語義。為此,Flink 并不完全依賴于跟蹤 Kafka 消費(fèi)組的偏移量,而是在內(nèi)部跟蹤和檢查偏移量。

2.1. kafka連接flink流計(jì)算,實(shí)現(xiàn)flink消費(fèi)kafka的數(shù)據(jù)

  • 創(chuàng)建flink項(xiàng)目
    sbt new tillrohrmann/flink-project.g8

  • 配置sbt

    ThisBuild / resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",Resolver.mavenLocal )name := "FlinkKafkaProject"version := "1.0"organization := "com.xiaofan"ThisBuild / scalaVersion := "2.12.6"val flinkVersion = "1.10.0" val kafkaVersion = "2.2.0"val flinkDependencies = Seq("org.apache.flink" %% "flink-scala" % flinkVersion % "provided","org.apache.kafka" %% "kafka" % kafkaVersion % "provided","org.apache.flink" %% "flink-connector-kafka" % flinkVersion,"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")lazy val root = (project in file(".")).settings(libraryDependencies ++= flinkDependencies)assembly / mainClass := Some("com.xiaofan.Job")// make run command include the provided dependencies Compile / run := Defaults.runTask(Compile / fullClasspath,Compile / run / mainClass,Compile / run / runner).evaluated// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain" Compile / run / fork := true Global / cancelable := true// exclude Scala library from assembly assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
  • 源代碼

    package com.xiaofanimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer/*** 用flink消費(fèi)kafka** @author xiaofan*/ object ReadingFromKafka {val ZOOKEEPER_HOST = "192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181"val KAFKA_BROKER = "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"val TRANSACTION_GROUP = "com.xiaofan.flink"def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.enableCheckpointing(1000)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// configure kafka consumerval kafkaProps = new Properties()kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)kafkaProps.setProperty("group.id", TRANSACTION_GROUP)val transaction: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("xiaofan01", new SimpleStringSchema(), kafkaProps))transaction.printenv.execute()} }
  • 啟動(dòng)kafka集群,運(yùn)行結(jié)果

2.2. flink 讀取kafka并且自定義水印再將數(shù)據(jù)寫入kafka中

  • 需求說明(自定義窗口,每分鐘的詞頻統(tǒng)計(jì))

    • 從kafka中讀取數(shù)據(jù)(topic:t1)
    • kafka中有event time時(shí)間值,通過該時(shí)間戳來進(jìn)行時(shí)間劃分,窗口長度為10秒,窗口步長為5秒
    • 由于生產(chǎn)中可能會(huì)因?yàn)榫W(wǎng)絡(luò)或者其他原因?qū)е聰?shù)據(jù)延時(shí),比如 00:00:10 時(shí)間的數(shù)據(jù)可能 00:00:12 才會(huì)傳入kafka中,所以在flink的處理中應(yīng)該設(shè)置延時(shí)等待處理,這里設(shè)置的2秒,可以自行修改。
    • 結(jié)果數(shù)據(jù)寫入kafka中(topic:t2)(數(shù)據(jù)格式 time:時(shí)間 count:每分鐘的處理?xiàng)l數(shù))
  • 準(zhǔn)備環(huán)境flink1.10.0 + kafka2.2.0

  • 創(chuàng)建topic

    bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t1 bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t2
  • 向t1中生產(chǎn)數(shù)據(jù)

    package com.xiaofanimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object ProduceData {def main(args: Array[String]): Unit = {val props = new Properties()props.put("bootstrap.servers", "192.168.1.25:9091")props.put("acks", "1")props.put("retries", "3")props.put("batch.size", "16384") // 16Kprops.put("linger.ms", "1")props.put("buffer.memory", "33554432") // 32Mprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)var i = 0while (true) {i += 1// 模擬標(biāo)記事件時(shí)間val record = new ProducerRecord[String, String]("t1", i + "," + System.currentTimeMillis())// 只管發(fā)送消息,不管是否發(fā)送成功producer.send(record)Thread.sleep(300)}} }
  • 消費(fèi)t1數(shù)據(jù),處理后再次傳入kafka t2

    package com.xiaofanimport java.text.SimpleDateFormat import java.util.{Date, Properties}import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}/*** Watermark 案例* 根據(jù)自定義水印定義時(shí)間,計(jì)算每秒的消息數(shù)并且寫入 kafka中*/ object StreamingWindowWatermarkScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","192.168.1.25:9091")prop.setProperty("group.id","con1")val myConsumer = new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),prop)// 添加源val text = env.addSource(myConsumer)val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).trim.toLong)})// 添加水印val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 3000L// 最大允許的亂序時(shí)間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val window = waterMarkStream.map(x=>(x._2,1)).timeWindowAll(Time.seconds(1),Time.seconds(1)).sum(1).map(x=>"time:"+tranTimeToString(x._1.toString)+" count:"+x._2)// .window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣val topic2 = "t2"val props = new Properties()props.setProperty("bootstrap.servers","192.168.1.25:9091")//使用支持僅一次語義的形式val myProducer = new FlinkKafkaProducer[String](topic2,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)window.addSink(myProducer)env.execute("StreamingWindowWatermarkScala")}def tranTimeToString(timestamp:String) :String={val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val time = fm.format(new Date(timestamp.toLong))time}}
  • 運(yùn)行效果



3. Airbnb 是如何通過 balanced Kafka reader 來擴(kuò)展 Spark streaming 實(shí)時(shí)流處理能力的

  • 參考鏈接1
  • 參考鏈接2

4. 寄語:海闊憑魚躍,天高任鳥飛

總結(jié)

以上是生活随笔為你收集整理的5.Flink对接Kafka入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美一级大片在线观看 | 亚洲一区二区在线免费 | 亚洲高清视频一区二区 | 性福宝av| www五月天com | 亚洲激情在线 | 国产在线视频第一页 | 伊人天天 | 国产777 | 69热在线 | 波多野结衣中文字幕一区二区三区 | 欧美日韩国产一级 | 免费福利小视频 | 色撸撸在线观看 | 久草视频国产 | 伊人伊网 | 欧美一级久久 | 欧美特级特黄aaaaaa在线看 | 野外(巨肉高h) | 1024金沙人妻一区二区三区 | 精品久久一| 一区二区高清在线观看 | 咪咪色图 | 色爽爽爽 | 天天干夜夜欢 | 中文字幕一区二区三区门四区五区 | 日韩尤物 | 亚洲成人aaa | 亚洲欧美激情小说另类 | 久久综合中文字幕 | 国产中文字幕在线播放 | 涩涩在线看 | 欧美日韩精品一区二区三区四区 | 美女一二区 | 自拍欧美日韩 | 亚洲欧美国产视频 | 日本色视 | 国产午夜福利一区二区 | 色综合久久久久综合体桃花网 | 日韩av一区在线观看 | 天天干天天舔天天操 | 人成网站在线观看 | 超碰caoprom | 日本激情电影 | 国产精品免费一区二区三区都可以 | 日本a级网站 | 国产乱码精品一区二区三区精东 | 亚洲综合社区 | 精品一区二区三区无码按摩 | 国产伦精品一区二区三区视频黑人 | 在线不卡 | 日本在线观看视频网站 | jizz欧美大全 | www.色香蕉 | 色老头一区二区三区在线观看 | 亚洲伦理在线播放 | 精品成人一区二区 | 9i在线看片成人免费 | 台湾a级艳片潘金莲 | 亚洲天堂男人的天堂 | 99久久婷婷国产综合精品青牛牛 | 精品国产大片大片大片 | 亚洲视频图片小说 | 秋霞午夜鲁丝一区二区老狼 | 中文字幕日本一区二区 | 男人的天堂avav | 亚洲品质自拍视频 | 绿帽视频| 奇米第四色首页 | 欧洲女同同性吃奶 | 天天干天天做天天操 | 精品欧美乱码久久久久久 | 亚洲精品第三页 | www.污视频 | 视频在线观看一区二区三区 | 人妖粗暴刺激videos呻吟 | 亚洲精品乱码久久久久久按摩观 | 亚洲av无码不卡 | 美女视频三区 | 久久久视屏 | 成人av在线一区二区 | 鲁丝av| 精品婷婷色一区二区三区蜜桃 | 18深夜在线观看免费视频 | 国产对白在线 | 1769国产精品视频 | 午夜视频网站在线观看 | 中文字幕在线观看91 | 日本中文在线视频 | 日韩制服在线 | 久久午夜夜伦鲁鲁一区二区 | 日韩国产在线播放 | 黄色三级小视频 | 国产农村妇女aaaaa视频 | av在线不卡一区 | 一级片观看 | 日韩中文无 | 伊人网成人 | 麻豆蜜桃视频 |