5. Getting Started with Flink-Kafka Integration
Flink Connector Kafka
- 1. Kafka
  - 1.1. [Kafka official site](http://kafka.apache.org/)
  - 1.2. Kafka overview
  - 1.3. Kafka features
  - 1.4. Kafka use cases
  - 1.5. Deploying kafka-manager
  - 1.6. `Importing/exporting data with Kafka Connect`
  - 1.7. [Kafka log storage internals](https://blog.csdn.net/shujuelin/article/details/80898624)
- 2. Integrating Kafka with Flink
  - 2.1. Connecting Kafka to Flink streaming: consuming Kafka data with Flink
  - 2.2. Reading from Kafka with Flink, assigning custom watermarks, and writing the results back to Kafka
- 3. How Airbnb scaled Spark Streaming's real-time processing with a balanced Kafka reader
- 4. Closing words: the sea is wide for fish to leap; the sky is high for birds to fly
1. Kafka
1.1. [Kafka official site](http://kafka.apache.org/)
1.2. Kafka overview
- Kafka is a distributed messaging system with producer and consumer roles. It offers features similar to JMS, but its design and implementation are entirely different, and it is not an implementation of the JMS specification.
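To make the producer/consumer model concrete, here is a minimal sketch of a plain Kafka consumer written in Scala against the Java client; the broker address, topic name, and consumer group are placeholders rather than values from this article.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("group.id", "demo-group")              // placeholder consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("demo-topic")) // placeholder topic

    // Standard poll loop: fetch batches of records and print them.
    while (true) {
      val records = consumer.poll(Duration.ofMillis(500))
      records.forEach(r => println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}"))
    }
  }
}
```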
1.3. Kafka features
- Message persistence: messages are stored and cached on the file system
- High throughput
- Multi-client support: the core modules are written in Scala, and clients are available for many languages, including Java, Scala, C, C++, Python, Go, Erlang, Ruby, and Node.js
- Security mechanisms
  - Authentication of producers and consumers when they connect to brokers, via SSL and SASL (Kerberos) as well as SASL/PLAIN
  - Authentication of broker connections to ZooKeeper
  - Encryption of data in transit
  - Read/write authorization for clients
  - Integration with external authentication and authorization services
- Data replication
- Lightweight
- Message compression (see the configuration sketch after this list)
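Several of the features above, such as compression and SSL encryption, surface as ordinary client configuration. The following is a minimal sketch of a producer configured with gzip compression and SSL; the broker address, truststore path, and password are placeholder assumptions, not values from this article.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.KafkaProducer

object CompressedSslProducerConfig {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9093") // placeholder SSL listener
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // message compression: none, gzip, snappy, lz4, or zstd
    props.put("compression.type", "gzip")

    // encrypt traffic between the client and the broker (path and password are placeholders)
    props.put("security.protocol", "SSL")
    props.put("ssl.truststore.location", "/etc/kafka/client.truststore.jks")
    props.put("ssl.truststore.password", "changeit")

    val producer = new KafkaProducer[String, String](props)
    producer.close()
  }
}
```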
1.4. Kafka use cases
- Kafka as a messaging system
- Kafka as a storage system
- Kafka for stream processing
- Combining messaging, storage, and stream processing
1.5. Deploying kafka-manager
Kafka Manager was developed at Yahoo. It makes it easy to see how topics are distributed across a cluster, and it supports managing multiple clusters, rebalancing partitions, creating topics, and more.
- Installing kafka-manager on CentOS 7
- Startup script
  - `bin/cmak -Dconfig.file=conf/application.conf -java-home /usr/lib/jdk-11.0.6 -Dhttp.port=9008 &`
- Web UI (screenshots omitted)
- Notes
1.6. Importing/exporting data with Kafka Connect
- Kafka Connect as an alternative to Flume
- Distributed (cluster) mode
  - Note: in distributed mode the connector configuration is not passed on the command line; connectors are created, modified, and deleted through the REST API (a sketch follows this list).
- Understanding Kafka Connect connectors through an example
- Kafka Connect overview and deployment
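As a rough illustration of that REST-based workflow, the sketch below POSTs a FileStreamSource connector definition to a Connect worker. The worker address (8083 is the default REST port), connector name, file path, and topic are assumptions to adapt to your own environment.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object CreateFileSourceConnector {
  def main(args: Array[String]): Unit = {
    // Connector definition: a name plus the connector-specific config map.
    val payload =
      """{
        |  "name": "file-source-demo",
        |  "config": {
        |    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        |    "tasks.max": "1",
        |    "file": "/tmp/source.txt",
        |    "topic": "connect-demo"
        |  }
        |}""".stripMargin

    // POST /connectors creates the connector; DELETE /connectors/<name> would remove it.
    val request = HttpRequest.newBuilder()
      .uri(URI.create("http://localhost:8083/connectors"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(payload))
      .build()

    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(s"${response.statusCode()} ${response.body()}")
  }
}
```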
1.7. Kafka log storage internals
Kafka's message storage achieves its efficiency through three techniques: partitions, log segments (LogSegment), and sparse indexes.
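To make the sparse-index idea concrete, here is a toy Scala sketch (not Kafka source code) of how an offset lookup can work: binary-search the sparse .index entries for the last entry at or below the target offset, then scan the .log file forward from that byte position.

```scala
object SparseIndexLookup {

  // Each sparse index entry maps a message offset to a byte position in the segment's .log file.
  final case class IndexEntry(offset: Long, position: Long)

  /** Return the last index entry whose offset is <= target (binary search over the sparse index). */
  def floorEntry(index: Vector[IndexEntry], target: Long): Option[IndexEntry] = {
    var lo = 0
    var hi = index.length - 1
    var best: Option[IndexEntry] = None
    while (lo <= hi) {
      val mid = (lo + hi) / 2
      if (index(mid).offset <= target) { best = Some(index(mid)); lo = mid + 1 }
      else hi = mid - 1
    }
    best
  }

  def main(args: Array[String]): Unit = {
    // "Sparse" means only every few kilobytes of messages get an index entry
    // (controlled by log.index.interval.bytes in real Kafka).
    val index = Vector(IndexEntry(0, 0), IndexEntry(50, 4096), IndexEntry(100, 8192))

    // To locate offset 73, seek to byte 4096 (the entry for offset 50) and scan the log forward.
    println(floorEntry(index, 73)) // Some(IndexEntry(50,4096))
  }
}
```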
- Inspect a partition's .index file
  - `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files kafka-logs/t2-2/00000000000000000000.index`
- Inspect a .log file
  - `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-1/00000000000000000000.log --print-data-log`
- Inspect a .timeindex file
  - `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-2/00000000000000000000.timeindex --verify-index-only`
- Why timestamps were introduced: they make time-based log retention and segment rolling accurate, and they enable looking up offsets by timestamp through the .timeindex file
2. Integrating Kafka with Flink
Flink ships with a dedicated Kafka connector for reading data from and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide exactly-once processing semantics. To achieve this, Flink does not rely solely on the offsets committed to the Kafka consumer group; instead, it tracks and checkpoints the offsets internally.
2.1. Connecting Kafka to Flink streaming: consuming Kafka data with Flink
- Create the Flink project

```
sbt new tillrohrmann/flink-project.g8
```

- Configure sbt (build.sbt)

```scala
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "FlinkKafkaProject"
version := "1.0"
organization := "com.xiaofan"

ThisBuild / scalaVersion := "2.12.6"

val flinkVersion = "1.10.0"
val kafkaVersion = "2.2.0"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.kafka" %% "kafka" % kafkaVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).settings(libraryDependencies ++= flinkDependencies)

assembly / mainClass := Some("com.xiaofan.Job")

// make the run command include the "provided" dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated

// stay inside the sbt console when we press "ctrl-c" while a Flink program executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude the Scala library from the assembly jar
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
```

- Source code

```scala
package com.xiaofan

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Consume Kafka data with Flink
 *
 * @author xiaofan
 */
object ReadingFromKafka {

  val ZOOKEEPER_HOST = "192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181"
  val KAFKA_BROKER = "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"
  val TRANSACTION_GROUP = "com.xiaofan.flink"

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // configure the Kafka consumer
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("xiaofan01", new SimpleStringSchema(), kafkaProps))

    transaction.print()

    env.execute()
  }
}
```

- Start the Kafka cluster and run the job (output screenshots omitted)
2.2. Reading from Kafka with Flink, assigning custom watermarks, and writing the results back to Kafka
- Requirements (custom windows, per-minute word-frequency counts)
  - Read data from Kafka (topic: t1)
  - Each Kafka record carries an event-time timestamp; windows are assigned by this timestamp, with a window length of 10 seconds and a slide of 5 seconds
  - In production, data may arrive late because of network or other issues (for example, a record stamped 00:00:10 might only reach Kafka at 00:00:12), so the Flink job should allow for lateness; it is set to 2 seconds here and can be adjusted (a small windowing sketch follows this list)
  - Write the results to Kafka (topic: t2) in the format `time:<timestamp> count:<number of records processed per minute>`
- Environment: Flink 1.10.0 + Kafka 2.2.0
- Create the topics

```
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t1
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t2
```
- Produce data into t1

```scala
package com.xiaofan

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceData {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.25:9091")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("batch.size", "16384")       // 16 KB
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432") // 32 MB
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    var i = 0
    while (true) {
      i += 1
      // embed the event time in the message payload
      val record = new ProducerRecord[String, String]("t1", i + "," + System.currentTimeMillis())
      // fire and forget: send without checking whether the write succeeded
      producer.send(record)
      Thread.sleep(300)
    }
  }
}
```
消費(fèi)t1數(shù)據(jù),處理后再次傳入kafka t2
package com.xiaofanimport java.text.SimpleDateFormat import java.util.{Date, Properties}import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}/*** Watermark 案例* 根據(jù)自定義水印定義時(shí)間,計(jì)算每秒的消息數(shù)并且寫入 kafka中*/ object StreamingWindowWatermarkScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","192.168.1.25:9091")prop.setProperty("group.id","con1")val myConsumer = new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),prop)// 添加源val text = env.addSource(myConsumer)val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).trim.toLong)})// 添加水印val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 3000L// 最大允許的亂序時(shí)間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val window = waterMarkStream.map(x=>(x._2,1)).timeWindowAll(Time.seconds(1),Time.seconds(1)).sum(1).map(x=>"time:"+tranTimeToString(x._1.toString)+" count:"+x._2)// .window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣val topic2 = "t2"val props = new Properties()props.setProperty("bootstrap.servers","192.168.1.25:9091")//使用支持僅一次語義的形式val myProducer = new FlinkKafkaProducer[String](topic2,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)window.addSink(myProducer)env.execute("StreamingWindowWatermarkScala")}def tranTimeToString(timestamp:String) :String={val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val time = fm.format(new Date(timestamp.toLong))time}} -
運(yùn)行效果
3. How Airbnb scaled Spark Streaming's real-time processing with a balanced Kafka reader
- Reference link 1
- Reference link 2
4. Closing words: the sea is wide for fish to leap; the sky is high for birds to fly