日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

(转) SpringBoot接入两套kafka集群

發(fā)布時(shí)間:2023/12/3 javascript 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 (转) SpringBoot接入两套kafka集群 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)自:

SpringBoot接入兩套kafka集群 - 風(fēng)小雅 - 博客園引入依賴 compile 'org.springframework.kafka:spring-kafka' 第一套kafka配置 package myapp.kafka; importhttps://www.cnblogs.com/ylty/p/13673357.html


引入依賴

compile 'org.springframework.kafka:spring-kafka'

第一套kafka配置

package myapp.kafka;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration for the first (default) Kafka cluster, "k1".
 *
 * <p>The listener-container factory is marked {@code @Primary}, so a plain
 * {@code @KafkaListener} without an explicit {@code containerFactory}
 * attribute consumes from this cluster.
 *
 * @author zhengqian
 */
@Slf4j
@Configuration
@Data
public class K1KafkaConfiguration {

    @Value("${app-name.kafka.k1.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${app-name.kafka.k1.consumer.group-id}")
    private String groupId;

    @Value("${app-name.kafka.k1.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${app-name.kafka.k1.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    // BUG FIX: the original read "${app-name.kafka.k2.producer.bootstrap-servers}"
    // (copy-paste error) — the k1 configuration must point at the k1 cluster.
    @Value("${app-name.kafka.k1.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    /**
     * Listener-container factory for cluster k1. {@code @Primary} makes it the
     * default factory for {@code @KafkaListener} methods that do not name one.
     */
    @Bean
    @Primary
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /** Consumer factory backed by {@link #consumerConfigs()}. */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Raw consumer properties for cluster k1 (String key/value deserialization). */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /** Raw producer properties for cluster k1 (String key/value serialization). */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Use ProducerConfig keys for producer properties (the original used
        // ConsumerConfig here; the key string happens to match, but mixing the
        // two classes is misleading and fragile).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Producer factory backed by {@link #producerConfigs()}. */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Template for sending to cluster k1; the unqualified autowire target. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

第二套kafka配置

package myapp.kafka;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration for the second Kafka cluster, "k2".
 *
 * <p>All beans here carry a {@code K2} suffix; listeners opt in with
 * {@code @KafkaListener(containerFactory = "kafkaListenerContainerFactoryK2")}
 * and producers inject {@code @Qualifier("kafkaTemplateK2")}.
 *
 * @author zhengqian
 */
@Slf4j
@Configuration
@Data
public class K2KafkaConfiguration {

    @Value("${app-name.kafka.k2.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${app-name.kafka.k2.consumer.group-id}")
    private String groupId;

    @Value("${app-name.kafka.k2.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${app-name.kafka.k2.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    @Value("${app-name.kafka.k2.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    /**
     * Listener-container factory for cluster k2.
     *
     * <p>BUG FIX: the original also annotated this bean {@code @Primary}; with
     * K1's factory already primary, two primary beans of the same type make the
     * context fail to resolve an unqualified injection. Only the k1 factory is
     * primary — k2 is always selected explicitly by bean name.
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryK2());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /** Consumer factory backed by {@link #consumerConfigsK2()}. */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryK2() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());
    }

    /** Raw consumer properties for cluster k2 (String key/value deserialization). */
    @Bean
    public Map<String, Object> consumerConfigsK2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /** Raw producer properties for cluster k2 (String key/value serialization). */
    @Bean
    public Map<String, Object> producerConfigsK2() {
        Map<String, Object> props = new HashMap<>();
        // Use ProducerConfig keys for producer properties (the original used
        // ConsumerConfig here — same key string, but misleading).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Producer factory backed by {@link #producerConfigsK2()}. */
    @Bean
    public ProducerFactory<String, String> producerFactoryK2() {
        return new DefaultKafkaProducerFactory<>(producerConfigsK2());
    }

    /** Template for sending to cluster k2; inject with @Qualifier("kafkaTemplateK2"). */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplateK2() {
        return new KafkaTemplate<>(producerFactoryK2());
    }
}

配置文件

# Two Kafka clusters: k1 (default, host1) and k2 (host2).
# The original was collapsed onto a single line and was not valid YAML;
# indentation restored. Property names match the @Value keys in
# K1KafkaConfiguration / K2KafkaConfiguration.
app-name:
  kafka:
    k1:
      consumer:
        bootstrap-servers: host1:9092
        group-id: my-app
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        enable-auto-commit: true
      producer:
        bootstrap-servers: host1:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    k2:
      consumer:
        bootstrap-servers: host2:9092
        group-id: my-app
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        enable-auto-commit: true
      producer:
        bootstrap-servers: host2:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

指定消費(fèi)的kafka集群

// Consume from the k2 cluster: naming the K2 container factory overrides the
// @Primary (k1) default that a bare @KafkaListener would otherwise use.
@KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")
public void onEvent(ConsumerRecord<String, String> record) {
    // omitted
}

指定生產(chǎn)者發(fā)送的kafka集群

/**
 * Sends to the default (k1) cluster: the unqualified {@code KafkaTemplate}
 * injection resolves to the bean named "kafkaTemplate" in K1KafkaConfiguration.
 * To target k2, inject with {@code @Qualifier("kafkaTemplateK2")}.
 */
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void test() throws Exception {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("topic", "data");
        // BUG FIX: the original caught every exception and only called
        // e.printStackTrace(), so the test passed even when the send failed or
        // timed out. Let the exception propagate and fail the test instead.
        SendResult<String, String> value = result.get(2, TimeUnit.SECONDS);
        System.out.println(value.getProducerRecord());
        System.out.println(value.getRecordMetadata());
    }
}

總結(jié)

以上是生活随笔為你收集整理的(转) SpringBoot接入两套kafka集群的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。