日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

Kafka集成Spring-AcknowledgeMessageListener接口实现

發(fā)布時(shí)間:2024/9/21 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka集成Spring-AcknowledgeMessageListener接口实现 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

前言

因工作需要,需在系統(tǒng)利用Kafka監(jiān)聽(tīng)接口,實(shí)現(xiàn)消息隊(duì)列中,對(duì)消息的消費(fèi),首選Kafka,因?yàn)榭粗衅涑叩耐掏铝俊?/p>

基本概念

  • 1 Producer: 特指消息的生產(chǎn)者
  • 2 Consumer :特指消息的消費(fèi)者
  • 3 Consumer Group :消費(fèi)者組,可以并行消費(fèi)Topic中partition的消息
  • 4 Broker:緩存代理,Kafa 集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱(chēng)為 broker。
  • 5 Topic:特指 Kafka 處理的消息源(feeds of messages)的不同分類(lèi)。
  • 6 Partition:Topic 物理上的分組,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。partition 中的每條消息都會(huì)被分配一個(gè)有序的 id(offset)
  • 7 Message:消息,是通信的基本單位,每個(gè) producer 可以向一個(gè) topic(主題)發(fā)布一些消息
  • 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。

集成Spring

  • 添加Maven依賴(lài)?
    由于項(xiàng)目使用Maven進(jìn)行管理,引入Kafka-Spring相關(guān)Jar包,需要添加依賴(lài),此處使用的是Kafka0.10.2
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.2.2.RELEASE</version> </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1 版本兼容性?
    配置完Maven依賴(lài)以后,還需要確認(rèn),因?yàn)镵afka與Spring有依賴(lài)關(guān)系,需要確定Spring的版本是否能和Kafka0.10.2完美兼容,查閱Spring For Apache Kafka 文檔可知:?
      Compatibility

    • Apache Kafka 0.10.2.0
    • Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
    • Annotation-based listeners require Spring Framework 4.1 or higher, however.
    • Minimum Java version: 7.?
      Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后續(xù)會(huì)逐漸兼容SpringFrameWork更早期的版本,實(shí)踐發(fā)現(xiàn),Kafka的生產(chǎn)者里面的api會(huì)受SpringFrameWork版本影響,而消費(fèi)者無(wú)影響,因此,可以保持項(xiàng)目中原有springframework不變。
  • 2 排除重復(fù)包?
    引入Maven依賴(lài)以后,Kafka的maven依賴(lài),自動(dòng)包含了springframework相關(guān)jar包,需要排除。

<dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-oxm</artifactId><version>4.3.9.RELEASE</version><scope>compile</scope><optional>true</optional></dependency>
  • 3 接口區(qū)別?
    Kafka消費(fèi)者,實(shí)現(xiàn)有兩種方式:client客戶端和listener監(jiān)聽(tīng)接口,這里因業(yè)務(wù)需要,采用監(jiān)聽(tīng)接口的方式實(shí)現(xiàn),Spring提供了四種接口,如下所示:
public interface MessageListener<K, V> {} 1void onMessage(ConsumerRecord<K, V> data);}public interface AcknowledgingMessageListener<K, V> {} 2void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);}public interface BatchMessageListener<K, V> {} 3void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> {} 4void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}

對(duì)應(yīng)的解釋如下?
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.?
使用MessageListener接口實(shí)現(xiàn)時(shí),當(dāng)消費(fèi)者拉取消息之后,消費(fèi)完成會(huì)自動(dòng)提交offset,即enable.auto.commit為true時(shí),適合使用此接口?
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.?
使用AcknowledgeMessageListener時(shí),當(dāng)消費(fèi)者消費(fèi)一條消息之后,不會(huì)自動(dòng)提交offset,需要手動(dòng)ack,即enable.auto.commit為false時(shí),適合使用此接口?
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.

4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.

BatchMessageListener和BatchAcknowledgingMessageListener接口作用與上述兩個(gè)接口大體類(lèi)似,只是適合批量消費(fèi)消息決定是否自動(dòng)提交offset

由于業(yè)務(wù)較重,且offset自動(dòng)提交時(shí),出現(xiàn)消費(fèi)異常或者消費(fèi)失敗的情況,消費(fèi)者容易丟失消息,所以需要采用手動(dòng)提交offset的方式,因此,這里實(shí)現(xiàn)了AcknowledgeMessageListener接口。

Spring配置文件

配置思路:?
1、確定需要定義的beans:

  • 1 consumerProperties 消費(fèi)者的基本屬性,包括指定bootstrap.servers,group.id等
  • 2 consumerFactory :消費(fèi)者工廠,配置完consumerProperties 后,需要將consumerProperties 作為參數(shù),配置進(jìn)consumerFactory中
  • 3 containProperties: 消費(fèi)者容器屬性對(duì)象的bean,這個(gè)bean會(huì)指定后續(xù)自定義的監(jiān)聽(tīng)接口bean及ackMode(手動(dòng)提交時(shí),采取什么提交方式)
  • 4 messageListenerContainer:消費(fèi)者容器,啟動(dòng)監(jiān)聽(tīng)接口的bean,需要將先前定義的consumerFactory 、containProperties配置進(jìn)這個(gè)bean,并定義其init-method = doStart,在啟動(dòng)spring時(shí),便會(huì)自動(dòng)啟動(dòng)監(jiān)聽(tīng)接口,同時(shí),此bean指定了topic
  • 5 kafkaMessageListener:監(jiān)聽(tīng)接口,這個(gè)接口由自己定義,需要將其配置進(jìn)containProperties中,?
    具體完整消費(fèi)者的配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><!--1、consumer屬性配置,hashMap--><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/><entry key="group.id" value="${kafka.group.id}"/><entry key="enable.auto.commit" value="false"/><entry key="session.timeout.ms" value="15000"/><!--<entry key="auto.offset.reset" value="earliest"/>--><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer.encoding" value="UTF8"/><entry key="value.deserializer.encoding" value="UTF8"/></map></constructor-arg></bean><!--2、Kafka消費(fèi)者工廠,DefaultKafkaConsumerFactory--><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean><!--3、監(jiān)聽(tīng)接口,AcknowledgingMessageListener--><bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener"><property name="threadPool" ref="kafkaWorkerThreadPool"/></bean><bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="20"/><property name="maxPoolSize" value="200"/><property name="queueCapacity" value="500"/><property name="keepAliveSeconds" value="1800"/><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property></bean><!--4、Kafka消費(fèi)者容器,屬性配置--><bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties"><constructor-arg value="${kafka.topic}"/><property name="ackMode" value="MANUAL_IMMEDIATE"/><property name="messageListener" ref="kafkaMessageListener"/></bean><!--5、Kafka消費(fèi)者容器--><bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart" ><constructor-arg ref="consumerFactory"/><constructor-arg ref="containProperties"/></bean> </bean>

示例代碼

寫(xiě)了個(gè)簡(jiǎn)單的測(cè)試用例?
生產(chǎn)者:?
實(shí)現(xiàn)每秒定時(shí)向brokers發(fā)送一條消息

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.HashMap; import java.util.Map;public class SimpleKafkaProducer implements Runnable {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);@Overridepublic void run() {Map<String, Object> sendProps = senderProps();Producer producer = new KafkaProducer(sendProps);Integer currentNum = 0;try {LOGGER.info("start produce message");while (true){ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("testTopic",currentNum, currentNum);producer.send(producerRecord);LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());currentNum++;Thread.sleep(1000);}}catch (Exception e){LOGGER.error("send message fail", e);}finally {producer.close();}}public static void main(String[] args) {SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();new Thread(simpleKafkaProducer).start();}private Map<String, Object> senderProps() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;} }

消費(fèi)者

public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);@Overridepublic void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {//TODO 這里具體實(shí)現(xiàn)個(gè)人業(yè)務(wù)邏輯// 最后 調(diào)用acknowledgment的ack方法,提交offsetacknowledgment.acknowledge();} }

消費(fèi)者使用示例:這里參考spring官方文檔,簡(jiǎn)單實(shí)現(xiàn)了一個(gè)消費(fèi)者監(jiān)聽(tīng)接口示例

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch;public class SimpleKafkaConsumer extends SpringUnitTest {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);@Resource(name = "kafkaMessageListener")private KafkaMessageListener kafkaMessageListener;@Testpublic void TestLinstener(){ContainerProperties containerProps = new ContainerProperties("testTopic");containerProps.setMessageListener(kafkaMessageListener);KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);container.setBeanName("messageListenerContainer");container.start();}private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {Map<String, Object> props = consumerProps();DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(props);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}private static Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

實(shí)現(xiàn)acknowledgeMessageListener接口之前,查閱了網(wǎng)上現(xiàn)有的文檔,結(jié)果不盡如人意,只能試著自己去參考spring官方文檔,慢慢摸索,最終實(shí)現(xiàn)手動(dòng)提交offset的監(jiān)聽(tīng)接口,當(dāng)然,Kafka的知識(shí)點(diǎn),遠(yuǎn)不止這些,后續(xù)還將繼續(xù)學(xué)習(xí)。

轉(zhuǎn)載于:https://my.oschina.net/xiaominmin/blog/1810338

總結(jié)

以上是生活随笔為你收集整理的Kafka集成Spring-AcknowledgeMessageListener接口实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。