javascript
Kafka集成Spring-AcknowledgeMessageListener接口实现
2019獨角獸企業重金招聘Python工程師標準>>>
前言
因工作需要,需在系統利用Kafka監聽接口,實現消息隊列中,對消息的消費,首選Kafka,因為看中其超高的吞吐量。
基本概念
- 1 Producer: 特指消息的生產者
- 2 Consumer :特指消息的消費者
- 3 Consumer Group :消費者組,可以并行消費Topic中partition的消息
- 4 Broker:緩存代理,Kafa 集群中的一臺或多臺服務器統稱為 broker。
- 5 Topic:特指 Kafka 處理的消息源(feeds of messages)的不同分類。
- 6 Partition:Topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)
- 7 Message:消息,是通信的基本單位,每個 producer 可以向一個 topic(主題)發布一些消息
- 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。
集成Spring
- 添加Maven依賴?
由于項目使用Maven進行管理,引入Kafka-Spring相關Jar包,需要添加依賴,此處使用的是Kafka0.10.2
- 1
- 2
- 3
- 4
- 5
-
1 版本兼容性?
配置完Maven依賴以后,還需要確認,因為Kafka與Spring有依賴關系,需要確定Spring的版本是否能和Kafka0.10.2完美兼容,查閱Spring For Apache Kafka 文檔可知:?
Compatibility- Apache Kafka 0.10.2.0
- Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
- Annotation-based listeners require Spring Framework 4.1 or higher, however.
- Minimum Java version: 7.?
Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后續會逐漸兼容SpringFrameWork更早期的版本,實踐發現,Kafka的生產者里面的api會受SpringFrameWork版本影響,而消費者無影響,因此,可以保持項目中原有springframework不變。
-
2 排除重復包?
引入Maven依賴以后,Kafka的maven依賴,自動包含了springframework相關jar包,需要排除。
- 3 接口區別?
Kafka消費者,實現有兩種方式:client客戶端和listener監聽接口,這里因業務需要,采用監聽接口的方式實現,Spring提供了四種接口,如下所示:
對應的解釋如下?
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.?
使用MessageListener接口實現時,當消費者拉取消息之后,消費完成會自動提交offset,即enable.auto.commit為true時,適合使用此接口?
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.?
使用AcknowledgeMessageListener時,當消費者消費一條消息之后,不會自動提交offset,需要手動ack,即enable.auto.commit為false時,適合使用此接口?
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
BatchMessageListener和BatchAcknowledgingMessageListener接口作用與上述兩個接口大體類似,只是適合批量消費消息決定是否自動提交offset
由于業務較重,且offset自動提交時,出現消費異常或者消費失敗的情況,消費者容易丟失消息,所以需要采用手動提交offset的方式,因此,這里實現了AcknowledgeMessageListener接口。
Spring配置文件
配置思路:?
1、確定需要定義的beans:
- 1 consumerProperties 消費者的基本屬性,包括指定bootstrap.servers,group.id等
- 2 consumerFactory :消費者工廠,配置完consumerProperties 后,需要將consumerProperties 作為參數,配置進consumerFactory中
- 3 containProperties: 消費者容器屬性對象的bean,這個bean會指定后續自定義的監聽接口bean及ackMode(手動提交時,采取什么提交方式)
- 4 messageListenerContainer:消費者容器,啟動監聽接口的bean,需要將先前定義的consumerFactory 、containProperties配置進這個bean,并定義其init-method = doStart,在啟動spring時,便會自動啟動監聽接口,同時,此bean指定了topic
- 5 kafkaMessageListener:監聽接口,這個接口由自己定義,需要將其配置進containProperties中,?
具體完整消費者的配置文件如下所示:
示例代碼
寫了個簡單的測試用例?
生產者:?
實現每秒定時向brokers發送一條消息
消費者
public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);@Overridepublic void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {//TODO 這里具體實現個人業務邏輯// 最后 調用acknowledgment的ack方法,提交offsetacknowledgment.acknowledge();} }消費者使用示例:這里參考spring官方文檔,簡單實現了一個消費者監聽接口示例
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch;public class SimpleKafkaConsumer extends SpringUnitTest {protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);@Resource(name = "kafkaMessageListener")private KafkaMessageListener kafkaMessageListener;@Testpublic void TestLinstener(){ContainerProperties containerProps = new ContainerProperties("testTopic");containerProps.setMessageListener(kafkaMessageListener);KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);container.setBeanName("messageListenerContainer");container.start();}private static KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {Map<String, Object> props = consumerProps();DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(props);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}private static Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}實現acknowledgeMessageListener接口之前,查閱了網上現有的文檔,結果不盡如人意,只能試著自己去參考spring官方文檔,慢慢摸索,最終實現手動提交offset的監聽接口,當然,Kafka的知識點,遠不止這些,后續還將繼續學習。
轉載于:https://my.oschina.net/xiaominmin/blog/1810338
總結
以上是生活随笔為你收集整理的Kafka集成Spring-AcknowledgeMessageListener接口实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于ffmpeg和libvlc的视频剪辑
- 下一篇: SpringCloud实战4-Hystr