Kafka消费者详解
一.基本概念
1.消費者和消費組
Kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。假設有一個T1主題,該主題有4個分區;同時我們有一個消費組G1,這個消費組只有一個消費者C1。那么消費者C1將會收到這4個分區的消息,如下所示:
?
Kafka一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組G2,而這個消費組有兩個消費者,那么會是這樣的:
?
二.消息接收
1.必要參數設置
/*** Kafka 消費者分析*/ @Slf4j public class KafkaConsumerAnalysis {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";public static final AtomicBoolean isRunning = new AtomicBoolean(true); ?public static Properties initConfig() {Properties props = new Properties();// 與KafkaProducer中設置保持一致props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 必填參數,該參數和KafkaProducer中的相同,制定連接Kafka集群所需的broker地址清單,可以設置一個或者多個props.put("bootstrap.servers", brokerList);// 消費者隸屬于的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱props.put("group.id", groupId);// 指定KafkaConsumer對應的客戶端ID,默認為空,如果不設置KafkaConsumer會自動生成一個非空字符串props.put("client.id", "consumer.client.id.demo"); ?return props;}public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic)); ?// 正則訂閱主題 // ? ? ? consumer.subscribe(Pattern.compile("test.*")); ?// 指定訂閱的分區 // ? ? ? consumer.assign(Arrays.asList(new TopicPartition("test", 0))); ?try {while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());System.out.println("key = " + record.key() + ", value = " + record.value());//do something to process record.}}} catch (Exception e) {log.error("occur exception ", e);} finally {consumer.close();}} }1)這是 個無限循環。消費者實際上是一個長期運行的應用程序,它通過持續輪詢kafka請求數據。
2)消費者必須持續對 Kafka進行輪詢,否則會被認為己經死亡,分區會被移交給群組里的其他消費者。傳給poll() 方法的參數是一個超時時間,用于控制poll()方法的的阻塞時間。如果該參數被設為 0,poll ()會立即返回 ,否則它會在指定的毫秒數內一直等待 broker 返回數據。
3)poll ()方法能返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息、記錄分區的信息、記錄在分區里的偏移量以及記錄的鍵值對。我們一般會遍歷這個列表 ,逐條處理這些記錄。 poll()方法有一個超時參數,它指定了方法在多久之后可以返回,不管有沒有可用的數據都要返回。 超時時間的設置取決于應用程序對響應速度的要求,比如要在多長時間內把控制權歸還給執行輪詢的線程。4)在退出應用程序之前 close()方怯關閉消費者。網絡連接和 socket 也會隨之關閉,并立即觸發一次再均衡,而不是等待群組協調器發現它不再發送心跳井認定它已死亡,因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息
2.訂閱主題和分區
創建完消費者后我們便可以訂閱主題了,只需要通過調用subscribe()方法即可,這個方法接收一個主題列表
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic));另外,我們也可以使用正則表達式來匹配多個主題,而且訂閱之后如果又有匹配的新主題,那么這個消費組會立即對其進行消費。正則表達式在連接Kafka與其他系統時非常有用。比如訂閱所有的測試主題:
consumer.subscribe(Pattern.compile("test.*"));指定訂閱的分區
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));3.反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");4.位移提交
對于Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中的位置。
當我們調用poll()時,該方法會返回我們沒有消費的消息。當消息從broker返回消費者時,broker并不跟蹤這些消息是否被消費者接收到;Kafka讓消費者自身來管理消費的位移,并向消費者提供更新位移的接口,這種更新位移方式稱為提交(commit)。
自動提交
這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們將enable.auto.commit設置為true,那么消費者會在poll方法調用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一樣,自動提交也是由poll()方法來驅動的;在調用poll()時,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移。
需要注意到,這種方式可能會導致消息重復消費。假如,某個消費者poll消息后,應用正在處理消息,在3秒后Kafka進行了重平衡,那么由于沒有更新位移導致重平衡后這部分消息重復消費。
同步提交
public class CheckOffsetAndCommit {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";private static AtomicBoolean running = new AtomicBoolean(true); ?public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ?// 手動提交開啟props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;} ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); ? ?TopicPartition tp = new TopicPartition(topic, 0);consumer.assign(Arrays.asList(tp));long lastConsumedOffset = -1;while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);if (records.isEmpty()) {break;}List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync();//同步提交消費位移}System.out.println("comsumed offset is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = consumer.position(tp);System.out.println("the offset of the next record is " + posititon);} }異步提交
手動提交有一個缺點,那就是當發起提交調用時應用會阻塞。當然我們可以減少手動提交的頻率,但這個會增加消息重復的概率(和自動提交一樣)。另外一個解決辦法是,使用異步提交的API。
public class OffsetCommitAsyncCallback extends ConsumerClientConfig { ?private static AtomicBoolean running = new AtomicBoolean(true); ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic)); ?try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do some logical processing.}// 異步回調consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}} finally {consumer.close();} ?try {while (running.get()) {consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}} }但是異步提交也有個缺點,那就是如果服務器返回提交失敗,異步提交不會進行重試。相比較起來,同步提交會進行重試直到成功或者最后拋出異常給應用。異步提交沒有實現重試是因為,如果同時存在多個異步提交,進行重試可能會導致位移覆蓋。舉個例子,假如我們發起了一個異步提交commitA,此時的提交位移為2000,隨后又發起了一個異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時commitA進行重試并成功的話,會將實際上將已經提交的位移從3000回滾到2000,導致消息重復消費。
同步和異步組合提交
一般情況下,針對偶爾出現的提交失敗,我們可以使用同步和異步提交結合的方式提交。在消費者關閉前commitSync()提交一下。
? ? ? ?try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}-
如果一切正常,我們使用commitAsync()方法來提交。這樣速度更快,而且即使這次提交失敗,下一次提交很可能會成功。
-
如果直接關閉消費者,就沒有所謂的“下一次提交”了。使用 commitSync()方法也會一直重試,直到提交成功或發生無法恢復的錯誤。
5.指定位移消費
到目前為止,我們知道消息的拉取是根據poll()方法中的邏輯來處理的,但是這個方法對于普通開發人員來說就是個黑盒處理,無法精確掌握其消費的起始位置。
seek()方法正好提供了這個功能,讓我們得以追蹤以前的消費或者回溯消費
public class SeekDemo extends ConsumerClientConfig { ? ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));// timeout參數設置多少合適?太短會使分區分配失敗,太長又有可能造成一些不必要的等待consumer.poll(Duration.ofMillis(2000));// 獲取消費者所分配到的分區Set<TopicPartition> assignment = consumer.assignment();System.out.println(assignment);for (TopicPartition tp : assignment) {// 參數partition表示分區,offset表示指定從分區的哪個位置開始消費consumer.seek(tp, 10);} // ? ? ? consumer.seek(new TopicPartition(topic,0),10);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}} ? }增加判斷是否分配到了分區
public class SeekDemoAssignment extends ConsumerClientConfig { ? ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));long start = System.currentTimeMillis();Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, 10);}while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}} } ?指定從分區末尾開始消費
? ? ? ?// 指定從分區末尾開始消費Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp));}演示位移越界操作,修改代碼如下:
? ? ? ?// 指定從分區末尾開始消費Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp)+1);}6.消費者攔截器
消費者攔截器主要是在消費到消息或者在提交消費位移時進行的一些定制化的操作。
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {private static final long EXPIRE_INTERVAL = 10 * 1000; ?@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("before:" + records);long now = System.currentTimeMillis();Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords= new HashMap<>();for (TopicPartition tp : records.partitions()) {List<ConsumerRecord<String, String>> tpRecords = records.records(tp);List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : tpRecords) {if (now - record.timestamp() < EXPIRE_INTERVAL) {newTpRecords.add(record);}}if (!newTpRecords.isEmpty()) {newRecords.put(tp, newTpRecords);}}return new ConsumerRecords<>(newRecords);} ?@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp, offset) ->System.out.println(tp + ":" + offset.offset()));} ?@Overridepublic void close() {} ?@Overridepublic void configure(Map<String, ?> configs) {} }實現自定義攔截器之后,需要在KafkaConsumer中配置指定這個攔截器,如下
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());發送端同時發送兩條消息,其中一條修改timestamp的值來使其變得超時,如下:
? ? ? ?ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000, "Kafka-demo-001", "hello, Kafka!->超時");啟動消費端運行如下,只收到了未超時的消息:
before:org.apache.kafka.clients.consumer.ConsumerRecords@7adda9cc topic = test, partition = 1, offset = 18 key = Kafka-demo-001, value = hello, Kafka!三.其它消費者參數
fetch.min.bytes
這個參數可以減少broker和消費者的壓力,因為減少了往返的時間。而對于有大量消費者的主題來說,則可以明顯減輕broker壓力。
2.fetch.max.wait.ms
上面的fetch.min.bytes參數指定了消費者讀取的最小數據量,而這個參數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞。這個參數默認為500ms。
3.max.partition.fetch.bytes
這個參數指定了每個分區返回的最多字節數,默認為1M。也就是說,KafkaConsumer.poll()返回記錄列表時,每個分區的記錄字節數最多為1M。如果一個主題有20個分區,同時有5個消費者,那么每個消費者需要4M的空間來處理消息。實際情況中,我們需要設置更多的空間,這樣當存在消費者宕機時,其他消費者可以承擔更多的分區。
4.max.poll.records
這個參數控制一個poll()調用返回的記錄數,這個可以用來控制應用在拉取循環中的處理數據量。
總結
以上是生活随笔為你收集整理的Kafka消费者详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka生产者详解
- 下一篇: SpringBoot快速集成kafka