日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Consumer多线程消费

發(fā)布時(shí)間:2024/1/23 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Consumer多线程消费 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
  • 概述?
  • OrdinaryConsumer類
  • ConsumerWorker.java
  • MultiThreadedConsumer.java
  • MultiThreadedRebalanceListener.java
  • Test.java

上一篇《Kafka Consumer多線程實(shí)例續(xù)篇》修正了多線程提交位移的問(wèn)題,但依然可能出現(xiàn)數(shù)據(jù)丟失的情況,原因在于多個(gè)線程可能拿到相同分區(qū)的數(shù)據(jù),而消費(fèi)的順序會(huì)破壞消息本身在分區(qū)中的順序,因而擾亂位移的提交。這次我使用KafkaConsumer的pause和resume方法來(lái)防止這種情形的發(fā)生。另外,本次我會(huì)編寫(xiě)一個(gè)測(cè)試類用于驗(yàn)證消費(fèi)相同數(shù)量消息時(shí),單線程消費(fèi)速度要遠(yuǎn)遜于多線程消費(fèi)。

回到頂部

概述?

這一次,我編寫(xiě)了5個(gè)java文件,它們分別是:

  • OrdinaryConsumer.java:普通的單線程Consumer,用于后面進(jìn)行性能測(cè)試對(duì)比用。
  • ConsumerWorker.java:多線程消息處理類,本質(zhì)上就是一個(gè)Runnable。會(huì)被提交給線程池用于實(shí)際消息處理。
  • MultiThreadedConsumer.java:多線程Consumer主控類,用于將消息分配給不同的ConsumerWorker,并且管理位移的提交。
  • MultiThreadedRebalanceListener.java:為多線程Consumer服務(wù)的Rebalance監(jiān)聽(tīng)器。
  • Test.java:用于測(cè)試單線程和多線程性能。

回到頂部

OrdinaryConsumer類

單線程的Consumer最簡(jiǎn)單,我首先給出它的代碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

package?huxihx.mtc;

?

import?org.apache.kafka.clients.consumer.Consumer;

import?org.apache.kafka.clients.consumer.ConsumerConfig;

import?org.apache.kafka.clients.consumer.ConsumerRecord;

import?org.apache.kafka.clients.consumer.ConsumerRecords;

import?org.apache.kafka.clients.consumer.KafkaConsumer;

import?org.apache.kafka.common.serialization.StringDeserializer;

?

import?java.time.Duration;

import?java.util.Collections;

import?java.util.Properties;

import?java.util.concurrent.ThreadLocalRandom;

?

/**

?* 單線程Consumer

?*/

public?class?OrdinaryConsumer {

?

????private?final?Consumer<String, String> consumer;

????private?final?int?expectedCount;?// 用于測(cè)試的消息數(shù)量

?

????public?OrdinaryConsumer(String brokerId, String topic, String groupID,?int?expectedCount) {

????????Properties props =?new?Properties();

????????props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);

????????props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

????????props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

????????props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"true");

????????props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);

????????props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,?"earliest");

????????consumer =?new?KafkaConsumer<>(props);

????????consumer.subscribe(Collections.singletonList(topic));

????????this.expectedCount = expectedCount;

????}

?

????public?void?run() {

????????try?{

????????????int?alreadyConsumed =?0;

????????????while?(alreadyConsumed < expectedCount) {

????????????????ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

????????????????alreadyConsumed += records.count();

????????????????records.forEach(this::handleRecord);

????????????}

????????}?finally?{

????????????consumer.close();

????????}

????}

?

????private?void?handleRecord(ConsumerRecord<String, String> record) {

????????try?{

????????????// 模擬每條消息10毫秒處理

????????????Thread.sleep(ThreadLocalRandom.current().nextInt(10));

????????}?catch?(InterruptedException ignored) {

????????????Thread.currentThread().interrupt();

????????}

????????System.out.println(Thread.currentThread().getName() +?" finished message processed. Record offset = "?+ record.offset());

????}

} 

?代碼很簡(jiǎn)單,沒(méi)什么可說(shuō)的。唯一要說(shuō)的是Consumer會(huì)模擬10毫秒處理一條事件。后面多線程Consumer我們也會(huì)使用相同的標(biāo)準(zhǔn)。

回到頂部

ConsumerWorker.java

接下來(lái)是消息處理的Runnable類:ConsumerWorker。和上一篇相比,這次最大的不同在于每個(gè)Worker只處理相同分區(qū)下的消息,而不是向之前那樣處理多個(gè)分區(qū)中的消息。這樣做的好處在于一旦某個(gè)分區(qū)的消息分配給了這個(gè)Worker,我可以暫停這個(gè)分區(qū)的可消費(fèi)狀態(tài),直到這個(gè)Worker全部處理完成。如果是混著多個(gè)分區(qū)的消息一起處理,實(shí)現(xiàn)這個(gè)就比較困難。ConsumerWorker代碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

package?huxihx.mtc;

?

import?org.apache.kafka.clients.consumer.ConsumerRecord;

?

import?java.util.List;

import?java.util.concurrent.CompletableFuture;

import?java.util.concurrent.ThreadLocalRandom;

import?java.util.concurrent.TimeUnit;

import?java.util.concurrent.atomic.AtomicLong;

import?java.util.concurrent.locks.ReentrantLock;

?

public?class?ConsumerWorker<K, V> {

?

????private?final?List<ConsumerRecord<K, V>> recordsOfSamePartition;

????private?volatile?boolean?started =?false;

????private?volatile?boolean?stopped =?false;

????private?final?ReentrantLock lock =?new?ReentrantLock();

?

????private?final?long?INVALID_COMMITTED_OFFSET = -1L;

????private?final?AtomicLong latestProcessedOffset =?new?AtomicLong(INVALID_COMMITTED_OFFSET);

????private?final?CompletableFuture<Long> future =?new?CompletableFuture<>();

?

????public?ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {

????????this.recordsOfSamePartition = recordsOfSamePartition;

????}

?

????public?boolean?run() {

????????lock.lock();

????????if?(stopped)

????????????return?false;

????????started =?true;

????????lock.unlock();

????????for?(ConsumerRecord<K, V> record : recordsOfSamePartition) {

????????????if?(stopped)

????????????????break;

????????????handleRecord(record);

????????????if?(latestProcessedOffset.get() < record.offset() +?1)

????????????????latestProcessedOffset.set(record.offset() +?1);

????????}

????????return?future.complete(latestProcessedOffset.get());

????}

?

????public?long?getLatestProcessedOffset() {

????????return?latestProcessedOffset.get();

????}

?

????private?void?handleRecord(ConsumerRecord<K, V> record) {

????????try?{

????????????Thread.sleep(ThreadLocalRandom.current().nextInt(10));

????????}?catch?(InterruptedException ignored) {

????????????Thread.currentThread().interrupt();

????????}

????????System.out.println(Thread.currentThread().getName() +?" finished message processed. Record offset = "?+ record.offset());

????}

?

????public?void?close() {

????????lock.lock();

????????this.stopped =?true;

????????if?(!started) {

????????????future.complete(latestProcessedOffset.get());

????????}

????????lock.unlock();

????}

?

????public?boolean?isFinished() {

????????return?future.isDone();

????}

?

????public?long?waitForCompletion(long?timeout, TimeUnit timeUnit) {

????????try?{

????????????return?future.get(timeout, timeUnit);

????????}?catch?(Exception e) {

????????????if?(e?instanceof?InterruptedException)

????????????????Thread.currentThread().interrupt();

????????????return?INVALID_COMMITTED_OFFSET;

????????}

????}

}

需要說(shuō)明的地方有以下幾點(diǎn):

  • latestProcessedOffset:使用這個(gè)變量保存該Worker當(dāng)前已消費(fèi)的最新位移。
  • future:使用CompletableFuture來(lái)保存Worker要提交的位移。
  • Worker成功操作與否的標(biāo)志就是看這個(gè)future是否將latestProcessedOffset值封裝到結(jié)果中。
  • handleRecord和單線程Consumer中的一致,模擬10ms處理消息。

回到頂部

MultiThreadedConsumer.java

構(gòu)建好了ConsumerWorker類之后,下面是編寫(xiě)多線程Consumer的主控類,該類循環(huán)執(zhí)行:1、創(chuàng)建Consumer;2、讀取訂閱分區(qū)的消息;3、將消息按照不同分區(qū)進(jìn)行歸組分發(fā)給不同的線程;4、暫停這些分區(qū)的后續(xù)消費(fèi),同時(shí)等待Worker線程完成消息處理;5、提交這些分區(qū)的位移;6、恢復(fù)這些分區(qū)的消費(fèi)。

以下代碼是MultiThreadedConsumer類的完整代碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

package?huxihx.mtc;

?

import?org.apache.kafka.clients.consumer.Consumer;

import?org.apache.kafka.clients.consumer.ConsumerConfig;

import?org.apache.kafka.clients.consumer.ConsumerRecord;

import?org.apache.kafka.clients.consumer.ConsumerRecords;

import?org.apache.kafka.clients.consumer.KafkaConsumer;

import?org.apache.kafka.clients.consumer.OffsetAndMetadata;

import?org.apache.kafka.common.TopicPartition;

import?org.apache.kafka.common.serialization.StringDeserializer;

?

import?java.time.Duration;

import?java.util.Collections;

import?java.util.HashMap;

import?java.util.HashSet;

import?java.util.List;

import?java.util.Map;

import?java.util.Properties;

import?java.util.Set;

import?java.util.concurrent.CompletableFuture;

import?java.util.concurrent.Executor;

import?java.util.concurrent.Executors;

?

public?class?MultiThreadedConsumer {

?

????private?final?Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers =?new?HashMap<>();

????private?final?Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =?new?HashMap<>();

????private?long?lastCommitTime = System.currentTimeMillis();

????private?final?Consumer<String, String> consumer;

????private?final?int?DEFAULT_COMMIT_INTERVAL =?3000;

????private?final?Map<TopicPartition, Long> currentConsumedOffsets =?new?HashMap<>();

????private?final?long?expectedCount;

?

????private?final?static?Executor executor = Executors.newFixedThreadPool(

????????????Runtime.getRuntime().availableProcessors() *?10, r -> {

????????????????Thread t =?new?Thread(r);

????????????????t.setDaemon(true);

????????????????return?t;

????????????});

?

????public?MultiThreadedConsumer(String brokerId, String topic, String groupID,?long?expectedCount) {

????????Properties props =?new?Properties();

????????props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);

????????props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

????????props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

????????props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"false");

????????props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);

????????props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,?"earliest");

????????consumer =?new?KafkaConsumer<>(props);

????????consumer.subscribe(Collections.singletonList(topic),?new?MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));

????????this.expectedCount = expectedCount;

????}

?

????public?void?run() {

????????try?{

????????????while?(true) {

????????????????ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

????????????????distributeRecords(records);

????????????????checkOutstandingWorkers();

????????????????commitOffsets();

????????????????if?(currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {

????????????????????break;

????????????????}

????????????}

????????}?finally?{

????????????consumer.close();

????????}

????}

?

????/**

?????* 對(duì)已完成消息處理并提交位移的分區(qū)執(zhí)行resume操作

?????*/

????private?void?checkOutstandingWorkers() {

????????Set<TopicPartition> completedPartitions =?new?HashSet<>();

????????outstandingWorkers.forEach((tp, worker) -> {

????????????if?(worker.isFinished()) {

????????????????completedPartitions.add(tp);

????????????}

????????????long?offset = worker.getLatestProcessedOffset();

????????????currentConsumedOffsets.put(tp, offset);

????????????if?(offset > 0L) {

????????????????offsetsToCommit.put(tp,?new?OffsetAndMetadata(offset));

????????????}

????????});

????????completedPartitions.forEach(outstandingWorkers::remove);

????????consumer.resume(completedPartitions);

????}

?

????/**

?????* 提交位移

?????*/

????private?void?commitOffsets() {

????????try?{

????????????long?currentTime = System.currentTimeMillis();

????????????if?(currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {

????????????????consumer.commitSync(offsetsToCommit);

????????????????offsetsToCommit.clear();

????????????}

????????????lastCommitTime = currentTime;

????????}?catch?(Exception e) {

????????????e.printStackTrace();

????????}

????}

?

????/**

?????* 將不同分區(qū)的消息交由不同的線程,同時(shí)暫停該分區(qū)消息消費(fèi)

?????* @param records

?????*/

????private?void?distributeRecords(ConsumerRecords<String, String> records) {

????????if?(records.isEmpty())

????????????return;

????????Set<TopicPartition> pausedPartitions =?new?HashSet<>();

????????records.partitions().forEach(tp -> {

????????????List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);

????????????pausedPartitions.add(tp);

????????????final?ConsumerWorker<String, String> worker =?new?ConsumerWorker<>(partitionedRecords);

????????????CompletableFuture.supplyAsync(worker::run, executor);

????????????outstandingWorkers.put(tp, worker);

????????});

????????consumer.pause(pausedPartitions);

????}

}  

?該類代碼需要說(shuō)明的地方包括:

  • executor:我創(chuàng)建了一個(gè)包含10倍CPU核數(shù)的線程數(shù)。具體線程數(shù)根據(jù)你自己的業(yè)務(wù)需求而定。如果你的事件處理邏輯是I/O密集型操作(比如寫(xiě)入外部系統(tǒng)),那么設(shè)置一個(gè)大一點(diǎn)的線程數(shù)通常都是有意義的。當(dāng)然,我個(gè)人覺(jué)得最好不要超過(guò)Consumer分配到的總分區(qū)數(shù)。
  • 一定要將自動(dòng)提交位移的參數(shù)設(shè)置為false。多線程Consumer的一個(gè)關(guān)鍵設(shè)計(jì)就是要手動(dòng)提交位移。
  • Rebalance監(jiān)聽(tīng)器設(shè)置為MultiThreadedRebalanceListener。這個(gè)類如何響應(yīng)分區(qū)的回收與分配我們稍后討論。
  • run方法的邏輯基本上遵循了上面提到的流程:消息獲取 -> 分發(fā) -> 檢查消費(fèi)進(jìn)度 -> 提交位移
  • expectedCount:這是為了后面進(jìn)行性能測(cè)試比對(duì)用到的總消息消費(fèi)數(shù)。

回到頂部

MultiThreadedRebalanceListener.java

多線程Consumer在Rebalance操作開(kāi)啟后要小心處理。首先,主線程的poll方法與Worker線程處理消息是并行執(zhí)行的。此時(shí)如果發(fā)生Rebalance,那么有些分區(qū)就會(huì)被分配給其他Consumer,但Worker線程依然可能正在處理這些分區(qū)。因此,就可能出現(xiàn)這樣的場(chǎng)景:兩個(gè)Consumer都會(huì)處理這些分區(qū)中的消息。這就破壞了消費(fèi)者組的設(shè)計(jì)理念。針對(duì)這種情況,我們必須要確保要被回收的那些分區(qū)的處理必須首先完成,之后才能被重新分配。

總體而言,在要回收分區(qū)前,多線程Consumer必須完成:

  • 停止對(duì)應(yīng)的Worker線程
  • 提交位移
  • 當(dāng)然,一旦分區(qū)被重新分配后,事情就變得簡(jiǎn)單了,我們調(diào)用resume恢復(fù)這些分區(qū)的可消費(fèi)狀態(tài)即可。如果這些分區(qū)之前就是可以消費(fèi)的,那么調(diào)用resume方法就沒(méi)有任何效果,總之是一個(gè)“無(wú)害”操作。MultiThreadedRebalanceListener類完整代碼如下:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    package?huxihx.mtc;

    ?

    import?org.apache.kafka.clients.consumer.Consumer;

    import?org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

    import?org.apache.kafka.clients.consumer.OffsetAndMetadata;

    import?org.apache.kafka.common.TopicPartition;

    ?

    import?java.util.Collection;

    import?java.util.HashMap;

    import?java.util.Map;

    import?java.util.concurrent.TimeUnit;

    ?

    public?class?MultiThreadedRebalanceListener?implements?ConsumerRebalanceListener {

    ?

    ????private?final?Consumer<String, String> consumer;

    ????private?final?Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;

    ????private?final?Map<TopicPartition, OffsetAndMetadata> offsets;

    ?

    ????public?MultiThreadedRebalanceListener(Consumer<String, String> consumer,

    ??????????????????????????????????????????Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,

    ??????????????????????????????????????????Map<TopicPartition, OffsetAndMetadata> offsets) {

    ????????this.consumer = consumer;

    ????????this.outstandingWorkers = outstandingWorkers;

    ????????this.offsets = offsets;

    ????}

    ?

    ????@Override

    ????public?void?onPartitionsRevoked(Collection<TopicPartition> partitions) {

    ????????Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers =?new?HashMap<>();

    ????????for?(TopicPartition tp : partitions) {

    ????????????ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);

    ????????????if?(worker !=?null) {

    ????????????????worker.close();

    ????????????????stoppedWorkers.put(tp, worker);

    ????????????}

    ????????}

    ?

    ????????stoppedWorkers.forEach((tp, worker) -> {

    ????????????long?offset = worker.waitForCompletion(1, TimeUnit.SECONDS);

    ????????????if?(offset > 0L) {

    ????????????????offsets.put(tp,?new?OffsetAndMetadata(offset));

    ????????????}

    ????????});

    ?

    ????????Map<TopicPartition, OffsetAndMetadata> revokedOffsets =?new?HashMap<>();

    ????????partitions.forEach(tp -> {

    ????????????OffsetAndMetadata offset = offsets.remove(tp);

    ????????????if?(offset !=?null) {

    ????????????????revokedOffsets.put(tp, offset);

    ????????????}

    ????????});

    ?

    ????????try?{

    ????????????consumer.commitSync(revokedOffsets);

    ????????}?catch?(Exception e) {

    ????????????e.printStackTrace();

    ????????}

    ????}

    ?

    ????@Override

    ????public?void?onPartitionsAssigned(Collection<TopicPartition> partitions) {

    ????????consumer.resume(partitions);

    ????}

    }

    該類代碼需要說(shuō)明的地方包括:

    • 任何Rebalance監(jiān)聽(tīng)器都要實(shí)現(xiàn)ConsumerRebalanceListener接口。
    • 該類定義了3個(gè)字段,分別保存Consumer實(shí)例、要停掉的Worker線程實(shí)例以及要提交的位移數(shù)據(jù)。
    • 主要的邏輯在onPartitionsRevoked方法中實(shí)現(xiàn)。第一步是停掉Worker線程;第二步是手動(dòng)提交位移。

    回到頂部

    Test.java

    說(shuō)完了以上4個(gè)Java類之后,現(xiàn)在我們編寫(xiě)一個(gè)測(cè)試類來(lái)比較單線程Consumer和多線程Consumer的性能對(duì)比。首先我們創(chuàng)建一個(gè)topic,50個(gè)分區(qū),單副本,并使用kafka-producer-perf-test工具創(chuàng)建5萬(wàn)條消息,每個(gè)分區(qū)1000條。之后編寫(xiě)如下代碼分別測(cè)試兩個(gè)Consumer的消費(fèi)耗時(shí):

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    package?huxihx.mtc;

    ?

    public?class?Test {

    ????public?static?void?main(String[] args)?throws?InterruptedException {

    ????????int?expectedCount =?50?*?900;

    ????????String brokerId =?"localhost:9092";

    ????????String groupId =?"test-group";

    ????????String topic =?"test";

    ?

    ????????OrdinaryConsumer consumer =?new?OrdinaryConsumer(brokerId, topic, groupId +?"-single", expectedCount);

    ????????long?start = System.currentTimeMillis();

    ????????consumer.run();

    ????????System.out.println("Single-threaded consumer costs "?+ (System.currentTimeMillis() - start));

    ?

    ????????Thread.sleep(1L);

    ?

    ????????MultiThreadedConsumer multiThreadedConsumer =

    ????????????????new?MultiThreadedConsumer(brokerId, topic, groupId +?"-multi", expectedCount);

    ????????start = System.currentTimeMillis();

    ????????multiThreadedConsumer.run();

    ????????System.out.println("Multi-threaded consumer costs "?+ (System.currentTimeMillis() - start));

    ????}

    }

    最后結(jié)果顯示。單線程Consumer消費(fèi)45000條消息共耗時(shí)232秒,而多線程Consumer耗時(shí)6.2秒,如下:

    Single-threaded consumer costs 232336

    Multi-threaded consumer costs 6246

    顯然,采用多線程Consumer的消費(fèi)性能大約是單線程Consumer的37倍。當(dāng)然實(shí)際的提升效果依具體環(huán)境而定。不過(guò)結(jié)論是肯定的,多線程Consumer在CPU核數(shù)很多且消息處理邏輯為I/O密集型操作的情形下會(huì)比單線程Consumer表現(xiàn)更好。

    總結(jié)

    以上是生活随笔為你收集整理的Kafka Consumer多线程消费的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。