日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

springboot和kafka集成

發布時間:2025/6/15 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot和kafka集成 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

1.pom.xml文件

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.2.2.RELEASE</version> </dependency>

2.發送方的配置

package com.test.frame.kafka.configuration;import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.converter.StringJsonMessageConverter;import java.util.HashMap; import java.util.Map;/*** created by guanguan on 2017/9/6**/ @Configuration @EnableKafka public class KafkaProducerConfiguration {@Value("${kafka.bootstrap_servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String,String> producerFactory(){return new DefaultKafkaProducerFactory<String,String>(producerConfigs());}@Beanpublic KafkaTemplate<String,?> kafkaTemplate(){KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());//設置后可以傳送實體template.setMessageConverter(new StringJsonMessageConverter());return template; }@Beanpublic Producer producer(){return new Producer();}}

3.消費者方的配置

package com.test.frame.kafka.configuration;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.converter.StringJsonMessageConverter;import java.util.HashMap; import java.util.Map;/*** created by guanguan on 2017/9/6**/ @Configuration @EnableKafka public class KafkaConsumerConfiguration {@Value("${kafka.bootstrap_servers}")private String bootstrapServers;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props1 = new HashMap<>();props1.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props1.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props1.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props1.put(ConsumerConfig.GROUP_ID_CONFIG, "jd-group"); //統一在一個組內return props1;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setMessageConverter(new StringJsonMessageConverter());return factory;}}

4.發送

package com.test.frame.kafka.configuration;import com.test.frame.kafka.model.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.support.MessageBuilder;/*** created by guanguan on 2017/9/6**/public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Value("${kafka.topic}")private String topic;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(User payload) {kafkaTemplate.send(MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build());logger.info("send message=> "+payload.toString());}}

5.接收

package com.test.frame.kafka.configuration;import com.test.frame.kafka.model.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** created by guanguan on 2017/9/6**/ @Component public class Consumer {private static final Logger logger = LoggerFactory.getLogger(Consumer.class);@KafkaListener(topics = "${kafka.topic}")public void recvMessage(User user) {logger.info("recv msg:=> " + user.toString());} }

6.測試:

package com.test.frame.kafka.controller;import com.test.frame.kafka.configuration.Producer; import com.test.frame.kafka.model.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/") public class ApiController {@AutowiredProducer producer;@RequestMapping("/")public String testkafka(@RequestParam String test) {User user = new User();user.setName("hh");producer.sendMessage(user);return "send kafak ok!";}}

表明已經接收成功。

application.yml文件

kafka:bootstrap_servers: localhost:9092topic: test-topic

?

轉載于:https://my.oschina.net/u/2263272/blog/1530299

總結

以上是生活随笔為你收集整理的springboot和kafka集成的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 一区二区国产视频 | 天天综合网在线观看 | 日韩污视频在线观看 | 中文字幕在线观看一区二区三区 | 成人在线播放av | 能免费看18视频网站 | 日韩毛片中文字幕 | 久久亚洲一区二区三区四区五区 | 亚洲av无码一区二区二三区 | 日韩在线专区 | 国产一区二区小视频 | 久久伊人网站 | 国产精品久久久久久久一区二区 | 男女草比视频 | 欲色网站 | 97蜜桃网| 日本伦理一区二区三区 | 亚洲黄v| 另类欧美日韩 | 国产女人毛片 | 三级黄毛片 | 在线免费h| 精品国产中文字幕 | 韩国黄色视屏 | 国产精品成人国产乱一区 | h视频免费在线观看 | 久久成人人人人精品欧 | 热久久最新 | 一区二区三区黄色片 | 欧美婷婷| 人人妻人人爽人人澡人人精品 | 日本亲与子乱xxx | 3p在线播放 | 狠狠久久久 | 日韩图色 | 在线一区二区三区四区五区 | 欧美成人免费播放 | 黄色国产小视频 | 亚洲一区二区视频在线观看 | 欧美日韩黄色 | 国产欧美久久一区二区三区 | 无码人妻丰满熟妇啪啪网站 | 日本成人激情视频 | www免费黄色 | 调教丰满的已婚少妇在线观看 | 中文字幕日韩精品无码内射 | 成年人一级黄色片 | 麻豆va | 97成人人妻一区二区三区 | 精品美女视频 | 色悠悠网址 | 欧美图片第一页 | 亚洲中文字幕无码av永久 | 久久久久久久久久久久国产 | 大肉大捧一进一出好爽视频动漫 | 男人天堂网在线视频 | 男人的天堂视频 | 999热精品 | 性开放耄耋老妇hd | 国产精品操 | 国产视频精品久久 | 日本一区二区三区成人 | 91精品国产麻豆 | 妞干网这里只有精品 | 欧美bbbbbbbbbbbb18av | 亚洲国产精品女人 | 91久久精品日日躁夜夜躁欧美 | 精品国产综合区久久久久久 | 日韩激情四射 | 亚洲国产看片 | 日韩有码在线观看 | 久久久久这里只有精品 | 欧美做爰爽爽爽爽爽爽 | 日韩三级一区二区三区 | 天天射夜夜| 美女三级视频 | 亚洲天堂网一区 | 精品久久久久久久久久久久久久久久久久 | 国产福利视频一区二区三区 | 欧美精品一区二区三区在线 | 精品在线一区 | n0659极腔濑亚美莉在线播放播放 | 天堂在线视频免费观看 | 先锋影音资源av | 日本美女在线 | 亚洲区在线播放 | 中文字幕一级片 | www日韩| 日韩在线激情 | 粉嫩av网| 国产女人18毛片18精品 | 国产又黄视频 | 一道本久在线中文字幕 | 另类欧美亚洲 | 欧美91精品久久久久国产性生爱 | 亚洲av无码成人精品国产 | 澳门色网 | 天天插天天狠 | 6080电视影片在线观看 |