日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

mac系统下使用flink消费docker运行的kafka

發(fā)布時間:2024/3/26 windows 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mac系统下使用flink消费docker运行的kafka 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

版本

flink 1.12.0
scala 2.11
java 1.8
kafka 2.0.2

首先使用maven創(chuàng)建一個新的工程

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0 -DgroupId=learn.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=com.flink -DinteractiveMode=false

創(chuàng)建完之后打開kafka的pom注釋

之后就是flink程序:

import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class streamkafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//設置Kafka相關(guān)參數(shù)Properties properties = new Properties();//設置Kafka的地址和端口properties.setProperty("bootstrap.servers", "kafka:9090");//讀取偏移量策略:如果沒有記錄偏移量,就從頭讀,如果記錄過偏移量,就接著讀properties.setProperty("auto.offset.reset", "earliest");//設置消費者組IDproperties.setProperty("group.id", "g3");//沒有開啟checkpoint,讓flink提交偏移量的消費者定期自動提交偏移量properties.setProperty("enable.auto.commit", "true");//創(chuàng)建FlinkKafkaConsumer并傳入相關(guān)參數(shù)FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("logsTopic", //要讀取數(shù)據(jù)的Topic名稱new SimpleStringSchema(), //讀取文件的反序列化Schemaproperties //傳入Kafka的參數(shù));//使用addSource添加kafkaConsumerDataStreamSource<String> lines = env.addSource(kafkaConsumer);lines.print();env.execute();} }

下面開始準備docker啟動kafka。
安裝docker不多說了,下面就是使用docker compose生成 zookeeper和kafka的運行容器:

version: "3.3" services:zookeeper:image: zookeeper:3.5.5restart: alwayscontainer_name: zookeeperports:- "2181:2181"expose:- "2181"environment:- ZOO_MY_ID=1kafka:image: wurstmeister/kafka:2.11-2.0.1restart: alwayscontainer_name: kafkaenvironment:- KAFKA_BROKER_ID=1- KAFKA_LISTENERS=PLAINTEXT://kafka:9090- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_MESSAGE_MAX_BYTES=2000000ports:- "9090:9090"depends_on:- zookeeper docker-compose -f docker-compose-kafka.yml up -d


運行成功后啟動創(chuàng)建kafka。

注意 :使用docker需要再hosts配置宿主機的映射

使用docker創(chuàng)建kafka topic

docker run -it --rm --network host wurstmeister/kafka:2.11-2.0.1 \bash /opt/kafka/bin/kafka-topics.sh \--zookeeper kafka:2181 \--create --topic flink --partitions 1 --replication-factor 1


創(chuàng)建成功后,啟動kafka生產(chǎn)者生產(chǎn)數(shù)據(jù):

docker run -it --rm --network host wurstmeister/kafka:2.11-2.0.1 \bash /opt/kafka/bin/kafka-console-producer.sh \--broker-list kafka:9090 --topic flink


回車后退出,在使用消費者測試數(shù)據(jù)是否可以消費:

docker run -it --rm --network host wurstmeister/kafka:2.11-2.0.1 \bash /opt/kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9090 --topic flink --from-beginning


可以看到消費者正常消費,后面就是啟動flink程序;

可以看到flink消費到了剛剛kafka生產(chǎn)者中生產(chǎn)的數(shù)據(jù),這一波串聯(lián)成功。

總結(jié)

以上是生活随笔為你收集整理的mac系统下使用flink消费docker运行的kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。