
Flink Series: Consuming Data from a Kafka Topic in Real Time with Flink in Scala

Published: 2024/1/1


  • 1. Add the Flink dependencies
  • 2. Hold the Kafka connection configuration in Properties
  • 3. Build the Flink streaming environment
  • 4. Add the Kafka source and process the data
  • 5. Complete code
  • 6. Run the program and inspect the consumed data

1. Add the Flink dependencies

<groupId>com.bigdata</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Hold the Kafka connection configuration in Properties

// hold the Kafka connection settings in a Properties object
val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
properties.setProperty("security.protocol", "SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism", "PLAIN")
properties.setProperty("group.id", "flink-test")
properties.setProperty("auto.offset.reset", "earliest")
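The `sasl.jaas.config` value is easy to get wrong by hand (the embedded double quotes and the trailing semicolon are both required by the JAAS parser). A small helper that assembles it, using placeholder credentials rather than real ones:

```scala
// Assemble the sasl.jaas.config value for SASL/PLAIN authentication.
// The username/password passed in below are placeholders, not real credentials.
object Jaas {
  def plainJaasConfig(user: String, pass: String): String =
    s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$user" password="$pass";"""
}

println(Jaas.plainJaasConfig("debezium", "secret"))
// org.apache.kafka.common.security.plain.PlainLoginModule required username="debezium" password="secret";
```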

3. Build the Flink streaming environment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setRestartStrategy(RestartStrategies.noRestart())

4. Add the Kafka source and process the data

val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](
  "debezium-test-optics_uds",
  new SimpleStringSchema(),
  properties))
lines.print()

// trigger execution
env.execute()

5. Complete code

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object SourceKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRestartStrategy(RestartStrategies.noRestart())

    // hold the Kafka connection settings in a Properties object
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("auto.offset.reset", "earliest")

    // add the Kafka source and print each record
    val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](
      "debezium-test-optics_uds",
      new SimpleStringSchema(),
      properties))
    lines.print()

    // trigger execution
    env.execute()
  }
}
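`FlinkKafkaConsumer` is the legacy source API. Since Flink 1.12 the connector also ships the unified `KafkaSource` (FLIP-27), and `FlinkKafkaConsumer` was deprecated in later releases. A sketch of the same consumer with the newer API, assuming the same cluster, topic, and `env` as above (the SASL properties would still be passed via `setProperties(properties)`):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// Same cluster and topic as above; this fragment assumes `env` and
// `properties` are already defined as in the complete code.
val source = KafkaSource.builder[String]()
  .setBootstrapServers("10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
  .setTopics("debezium-test-optics_uds")
  .setGroupId("flink-test")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

val lines: DataStream[String] =
  env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")
```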

6. Run the program and inspect the consumed data

{"schema":{"type":"struct","fields":[
  {"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"before"},
  {"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"after"},
  {"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},
  {"type":"string","optional":false,"field":"op"},
  {"type":"int64","optional":true,"field":"ts_ms"},
  {"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}
],"optional":false,"name":"debezium_test_optics_uds.Envelope"},
"payload":{
  "before":null,
  "after":{"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"},
  "source":{"version":"1.9.6.Final","connector":"mysql","name":"debezium-uds8-optics8-test_1h","ts_ms":1665155935000,"snapshot":"false","db":"dw","sequence":null,"table":"student","server_id":223344,"gtid":null,"file":"mysql-bin.000012","pos":6193972,"row":0,"thread":66072,"query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')"},
  "op":"c",
  "ts_ms":1665155935640,
  "transaction":null
}}
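Each record printed above is a Debezium change event: the interesting data sits under `payload.op` (operation type) and `payload.after` (row state after the change). A minimal sketch of pulling those fields out with plain regex, on a trimmed-down sample record; a real job would map the stream through a proper JSON library instead:

```scala
// Extract a couple of fields from a Debezium change record using plain
// regex over the raw JSON string. The object and patterns here are
// illustrative helpers, not part of any Flink or Debezium API.
object DebeziumFieldExtractor {
  private val OpPattern    = """"op"\s*:\s*"([^"]+)"""".r
  private val SnamePattern = """"after"\s*:\s*\{[^}]*"sname"\s*:\s*"([^"]+)"""".r

  def op(record: String): Option[String] =
    OpPattern.findFirstMatchIn(record).map(_.group(1))

  def afterSname(record: String): Option[String] =
    SnamePattern.findFirstMatchIn(record).map(_.group(1))
}

// trimmed-down version of the record shown above
val sample = """{"payload":{"before":null,"after":{"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"},"op":"c"}}"""

println(DebeziumFieldExtractor.op(sample))         // Some(c)
println(DebeziumFieldExtractor.afterSname(sample)) // Some(f)
```

In the job itself this would typically run inside a `lines.map(...)` instead of `lines.print()`.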
