Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据
生活随笔
收集整理的這篇文章主要介紹了
Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Flink系列之:基于scala語言實現flink實時消費Kafka Topic中的數據
- 一、引入flink相關依賴
- 二、properties保存連接kafka的配置
- 三、構建flink實時消費環境
- 四、添加Kafka源和處理數據
- 五、完整代碼
- 六、執行程序查看消費到的數據
一、引入flink相關依賴
<groupId>com.bigdata</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><flink.version>1.13.1</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency></dependencies>二、properties保存連接kafka的配置
//用properties保存kafka連接的相關配置val properties = new Properties()properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"swlfalfal\";")properties.setProperty("security.protocol","SASL_PLAINTEXT")properties.setProperty("sasl.mechanism", "PLAIN")properties.setProperty("group.id","flink-test")properties.setProperty("auto.offset.reset","earliest")三、構建flink實時消費環境
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setRestartStrategy(RestartStrategies.noRestart())四、添加Kafka源和處理數據
val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("debezium-test-optics_uds",new SimpleStringSchema(),properties))lines.print()//觸發執行env.execute()五、完整代碼
import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport java.util.Propertiesobject SourceKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setRestartStrategy(RestartStrategies.noRestart())//用properties保存kafka連接的相關配置val properties = new Properties()properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")properties.setProperty("security.protocol","SASL_PLAINTEXT")properties.setProperty("sasl.mechanism", "PLAIN")properties.setProperty("group.id","flink-test")properties.setProperty("auto.offset.reset","earliest")//添加kafka源,并打印數據val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("debezium-test-optics_uds",new SimpleStringSchema(),properties))lines.print()//觸發執行env.execute()}}六、執行程序查看消費到的數據
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"debezium_test_optics_uds.Envelope"},"payload":{"before":null,"after":{"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"},"source":{"version":"1.9.6.Final","connector":"mysql","name":"debezium-uds8-optics8-test_1h","ts_ms":1665155935000,"snapshot":"false","db":"dw","sequence":null,"table":"student","server_id":223344,"gtid":null,"file":"mysql-bin.000012","pos":6193972,"row":0,"thread":66072,"query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')"},"op":"c","ts_ms":1665155935640,"transaction":null} }總結
以上是生活随笔為你收集整理的Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 专利与论文-3:专利的三大原则是什么?不
- 下一篇: 考研刷题神器