日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 主动消费_Kafka核心API——Consumer消费者

發布時間:2025/3/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 主动消费_Kafka核心API——Consumer消费者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Consumer之自動提交

在上文中介紹了Producer API的使用,現在我們已經知道如何將消息通過API發送到Kafka中了,那么現在的生產者/消費者模型就還差一位扮演消費者的角色了。因此,本文將介紹Consumer API的使用,使用API從Kafka中消費消息,讓應用成為一個消費者角色。

還是老樣子,首先我們得創建一個Consumer實例,并指定相關配置項,有了這個實例對象后我們才能進行其他的操作。代碼示例:

/**

* 創建Consumer實例

*/

public static Consumer createConsumer() {

Properties props = new Properties();

// 指定Kafka服務的ip地址及端口

props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127。0.0.1:9092");

// 指定group.id,Kafka中的消費者需要在消費者組里

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 是否開啟自動提交

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

// 自動提交的間隔,單位毫秒

props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// 消息key的序列化器

props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringDeserializer");

// 消息value的序列化器

props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringDeserializer");

return new KafkaConsumer<>(props);

}

在以上代碼中,可以看到設置了group.id這個配置項,這是一個Consumer的必要配置項,因為在Kafka中,Consumer需要位于一個Consumer Group里。具體如下圖所示:

在上圖中是一個Consumer消費一個Partition,是一對一的關系。但Consumer Group里可以只有一個Consumer,此時該Consumer可以消費多個Partition,是一對多的關系。如下圖所示:

一個Consumer可以只消費一個Partition,也可以消費多個Partition,但需要注意的是多個Consumer不能消費同一個Partition:

總結一下Consumer的注意事項:

單個Partition的消息只能由Consumer Group中的某個Consumer來消費

Consumer從Partition中消費消息是順序的,默認從頭開始消費

如果Consumer Group中只有一個Consumer,那么這個Consumer會消費所有Partition中的消息

在Kafka中,當消費者消費數據后,需要提交數據的offset來告知服務端成功消費了哪些數據。然后服務端就會移動數據的offset,下一次消費的時候就是從移動后的offset位置開始消費。

這樣可以在一定程度上保證數據是被消費成功的,并且由于數據不會被刪除,而只是移動數據的offset,這也保證了數據不易丟失。若消費者處理數據失敗時,只要不提交相應的offset,就可以在下一次重新進行消費。

和數據庫的事務一樣,Kafka消費者提交offset的方式也有兩種,分別是自動提交和手動提交。在本例中演示的是自動提交,這也是消費數據最簡單的方式。代碼示例:

/**

* 演示自動提交offset

*/

public static void autoCommitOffset() {

Consumer consumer = createConsumer();

List topics = List.of("MyTopic");

// 訂閱一個或多個Topic

consumer.subscribe(topics);

while (true) {

// 從Topic中拉取數據,每1000毫秒拉取一次

ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

// 每次拉取可能都是一組數據,需要遍歷出來

for (ConsumerRecord record : records) {

System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

}

}

}

Consumer之手動提交

自動提交的方式是最簡單的,但不建議在實際生產中使用,因為可控性不高。所以更多時候我們使用的是手動提交,但想要使用手動提交,就需要先關閉自動提交,修改配置項如下:

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

關閉了自動提交后,就得在代碼中調用commit相關的方法來提交offset,主要就是兩個方法:commitAsync和commitSync,看方法名也知道一個是異步提交一個是同步提交。

這里以commitAsync為例,實現思路主要是在發生異常的時候不要調用commitAsync方法,而在正常執行完畢后才調用commitAsync方法。代碼示例:

/**

* 演示手動提交offset

*/

public static void manualCommitOffset() {

Consumer consumer = createConsumer();

List topics = List.of("MyTopic");

// 訂閱一個或多個Topic

consumer.subscribe(topics);

while (true) {

// 從Topic中拉取數據,每1000毫秒拉取一次

ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

// 每次拉取可能都是一組數據,需要遍歷出來

for (ConsumerRecord record : records) {

try {

// 模擬將數據寫入數據庫

Thread.sleep(1000);

System.out.println("save to db...");

System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

} catch (Exception e) {

// 寫入失敗則不要調用commit,這樣就相當于起到回滾的作用,

// 下次消費還是從之前的offset開始消費

e.printStackTrace();

return;

}

}

// 寫入成功則調用commit相關方法去手動提交offset

consumer.commitAsync();

}

}

##針對Partition提交offset

在前文中有介紹到,一個Consumer Group里可以只有一個Consumer,該Consumer可以消費多個Partition。在這種場景下,我們可能會在Consumer中開啟多線程去處理多個Partition中的數據,以提高性能。

為了防止某些Partition里的數據消費成功,而某些Partition里的數據消費失敗,卻都一并提交了offset。我們就需要針對單個Partition去提交offset,也就是將offset的提交粒度控制在Partition級別。

這里先簡單演示一下如何針對單個Partition提交offset,代碼示例:

/**

* 演示手動提交單個Partition的offset

*/

public static void manualCommitOffsetWithPartition() {

Consumer consumer = createConsumer();

List topics = List.of("MyTopic");

// 訂閱一個或多個Topic

consumer.subscribe(topics);

while (true) {

// 從Topic中拉取數據,每1000毫秒拉取一次

ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

// 單獨處理每一個Partition中的數據

for (TopicPartition partition : records.partitions()) {

System.out.println("======partition: " + partition + " start======");

// 從Partition中取出數據

List> partitionRecords = records.records(partition);

for (ConsumerRecord record : partitionRecords) {

try {

// 模擬將數據寫入數據庫

Thread.sleep(1000);

System.out.println("save to db...");

System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

} catch (Exception e) {

// 發生異常直接結束,不提交offset

e.printStackTrace();

return;

}

}

// 執行成功則取出當前消費到的offset

long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

// 由于下一次開始消費的位置是最后一次offset+1的位置,所以這里要+1

OffsetAndMetadata metadata = new OffsetAndMetadata(lastOffset + 1);

// 針對Partition提交offset

Map offsets = new HashMap<>();

offsets.put(partition, metadata);

// 同步提交offset

consumer.commitSync(offsets);

System.out.println("======partition: " + partition + " end======");

}

}

}

Consumer針對一個或多個Partition進行訂閱

在之前的例子中,我們都是針對Topic去訂閱并消費數據,實際上也可以更細粒度一些針對Partition進行訂閱,這通常應用在一個Consumer多線程消費的場景下。代碼示例:

/**

* 演示將訂閱粒度控制到Partition級別

* 針對單個或多個Partition進行訂閱

*/

public static void manualCommitOffsetWithPartition2() {

Consumer consumer = createConsumer();

// 該Topic中有兩個Partition

TopicPartition p0 = new TopicPartition("MyTopic", 0);

TopicPartition p1 = new TopicPartition("MyTopic", 1);

// 訂閱該Topic下的一個Partition

consumer.assign(List.of(p0));

// 也可以訂閱該Topic下的多個Partition

// consumer.assign(List.of(p0, p1));

while (true) {

...與上一小節中的代碼一致,略...

}

}

Consumer多線程并發處理

前面兩個小節的內容基本都是為了本小節所介紹的多線程并發處理消息而鋪墊的,因為為了提高應用對消息的處理效率,我們通常會使用多線程來并行消費消息,從而加快消息的處理速度。

而多線程處理消息的方式主要有兩種,一種是按Partition數量創建線程,然后每個線程里創建一個Consumer,多個Consumer對多個Partition進行消費。這就和之前在介紹Consumer Group時,給出的那張圖所展示的一樣:

這種屬于是經典模式,實現起來也比較簡單,適用于對消息的順序和offset控制有要求的場景。代碼示例:

package com.zj.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;

import java.util.Collections;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.atomic.AtomicBoolean;

/**

* 經典模式

*

* @author 01

* @date 2020-05-21

**/

public class ConsumerThreadSample {

private final static String TOPIC_NAME = "MyTopic";

/**

* 這種類型是經典模式,每一個線程單獨創建一個KafkaConsumer,用于保證線程安全

*/

public static void main(String[] args) throws InterruptedException {

KafkaConsumerRunner r1 = new KafkaConsumerRunner();

Thread t1 = new Thread(r1);

t1.start();

Thread.sleep(15000);

r1.shutdown();

}

public static class KafkaConsumerRunner implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer consumer;

public KafkaConsumerRunner() {

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.220.128:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "false");

props.put("auto.commit.interval.ms", "1000");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumer = new KafkaConsumer<>(props);

TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);

TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

consumer.assign(List.of(p0, p1));

}

@Override

public void run() {

try {

while (!closed.get()) {

//處理消息

ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));

for (TopicPartition partition : records.partitions()) {

List> pRecord = records.records(partition);

// 處理每個分區的消息

for (ConsumerRecord record : pRecord) {

System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

}

// 返回去告訴kafka新的offset

long lastOffset = pRecord.get(pRecord.size() - 1).offset();

// 注意加1

consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));

}

}

} catch (WakeupException e) {

if (!closed.get()) {

throw e;

}

} finally {

consumer.close();

}

}

public void shutdown() {

closed.set(true);

consumer.wakeup();

}

}

}

另一種多線程的消費方式則是在一個線程池中只創建一個Consumer實例,然后通過這個Consumer去拉取數據后交由線程池中的線程去處理。如下圖所示:

但需要注意的是在這種模式下我們無法手動控制數據的offset,也無法保證數據的順序性,所以通常應用在流處理場景,對數據的順序和準確性要求不高。

經過之前的例子,我們知道每拉取一次數據返回的就是一個ConsumerRecords,這里面存放了多條數據。然后我們對ConsumerRecords進行迭代,就可以將多條數據交由線程池中的多個線程去并行處理了。代碼示例:

package com.zj.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* 一個Consumer,多個hander模式

*

* @author 01

* @date 2020-05-21

**/

public class ConsumerRecordThreadSample {

private final static String TOPIC_NAME = "MyTopic";

public static void main(String[] args) throws InterruptedException {

String brokerList = "192.168.220.128:9092";

String groupId = "test";

int workerNum = 5;

ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);

consumers.execute(workerNum);

Thread.sleep(1000000);

consumers.shutdown();

}

/**

* Consumer處理

*/

public static class ConsumerExecutor {

private final KafkaConsumer consumer;

private ExecutorService executors;

public ConsumerExecutor(String brokerList, String groupId, String topic) {

Properties props = new Properties();

props.put("bootstrap.servers", brokerList);

props.put("group.id", groupId);

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumer = new KafkaConsumer<>(props);

consumer.subscribe(List.of(topic));

}

public void execute(int workerNum) {

executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

while (true) {

ConsumerRecords records = consumer.poll(200);

for (final ConsumerRecord record : records) {

executors.submit(new ConsumerRecordWorker(record));

}

}

}

public void shutdown() {

if (consumer != null) {

consumer.close();

}

if (executors != null) {

executors.shutdown();

}

try {

if (executors != null && !executors.awaitTermination(10, TimeUnit.SECONDS)) {

System.out.println("Timeout.... Ignore for this case");

}

} catch (InterruptedException ignored) {

System.out.println("Other thread interrupted this shutdown, ignore for this case.");

Thread.currentThread().interrupt();

}

}

}

/**

* 記錄處理

*/

public static class ConsumerRecordWorker implements Runnable {

private ConsumerRecord record;

public ConsumerRecordWorker(ConsumerRecord record) {

this.record = record;

}

@Override

public void run() {

// 假如說數據入庫操作

System.out.println("Thread - " + Thread.currentThread().getName());

System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

}

}

}

Consumer控制offset起始位置

上一小節中介紹的第二種多線程消息模式,通過Consumer拉取數據后交由多線程去處理是沒法控制offset的,如果此時程序出現錯誤或其他意外情況導致消息沒有被正確消費,我們就需要人為控制offset的起始位置重新進行消費。

通過調用seek方法可以指定從哪個Partition的哪個offset位置進行消費,代碼示例:

/**

* 手動控制offset的起始位置

*/

public static void manualCommitOffsetWithPartition2() {

Consumer consumer = createConsumer();

TopicPartition p0 = new TopicPartition("MyTopic", 0);

consumer.assign(List.of(p0));

// 指定offset的起始位置

consumer.seek(p0, 1);

while (true) {

...與上一小節中的代碼一致,略...

}

}

實際應用中的設計思路:

第一次從某個offset的起始位置進行消費

如果本次消費了100條數據,那么offset設置為101并存入Redis等緩存數據庫中

后續每次poll之前,從Redis中獲取offset值,然后從這個offset的起始位置進行消費

消費完后,再次將新的offset值存入Redis,周而復始

Consumer限流

為了避免Kafka中的流量劇增導致過大的流量打到Consumer端將Consumer給壓垮的情況,我們就需要針對Consumer進行限流。例如,當處理的數據量達到某個閾值時暫停消費,低于閾值時則恢復消費,這就可以讓Consumer保持一定的速率去消費數據,從而避免流量劇增時將Consumer給壓垮。大體思路如下:

在poll到數據之后,先去令牌桶中拿取令牌

如果獲取到令牌,則繼續業務處理

如果獲取不到令牌,則調用pause方法暫停Consumer,等待令牌

當令牌桶中的令牌足夠,則調用resume方法恢復Consumer的消費狀態

接下來編寫具體的代碼案例簡單演示一下這個限流思路,令牌桶算法使用Guava里內置的,所以需要在項目中添加對Guava的依賴。添加的依賴項如下:

com.google.guava

guava

29.0-jre

然后我們就可以使用Guava的限流器對Consumer進行限流了,代碼示例:

public class ConsumerCurrentLimiting {

/*** 令牌生成速率,單位為秒 */

public static final int permitsPerSecond = 1;

/*** 限流器 */

private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);

/**

* 創建Consumer實例

*/

public static Consumer createConsumer() {

... 與之前小節的代碼類似,略 ...

}

/**

* 演示對Consumer限流

*/

public static void currentLimiting() {

Consumer consumer = createConsumer();

TopicPartition p0 = new TopicPartition("MyTopic", 0);

TopicPartition p1 = new TopicPartition("MyTopic", 1);

consumer.assign(List.of(p0, p1));

while (true) {

// 從Topic中拉取數據,每100毫秒拉取一次

ConsumerRecords records = consumer.poll(Duration.ofMillis(1));

if (records.isEmpty()) {

continue;

}

// 限流

if (!LIMITER.tryAcquire()) {

System.out.println("無法獲取到令牌,暫停消費");

consumer.pause(List.of(p0, p1));

} else {

System.out.println("獲取到令牌,恢復消費");

consumer.resume(List.of(p0, p1));

}

// 單獨處理每一個Partition中的數據

for (TopicPartition partition : records.partitions()) {

System.out.println("======partition: " + partition + " start======");

// 從Partition中取出數據

List> partitionRecords = records.records(partition);

for (ConsumerRecord record : partitionRecords) {

try {

// 模擬將數據寫入數據庫

Thread.sleep(1000);

System.out.println("save to db...");

System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

} catch (Exception e) {

// 發生異常直接結束,不提交offset

e.printStackTrace();

return;

}

}

// 執行成功則取出當前消費到的offset

long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

// 由于下一次開始消費的位置是最后一次offset+1的位置,所以這里要+1

OffsetAndMetadata metadata = new OffsetAndMetadata(lastOffset + 1);

// 針對Partition提交offset

Map offsets = new HashMap<>();

offsets.put(partition, metadata);

// 同步提交offset

consumer.commitSync(offsets);

System.out.println("======partition: " + partition + " end======");

}

}

}

public static void main(String[] args) {

currentLimiting();

}

}

Consumer Rebalance解析

Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴于一個協調器來實現。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。

之所以要重新負載均衡,是為了將退出的Consumer所負責處理的數據再重新分配到組內的其他Consumer上進行處理。或當有新加入的Consumer時,將組內其他Consumer的負載壓力,重新進均勻分配,而不會說新加入一個Consumer就閑在那。

下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組內成員是如何與協調器進行交互的。

1、新成員加入組(member join):

Tips:圖中的Coordinator是協調器,而generation則類似于樂觀鎖中的版本號,每當成員入組成功就會更新,也是起到一個并發控制的作用

2、組成員崩潰/非正常退出(member failure):

3、組成員主動離組/正常退出(member leave group):

4、當Consumer提交位移(member commit offset)時,也會有類似的交互過程:

總結

以上是生活随笔為你收集整理的kafka 主动消费_Kafka核心API——Consumer消费者的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产精品卡一卡二 | 美女网站黄频 | 日韩一级片在线观看 | 骚虎av在线 | 日韩精品v | 精品人妻久久久久久888不卡 | 午夜免费网址 | 激情黄色av | 在线观看av一区 | 在线观看污 | 国产精品网友自拍 | 久久久老司机 | 在线免费观看黄色小视频 | 免费观看日韩毛片 | 蜜桃av噜噜一区二区三区 | 亚洲黄页 | 91久久一区 | 中文字幕在线观看国产 | 爱情岛亚洲品质自拍极速福利网站 | 在线观看中文字幕一区 | 久久国产一级 | 女王脚交玉足榨精调教 | 色在线视频 | 日韩美女视频在线 | 激情伊人网 | 欧美日韩高清一区二区 | 伊人99re | 国产在线观看xxx | 阿v天堂2014 这里有精品 | www.日韩高清 | 在线观看视频一区二区 | 欧美视频在线观看 | 亚洲网站在线观看 | 国产女18毛片多18精品 | 黄色一大片 | 哺乳期喷奶水丰满少妇 | 欧美自拍偷拍第一页 | 懂色av一区二区三区四区 | 超碰在线免费播放 | 国产不卡av在线播放 | 一起草av在线 | 国产黄色片在线免费观看 | 青青草原综合久久大伊人精品 | 日韩欧美爱爱 | 成人av免费在线看 | 黄色一级片免费看 | 在线免费观看污视频 | av永久 | 天天干天 | 亚洲美女色| 国产成人在线播放视频 | 一区二区三区www污污污网站 | 国产欧美精品区一区二区三区 | 毛片毛片毛片毛片毛片毛片毛片毛片毛片毛片 | 熟妇高潮一区二区三区 | 国产视频福利在线 | 亚洲第一页夜 | 污视频网站免费 | 亚洲欧美日本一区二区 | 一区二区三区视频 | 亚洲夜色| 亚洲精品1区2区3区 国产丝袜网站 | 国产乱子伦精品视频 | av影院在线播放 | 尤物国产在线 | 女同性做爰全过程 | 在线观看污视频网站 | 国产呦小j女精品视频 | 少妇闺蜜换浪荡h肉辣文 | 亚洲字幕av一区二区三区四区 | 青青草狠狠干 | 精品国产97 | 奇米影视四色777 | 97香蕉超级碰碰久久免费软件 | 午夜精品福利一区二区蜜股av | 欧美色悠悠| 丰满熟妇肥白一区二区在线 | 久操视频在线播放 | 超碰天天| 欧美亚洲国产日韩 | 欧美激情一区二区三区在线 | 三级三级久久三级久久 | 亚洲国产日本 | 影视av | 亚洲视频自拍 | 久久精品国产99国产 | 美国毛片av | 久久98| 一级黄色片在线播放 | 欧美日视频 | 国产一区二区在线电影 | 亚洲天堂男人的天堂 | 国产一区二区三区免费观看 | 特级西西www444人体聚色 | 国产又粗又大又爽 | 久久av免费看 | 手机看片1024在线 | 亚洲一区二区美女 | 97超碰在线免费观看 |