javascript
Kafka集成Spring-AcknowledgeMessageListener接口实现
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
前言
因工作需要,需在系統(tǒng)利用Kafka監(jiān)聽(tīng)接口,實(shí)現(xiàn)消息隊(duì)列中,對(duì)消息的消費(fèi),首選Kafka,因?yàn)榭粗衅涑叩耐掏铝俊?/p>
基本概念
- 1 Producer: 特指消息的生產(chǎn)者
- 2 Consumer :特指消息的消費(fèi)者
- 3 Consumer Group :消費(fèi)者組,可以并行消費(fèi)Topic中partition的消息
- 4 Broker:緩存代理,Kafa 集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱(chēng)為 broker。
- 5 Topic:特指 Kafka 處理的消息源(feeds of messages)的不同分類(lèi)。
- 6 Partition:Topic 物理上的分組,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。partition 中的每條消息都會(huì)被分配一個(gè)有序的 id(offset)
- 7 Message:消息,是通信的基本單位,每個(gè) producer 可以向一個(gè) topic(主題)發(fā)布一些消息
- 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。
集成Spring
- 添加Maven依賴(lài)?
由于項(xiàng)目使用Maven進(jìn)行管理,引入Kafka-Spring相關(guān)Jar包,需要添加依賴(lài),此處使用的是Kafka0.10.2
- 1
- 2
- 3
- 4
- 5
-
1 版本兼容性?
配置完Maven依賴(lài)以后,還需要確認(rèn),因?yàn)镵afka與Spring有依賴(lài)關(guān)系,需要確定Spring的版本是否能和Kafka0.10.2完美兼容,查閱Spring For Apache Kafka 文檔可知:?
Compatibility- Apache Kafka 0.10.2.0
- Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
- Annotation-based listeners require Spring Framework 4.1 or higher, however.
- Minimum Java version: 7.?
Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后續(xù)會(huì)逐漸兼容SpringFrameWork更早期的版本,實(shí)踐發(fā)現(xiàn),Kafka的生產(chǎn)者里面的api會(huì)受SpringFrameWork版本影響,而消費(fèi)者無(wú)影響,因此,可以保持項(xiàng)目中原有springframework不變。
-
2 排除重復(fù)包?
引入Maven依賴(lài)以后,Kafka的maven依賴(lài),自動(dòng)包含了springframework相關(guān)jar包,需要排除。
- 3 接口區(qū)別?
Kafka消費(fèi)者,實(shí)現(xiàn)有兩種方式:client客戶端和listener監(jiān)聽(tīng)接口,這里因業(yè)務(wù)需要,采用監(jiān)聽(tīng)接口的方式實(shí)現(xiàn),Spring提供了四種接口,如下所示:
對(duì)應(yīng)的解釋如下?
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.?
使用MessageListener接口實(shí)現(xiàn)時(shí),當(dāng)消費(fèi)者拉取消息之后,消費(fèi)完成會(huì)自動(dòng)提交offset,即enable.auto.commit為true時(shí),適合使用此接口?
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.?
使用AcknowledgeMessageListener時(shí),當(dāng)消費(fèi)者消費(fèi)一條消息之后,不會(huì)自動(dòng)提交offset,需要手動(dòng)ack,即enable.auto.commit為false時(shí),適合使用此接口?
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
BatchMessageListener和BatchAcknowledgingMessageListener接口作用與上述兩個(gè)接口大體類(lèi)似,只是適合批量消費(fèi)消息決定是否自動(dòng)提交offset
由于業(yè)務(wù)較重,且offset自動(dòng)提交時(shí),出現(xiàn)消費(fèi)異常或者消費(fèi)失敗的情況,消費(fèi)者容易丟失消息,所以需要采用手動(dòng)提交offset的方式,因此,這里實(shí)現(xiàn)了AcknowledgeMessageListener接口。
Spring配置文件
配置思路:?
1、確定需要定義的beans:
- 1 consumerProperties 消費(fèi)者的基本屬性,包括指定bootstrap.servers,group.id等
- 2 consumerFactory :消費(fèi)者工廠,配置完consumerProperties 后,需要將consumerProperties 作為參數(shù),配置進(jìn)consumerFactory中
- 3 containProperties: 消費(fèi)者容器屬性對(duì)象的bean,這個(gè)bean會(huì)指定后續(xù)自定義的監(jiān)聽(tīng)接口bean及ackMode(手動(dòng)提交時(shí),采取什么提交方式)
- 4 messageListenerContainer:消費(fèi)者容器,啟動(dòng)監(jiān)聽(tīng)接口的bean,需要將先前定義的consumerFactory 、containProperties配置進(jìn)這個(gè)bean,并定義其init-method = doStart,在啟動(dòng)spring時(shí),便會(huì)自動(dòng)啟動(dòng)監(jiān)聽(tīng)接口,同時(shí),此bean指定了topic
- 5 kafkaMessageListener:監(jiān)聽(tīng)接口,這個(gè)接口由自己定義,需要將其配置進(jìn)containProperties中,?
具體完整消費(fèi)者的配置文件如下所示:
示例代碼
寫(xiě)了個(gè)簡(jiǎn)單的測(cè)試用例?
生產(chǎn)者:?
實(shí)現(xiàn)每秒定時(shí)向brokers發(fā)送一條消息
消費(fèi)者
public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);@Overridepublic void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {//TODO 這里具體實(shí)現(xiàn)個(gè)人業(yè)務(wù)邏輯// 最后 調(diào)用acknowledgment的ack方法,提交offsetacknowledgment.acknowledge();} }消費(fèi)者使用示例:這里參考spring官方文檔,簡(jiǎn)單實(shí)現(xiàn)了一個(gè)消費(fèi)者監(jiān)聽(tīng)接口示例
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch;public class SimpleKafkaConsumer extends SpringUnitTest {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);@Resource(name = "kafkaMessageListener")private KafkaMessageListener kafkaMessageListener;@Testpublic void TestLinstener(){ContainerProperties containerProps = new ContainerProperties("testTopic");containerProps.setMessageListener(kafkaMessageListener);KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);container.setBeanName("messageListenerContainer");container.start();}private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {Map<String, Object> props = consumerProps();DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(props);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}private static Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}實(shí)現(xiàn)acknowledgeMessageListener接口之前,查閱了網(wǎng)上現(xiàn)有的文檔,結(jié)果不盡如人意,只能試著自己去參考spring官方文檔,慢慢摸索,最終實(shí)現(xiàn)手動(dòng)提交offset的監(jiān)聽(tīng)接口,當(dāng)然,Kafka的知識(shí)點(diǎn),遠(yuǎn)不止這些,后續(xù)還將繼續(xù)學(xué)習(xí)。
轉(zhuǎn)載于:https://my.oschina.net/xiaominmin/blog/1810338
總結(jié)
以上是生活随笔為你收集整理的Kafka集成Spring-AcknowledgeMessageListener接口实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 基于ffmpeg和libvlc的视频剪辑
- 下一篇: SpringCloud实战4-Hystr