正确处理kafka多线程消费的姿势
最近項(xiàng)目開(kāi)發(fā)過(guò)程使用kafka作為項(xiàng)目模塊間負(fù)載轉(zhuǎn)發(fā)器,實(shí)現(xiàn)實(shí)時(shí)接收不同產(chǎn)品線消息,分布式準(zhǔn)實(shí)時(shí)消費(fèi)產(chǎn)品線消息。通過(guò)kafka作為模塊間的轉(zhuǎn)換器,不僅有MQ的幾大好處:異步、?解耦、?削峰等幾大好處,而且開(kāi)始考慮最大的好處,可以實(shí)現(xiàn)架構(gòu)的水平擴(kuò)展,下游系統(tǒng)出現(xiàn)性能瓶頸,容器平臺(tái)伸縮增加一些實(shí)例消費(fèi)能力很快就提上來(lái)了,整體系統(tǒng)架構(gòu)上不用任何變動(dòng)。理論上,我們項(xiàng)目數(shù)據(jù)量再大整體架構(gòu)上高可用都沒(méi)有問(wèn)題。在使用kafka過(guò)程中也遇到一些問(wèn)題:
1. 消息逐漸積壓,消費(fèi)能力跟不上;
2.某個(gè)消費(fèi)者實(shí)例因?yàn)槟承┊惓T驋斓?#xff0c;造成少量數(shù)據(jù)丟失的問(wèn)題。
針對(duì)消費(fèi)積壓的問(wèn)題,通過(guò)研究kafka多線程消費(fèi)的原理,解決了消費(fèi)積壓的問(wèn)題。所以,理解多線程的Consumer模型是非常有必要,對(duì)于我們正確處理kafka多線程消費(fèi)很重要。
kafka多線程消費(fèi)模式
說(shuō)kafka多線程消費(fèi)模式前,我們先來(lái)說(shuō)下kafka本身設(shè)計(jì)的線程模型和ConcurrentmodificationException異常的原因。見(jiàn)官方文檔:
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in?ConcurrentModificationException.
ConcurrentmodificationException異常的出處見(jiàn)以下代碼:
? /**
? ? ?* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
? ? ?* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
? ? ?* supported).
? ? ?* @throws IllegalStateException if the consumer has been closed
? ? ?* @throws ConcurrentModificationException if another thread already has the lock
? ? ?*/
? ? private void acquire() {
? ? ? ? ensureNotClosed();
? ? ? ? long threadId = Thread.currentThread().getId();
? ? ? ? if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
? ? ? ? ? ? throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
? ? ? ? refcount.incrementAndGet();
? ? }
該方法acquire 會(huì)在KafkaConsumer的大部分公有方法調(diào)用第一句就判斷是否正在同一個(gè)KafkaConsumer被多個(gè)線程調(diào)用。
"正在"怎么理解呢?我們順便看下KafkaConsumer的commitAsync 這個(gè)方法就知道了。
?@Override
? ? public void commitAsync(OffsetCommitCallback callback) {
? ? ? ? acquire(); // 引用開(kāi)始
? ? ? ? try {
? ? ? ? ? ? commitAsync(subscriptions.allConsumed(), callback);
? ? ? ? } finally {
? ? ? ? ? ? release(); //引用釋放
? ? ? ? }
? ? }
我們看KafkaConsumer的release方法就是釋放正在操作KafkaConsumer實(shí)例的引用。
?/**
? ? ?* Release the light lock protecting the consumer from multi-threaded access.
? ? ?*/
? ? private void release() {
? ? ? ? if (refcount.decrementAndGet() == 0)
? ? ? ? ? ? currentThread.set(NO_CURRENT_THREAD);
? ? }
通過(guò)以上的代碼理解,我們可以總結(jié)出來(lái)kafka多線程的要點(diǎn): kafka的KafkaConsumer必須保證只能被一個(gè)線程操作。
下面就來(lái)說(shuō)說(shuō),我理解的Kafka能支持的兩種多線程模型,首先,我們必須保證操作KafkaConsumer實(shí)例的只能是一個(gè)線程,那我們要想多線程只能用在消費(fèi)ConsumerRecord List上動(dòng)心思了。下面列舉我理解的kafka多線程消費(fèi)模式。
模式一??1個(gè)Consumer模型對(duì)應(yīng)一個(gè)線程消費(fèi),最多可以有topic對(duì)應(yīng)的partition個(gè)線程同時(shí)消費(fèi)Topic。
????????????
?
模式二 1個(gè)Consumer和多個(gè)線程消費(fèi)模型,保證只有一個(gè)線程操作KafkaConsumer,其它線程消費(fèi)ConsumerRecord列表。
注意 第二種模式其實(shí)也可以支持多個(gè)Consumer,用戶最多可以啟用partition總數(shù)個(gè)Consumer實(shí)例,然后,模式二跟模式一唯一的差別就是模式二在單個(gè)Consuemr里面是多線程消費(fèi),而模式一單個(gè)Consumer里面是單線程消費(fèi)。
以上兩種kafka多線程消費(fèi)模式優(yōu)缺點(diǎn)對(duì)比:
kafka多線程消費(fèi)模式實(shí)現(xiàn)????
關(guān)于多線程消費(fèi)模式具體實(shí)現(xiàn)都是選擇基于spring-kafka實(shí)現(xiàn),畢竟站在巨人肩膀上,站的高望的遠(yuǎn)少加班???,以下就是模式二的具體實(shí)現(xiàn),模式一的話就是對(duì)模式二的簡(jiǎn)化,具體實(shí)現(xiàn)如下。
@Configuration
@EnableKafka
public class KafkaConfig {
?
? ? @Value("${kafka.bootstrap-servers}")
? ? private String servers;
?
? ? @Value("${kafka.producer.retries}")
? ? private int retries;
? ? @Value("${kafka.producer.batch-size}")
? ? private int batchSize;
? ? @Value("${kafka.producer.linger}")
? ? private int linger;
?
? ? @Value("${kafka.consumer.enable.auto.commit}")
? ? private boolean enableAutoCommit;
? ? @Value("${kafka.consumer.session.timeout}")
? ? private String sessionTimeout;
? ? @Value("${kafka.consumer.group.id}")
? ? private String groupId;
? ? @Value("${kafka.consumer.auto.offset.reset}")
? ? private String autoOffsetReset;
?
? ? @Value("${msg.consumer.max.poll.records}")
? ? private int maxPollRecords;
?
? ? public Map<String, Object> producerConfigs() {
? ? ? ? Map<String, Object> props = new HashMap<>();
? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
? ? ? ? props.put(ProducerConfig.RETRIES_CONFIG, retries);
? ? ? ? props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
? ? ? ? props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
? ? ? ? props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
? ? ? ? props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
? ? ? ? return props;
? ? }
?
? ? public ProducerFactory producerFactory() {
? ? ? ? return new DefaultKafkaProducerFactory(producerConfigs());
? ? }
?
? ? @Bean
? ? public KafkaTemplate kafkaTemplate() {
? ? ? ? return new KafkaTemplate(producerFactory());
? ? }
?
? ? @Bean
? ? public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
? ? kafkaListenerContainerFactory() {
? ? ? ? ConcurrentKafkaListenerContainerFactory<String, Object> factory =
? ? ? ? ? ? ? ? new ConcurrentKafkaListenerContainerFactory<>();
? ? ? ? factory.setConsumerFactory(consumerFactory());
? ? ? ? factory.setBatchListener(true);
? ? ? ? // 此處并發(fā)度設(shè)置的都是Consumer個(gè)數(shù),可以設(shè)置1到partition總數(shù),
? ? ? ? // 但是,所有機(jī)器實(shí)例上總的并發(fā)度之和必須小于等于partition總數(shù)
? ? ? ? // 如果,總的并發(fā)度小于partition總數(shù),有一個(gè)Consumer實(shí)例會(huì)消費(fèi)超過(guò)一個(gè)以上partition
? ? ? ? factory.setConcurrency(2);
? ? ? ? factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
? ? ? ? return factory;
? ? }
?
? ? public ConsumerFactory<String, Object> consumerFactory() {
? ? ? ? return new DefaultKafkaConsumerFactory<>(consumerConfigs());
? ? }
?
? ? public Map<String, Object> consumerConfigs() {
? ? ? ? Map<String, Object> propsMap = new HashMap<>();
? ? ? ? propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
? ? ? ? propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
? ? ? ? propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
? ? ? ? propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
? ? ? ? propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
? ? ? ? propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
? ? ? ? propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
? ? ? ? propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
? ? ? ? return propsMap;
? ? }
?
}
具體業(yè)務(wù)代碼在BaseConsumer:
public abstract class BaseConsumer implements ApplicationListener<ConsumerStoppedEvent> {
?
? ? private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
?
? ? @Value("${kafka.consumer.thread.min}")
? ? private int consumerThreadMin;
?
? ? @Value("${kafka.consumer.thread.max}")
? ? private int consumerThreadMax;
?
? ? private ThreadPoolExecutor consumeExecutor;
?
? ? private volatile boolean isClosePoolExecutor = false;
?
? ? @PostConstruct
? ? public void init() {
?
? ? ? ? this.consumeExecutor = new ThreadPoolExecutor(
? ? ? ? ? ? ? ? getConsumeThreadMin(),
? ? ? ? ? ? ? ? getConsumeThreadMax(),
? ? ? ? ? ? ? ? // 此處最大最小不一樣沒(méi)啥大的意義,因?yàn)橄㈥?duì)列需要達(dá)到 Integer.MAX_VALUE 才有點(diǎn)作用,
? ? ? ? ? ? ? ? // 矛盾來(lái)了,我每次批量拉下來(lái)不可能設(shè)置Integer.MAX_VALUE這么多,
? ? ? ? ? ? ? ? // 個(gè)人覺(jué)得每次批量下拉的原則 覺(jué)得消費(fèi)可控就行,
? ? ? ? ? ? ? ? // 不然,如果出現(xiàn)異常情況下,整個(gè)服務(wù)示例突然掛了,拉下來(lái)太多,這些消息會(huì)被重復(fù)消費(fèi)一次。
? ? ? ? ? ? ? ? 1000 * 60,
? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? new LinkedBlockingQueue<>());
? ? }
?
? ? /**
? ? ?* 收到spring-kafka 關(guān)閉Consumer的通知
? ? ?* @param event 關(guān)閉Consumer 事件
? ? ?*/
? ? @Override
? ? public void onApplicationEvent(ConsumerStoppedEvent event) {
?
? ? ? ? isClosePoolExecutor = true;
? ? ? ? closeConsumeExecutorService();
?
? ? }
?
? ? private void closeConsumeExecutorService() {
?
? ? ? ? if (!consumeExecutor.isShutdown()) {
?
? ? ? ? ? ? ThreadUtil.shutdownGracefully(consumeExecutor, 120, TimeUnit.SECONDS);
? ? ? ? ? ? LOG.info("consumeExecutor stopped");
?
? ? ? ? }
?
? ? }
?
? ? @PreDestroy
? ? public void doClose() {
? ? ? ? if (!isClosePoolExecutor) {
? ? ? ? ? ? closeConsumeExecutorService();
? ? ? ? }
? ? }
?
? ? @KafkaListener(topics = "${msg.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
? ? public void onMessage(List<String> msgList, Acknowledgment ack) {
?
? ? ? ? CountDownLatch countDownLatch = new CountDownLatch(msgList.size());
?
? ? ? ? for (String message : msgList) {
? ? ? ? ? ? submitConsumeTask(message, countDownLatch);
? ? ? ? }
?
? ? ? ? try {
? ? ? ? ? ? countDownLatch.await();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? LOG.error("countDownLatch exception ", e);
? ? ? ? }
?
? ? ? ? // 本次批量消費(fèi)完,手動(dòng)提交
? ? ? ? ack.acknowledge();
? ? ? ? LOG.info("finish commit offset");
?
? ? }
?
? ? private void submitConsumeTask(String message, CountDownLatch countDownLatch) {
? ? ? ? consumeExecutor.submit(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? onDealMessage(message);
? ? ? ? ? ? } catch (Exception ex) {
? ? ? ? ? ? ? ? LOG.error("on DealMessage exception:", ex);
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? countDownLatch.countDown();
? ? ? ? ? ? }
? ? ? ? });
? ? }
?
? ? /**
? ? ?* 子類(lèi)實(shí)現(xiàn)該抽象方法處理具體消息的業(yè)務(wù)邏輯
? ? ?* @param message kafka的消息
? ? ?*/
? ? protected abstract void onDealMessage(String message);
?
? ? private int getConsumeThreadMax() {
? ? ? ? return consumerThreadMax;
? ? }
?
? ? private int getConsumeThreadMin() {
? ? ? ? return consumerThreadMin;
? ? }
?
? ? public void setConsumerThreadMax(int consumerThreadMax) {
? ? ? ? this.consumerThreadMax = consumerThreadMax;
? ? }
?
? ? public void setConsumerThreadMin(int consumerThreadMin) {
? ? ? ? this.consumerThreadMin = consumerThreadMin;
? ? }
}
其中,closeConsumeExecutorService方法就是為了服務(wù)實(shí)例異常退出或者多機(jī)房上線kill的情況下,盡最大可能保證本次拉下來(lái)的任務(wù)被消費(fèi)掉。最后,附上closeConsumeExecutorService實(shí)現(xiàn),覺(jué)得RocketMQ源碼這個(gè)實(shí)現(xiàn)的不錯(cuò),就借用過(guò)來(lái)了,在此表示感謝。
? public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
? ? ? ? // Disable new tasks from being submitted.
? ? ? ? executor.shutdown();
? ? ? ? try {
? ? ? ? ? ? // Wait a while for existing tasks to terminate.
? ? ? ? ? ? if (!executor.awaitTermination(timeout, timeUnit)) {
? ? ? ? ? ? ? ? executor.shutdownNow();
? ? ? ? ? ? ? ? // Wait a while for tasks to respond to being cancelled.
? ? ? ? ? ? ? ? if (!executor.awaitTermination(timeout, timeUnit)) {
? ? ? ? ? ? ? ? ? ? LOGGER.warn(String.format("%s didn't terminate!", executor));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException ie) {
? ? ? ? ? ? // (Re-)Cancel if current thread also interrupted.
? ? ? ? ? ? executor.shutdownNow();
? ? ? ? ? ? // Preserve interrupt status.
? ? ? ? ? ? Thread.currentThread().interrupt();
? ? ? ? }
? ? }
下面回到使用kafka遇到的第二個(gè)問(wèn)題,怎么解決消費(fèi)者實(shí)例因?yàn)槟承┰驋斓?#xff0c;造成少量數(shù)據(jù)丟失的問(wèn)題。其實(shí),通過(guò)我們上面的寫(xiě)法,已經(jīng)不會(huì)出現(xiàn)因?yàn)槟承┰蚍?wù)實(shí)例(docker、物理機(jī))掛掉,丟數(shù)據(jù)的情況。因?yàn)槲覀兪窍壤『笙M(fèi),消費(fèi)完才手動(dòng)提交kafka確認(rèn)offset。實(shí)在還存在萬(wàn)一退出時(shí)候調(diào)用的closeConsumeExecutorService方法還沒(méi)有消費(fèi)完數(shù)據(jù),表示這個(gè)時(shí)候offset肯定沒(méi)有手動(dòng)提交,這一部分?jǐn)?shù)據(jù)也不會(huì)丟失,會(huì)在服務(wù)實(shí)例恢復(fù)了重新拉取消費(fèi)。
以上的代碼存在極小的可能瑕疵,比如,我們雙機(jī)房切換上線,某機(jī)房實(shí)例有一部分?jǐn)?shù)據(jù)沒(méi)有消費(fèi),下次會(huì)重復(fù)消費(fèi)的問(wèn)題。其實(shí),這個(gè)問(wèn)題我們?cè)跇I(yè)務(wù)上通過(guò)在配置中心配置一個(gè)標(biāo)識(shí)符來(lái)控制,當(dāng)改變標(biāo)識(shí)符控制某些機(jī)房停止拉取kafka消息,這個(gè)時(shí)候我們就可以安全操作,不擔(dān)心kafka沒(méi)有消費(fèi)完,下次重復(fù)消費(fèi)的問(wèn)題了。
————————————————
版權(quán)聲明:本文為CSDN博主「Johnniecsdn」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/Johnnyz1234/article/details/98318528
總結(jié)
以上是生活随笔為你收集整理的正确处理kafka多线程消费的姿势的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kafka Consumer多线程实例
- 下一篇: Kafka Consumer多线程消费