日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

springboot集成kafka及kafka web UI的使用

發(fā)布時(shí)間:2025/1/21 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot集成kafka及kafka web UI的使用 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

springboot集成kafka

application.properties

spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092spring.kafka.producer.retries=5 spring.kafka.producer.acks=all spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.properties.enable.idempotence=true spring.kafka.producer.transaction-id-prefix=transaction-id-spring.kafka.consumer.group-id=group1 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.properties.isolation.level=read_committed spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.streams.application-id=wordcount_id spring.kafka.streams.client-id=app1 spring.kafka.streams.auto-startup=true spring.kafka.streams.state-dir=/Users/admin/Desktop/checkpoint spring.kafka.streams.replication-factor=1 spring.kafka.streams.properties.processing.guarantee=exactly_once

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zhangxueliang</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.0.1</version></dependency><!--測(cè)試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build> </project>

@KafkaListener消費(fèi)消息

@SendTo轉(zhuǎn)發(fā)消息

kafka web UI創(chuàng)建topic


KSQL的使用

mock發(fā)送消息


此時(shí)消息就被處理后(加了個(gè)后綴)發(fā)到了topic03中:

使用KafkaTemplate發(fā)送消息


數(shù)據(jù)是發(fā)往topic02,但是進(jìn)行了轉(zhuǎn)發(fā),topic03會(huì)收到加了后綴的消息數(shù)據(jù):

開啟事務(wù)


開啟事務(wù)后發(fā)送消息有兩種編碼方式:

  • 使用executeInTransaction方法


此時(shí)消息就被轉(zhuǎn)發(fā)到了topic03:

  • 所在類加@Transactional注解


測(cè)試:

查看轉(zhuǎn)發(fā)的消息:

總結(jié)

以上是生活随笔為你收集整理的springboot集成kafka及kafka web UI的使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。