當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot快速集成kafka
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot快速集成kafka
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1.添加依賴
? ? ? ? <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>2.修改properties配置文件
# Kafka config spring.kafka.producer.bootstrap-servers=192.168.37.129:9092 spring.kafka.consumer.bootstrap-servers=192.168.37.129:9092 # 事務支持 # spring-kafka提供了spring.kafka.producer.transaction-id-prefix屬性開啟事務 # 僅需要配置事務前綴,并且在所有涉及到kafka操作及監(jiān)聽的方法上增加@Transcational注解。 spring.kafka.producer.transaction-id-prefix=kafka_tx.3.發(fā)送者
手動開啟事務
? ?@Autowiredprivate KafkaTemplate template;private static final String topic = "yfy"; ? ?@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input) {// 事務操作template.executeInTransaction(t -> {t.send(topic, input);if ("error".equals(input)) {throw new RuntimeException("input is error");}t.send(topic, input + " anthor");return true;});return "send success";}注解聲明式事務
? ?@GetMapping("/sendt/{input}")@Transactional(rollbackFor = RuntimeException.class)public String sendToKafka2(@PathVariable String input) {template.send(topic, input);if ("error".equals(input)) {throw new RuntimeException("input is error");}template.send(topic, input + " anthor");return "send success";}4.消費者
? ?/*** 接收消息*/@KafkaListener(id = "", topics = topic, groupId = "group.demo")public void listener(String input) {logger.info("input value:{}", input);}5.演示
當發(fā)送正常消息時,消費者接受到消息
2020-05-27 17:32:28.080 INFO 31200 --- [ntainer#0-0-C-1] com.heima.kafka.KafkaApplication ? ? ? ? : input value:nihao 2020-05-27 17:32:28.088 INFO 31200 --- [ntainer#0-0-C-1] com.heima.kafka.KafkaApplication ? ? ? ? : input value:nihao anthor當發(fā)送error時,發(fā)生異常回滾,消費者沒有接受到消息
總結(jié)
以上是生活随笔為你收集整理的SpringBoot快速集成kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka消费者详解
- 下一篇: 利用Spring的Aop实现项目的日志监