日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Kafka笔记-Spring Boot消费者构造

發(fā)布時間:2025/3/15 javascript 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka笔记-Spring Boot消费者构造 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

程序運行截圖如下:

生產(chǎn)者端

消費者打印:

那個HOW ARE YOU就是了!

這里關(guān)鍵是:

這個@KafkaListener注解,監(jiān)聽了數(shù)據(jù)。

相關(guān)的配置文件如下:

package com.kafkatest.kafkatest.config;import com.kafkatest.kafkatest.common.MessageEntity; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap; import java.util.Map;@Configuration @EnableKafka public class KafkaConsumerConfig {@Value("${kafka.consumer.servers}")private String servers;@Value("${kafka.consumer.enable.auto.commit}")private boolean enableAutoCommit;@Value("${kafka.consumer.session.timeout}")private String sessionTimeout;@Value("${kafka.consumer.auto.commit.interval}")private String autoCommitInterval;@Value("${kafka.consumer.group.id}")private String groupId;@Value("${kafka.consumer.auto.offset.reset}")private String autoOffsetReset;@Value("${kafka.consumer.concurrency}")private int concurrency;@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(concurrency);factory.getContainerProperties().setPollTimeout(1500);return factory;}private ConsumerFactory<String, MessageEntity> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(MessageEntity.class));}private Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return propsMap;} }

application.properties如下:

kafka.producer.servers=122.51.245.141:9092 kafka.producer.retries=0 kafka.producer.batch.size=4096 kafka.producer.linger=1 kafka.producer.buffer.memory=40960 kafka.topic.default=TESTkafka.consumer.zookeeper.connect=122.51.245.141:2181 kafka.consumer.servers=122.51.245.141:9092 kafka.consumer.enable.auto.commit=true kafka.consumer.session.timeout=6000 kafka.consumer.auto.commit.interval=100 kafka.consumer.auto.offset.reset=latest kafka.consumer.topic=TEST kafka.consumer.group.id=TEST kafka.consumer.concurrency=10

源碼下載地址:

https://github.com/fengfanchen/Java/tree/master/kafkatest_web_consumer%26producer

?

總結(jié)

以上是生活随笔為你收集整理的Kafka笔记-Spring Boot消费者构造的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。