KafkaConsumer 长时间地在poll(long )方法中阻塞
一,問題描述
搭建的用來測(cè)試的單節(jié)點(diǎn)Kafka集群(Zookeeper和Kafka Broker都在同一臺(tái)Ubuntu上),在命令行下使用:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topicForTest創(chuàng)建了一個(gè)3個(gè)分區(qū)的Topic如下:(Topic名稱為 topicForTest)
?
使用 Console producer/consumer都能夠正常地向topicForTest發(fā)送和接收消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicForTest bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicForTest --from-beginning?
但是在自己的windows 機(jī)器的開發(fā)環(huán)境下,使用kafka client JAVA API (0.10版本)中的KafkaConsumer 卻無法接收消息,表現(xiàn)為:在poll()方法中阻塞了。
更具體一點(diǎn)地,是在:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient類的awaitMetadataUpdate方法中長(zhǎng)時(shí)間阻塞了。類似問題可參考:這里
然而,在windows機(jī)器上,使用telnet client 能夠連接到 kafka broker 的9092默認(rèn)端口。
后面發(fā)現(xiàn)是Kafka server中,配置文件 config/server.properties中 沒有配置:advertised.host.name 或者 listener 參數(shù)。官網(wǎng)查了下這個(gè)參數(shù)的解釋如下:
advertised.host.name Hostname to publish to ZooKeeper for clients to use. If this is not set, it will use the value for `host.name` if configured.Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().advertised.listeners Listeners to publish to ZooKeeper for clients to use, if different than the listeners above.If this is not set, the value for `listeners` will be used.
?
這里的原因是: JAVA API中的kafkaConsumer找不到Zookeeper去獲取元數(shù)據(jù)信息。
The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator,joining the consumer group and receiving a partition assignment.
?
使用bin/kafka-verifiable-producer.sh --topic topicForTest --max-messages 200 --broker-list localhost:9092 向該Topic中寫入200條消息。啟動(dòng)下面的程序測(cè)試:
import java.util.Arrays; import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.121.34:9092");props.put("group.id", "mygroup");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "earliest");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);kafkaConsumer.subscribe(Arrays.asList("topicForTest"));while(true){System.out.println("nothing available...");ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);for(ConsumerRecord<String, String> record : records){System.out.printf("offset = %d, value = %s", record.offset(), record.value());System.out.println();}}} }?
程序拋出的DEBUG異常如下:
2017-08-17 18:14:48.210 [main] INFO o.a.kafka.common.utils.AppInfoParser SEQ - Kafka version : 0.10.1.0 2017-08-17 18:14:48.210 [main] INFO o.a.kafka.common.utils.AppInfoParser SEQ - Kafka commitId : 3402a74efb23d1d4 2017-08-17 18:14:48.211 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Kafka consumer created 2017-08-17 18:14:48.212 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Subscribed to topic(s): topicForTest 2017-08-17 18:14:48.212 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Sending coordinator request for group group_test109 to broker xxx:9092 (id: -1 rack: null) ..... ..... 2017-08-17 18:14:48.274 [main] DEBUG o.a.kafka.common.network.Selector SEQ - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 2017-08-17 18:14:48.275 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Completed connection to node -1 2017-08-17 18:14:48.337 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Sending metadata request {topics=[topicForTest]} to node -1 2017-08-17 18:14:48.396 [main] DEBUG org.apache.kafka.clients.Metadata SEQ - Updated cluster metadata version 2 to Cluster(id = xgdvTIvHTn2dL3cnEm-dRQ, nodes = [ubuntu:9092 (id: 0 rack: null)], partitions = [Partition(topic = topicForTest,partition = 0, leader = 0, replicas = [0,], isr = [0,])]) 2017-08-17 18:14:48.398 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Received group coordinator response ClientResponse(receivedTimeMs=1502964888398, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@144d0b84, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=group_test109}), createdTimeMs=1502964888230, sendTimeMs=1502964888338), responseBody={error_code=0,coordinator={node_id=0,host=ubuntu,port=9092}}) 2017-08-17 18:14:48.399 [main] INFO o.a.k.c.c.i.AbstractCoordinator SEQ - Discovered coordinator ubuntu:9092 (id: 2147483647 rack: null) for group group_test109. 2017-08-17 18:14:48.399 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Initiating connection to node 2147483647 at ubuntu:9092. 2017-08-17 18:14:51.127 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Error connecting to node 2147483647 at ubuntu:9092: java.io.IOException: Can't resolve address: ubuntu:9092at org.apache.kafka.common.network.Selector.connect(Selector.java:180) ~[kafka-clients-0.10.1.0.jar:na]at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498) [kafka-clients-0.10.1.0.jar:na]at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159) [kafka-clients-0.10.1.0.jar:na]at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:454) [kafka-clients-0.10.1.0.jar:na]at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:556) [kafka-clients-0.10.1.0.jar:na]....[main] INFO o.a.k.c.c.i.AbstractCoordinator SEQ - Marking the coordinator ubuntu:9092 (id: 2147483647 rack: null) dead for group xxx
?
再來看ubuntu上的etc/hosts文件:
127.0.0.1 ubuntu localhost 127.0.1.1 ubuntu localhost?
因此,只需要在config/server.properties里面配置 listeners 參數(shù)就可以了。
listeners=PLAINTEXT://your.host.name:9092?
二,關(guān)于Kafka的一些簡(jiǎn)單理解
①目錄結(jié)構(gòu)
前面testForTopic一共有三個(gè)分區(qū),因此在 log.dirs目錄下關(guān)于該Topic一共有三個(gè)目錄,每個(gè)目錄下內(nèi)容如下:
?
使用命令:./bin/kafka-topics.sh --list --zookeeper?localhost:2181 可以查看當(dāng)前Topic信息。
使用命令:./bin/kafka-consumer-groups.sh --list --bootstrap-server?YOUR_IP_ADDRESS:9092 可以查看consumer group的信息
(如果提示:Error: Executing consumer group command failed due to Request METADATA failed on brokers List(ubuntu:9092 (id: -1 rack: null)))(ip地址/主機(jī)名/localhost 試試?)
使用命令:./bin/kafka-consumer-groups.sh --bootstrap-server YOUR_IP_ADDRESS:9092 --describe --group groupName? 查看某個(gè)具體的group的情況
?
② Topic 、Partition、 ConsumerGroup、Consumer 之間的一些關(guān)系?
一個(gè)Topic一般會(huì) 分為?多個(gè)?分區(qū)(Partition),生產(chǎn)者可以同時(shí)向這個(gè)Topic的多個(gè)分區(qū)寫入消息,而消費(fèi)者則以 組 為單位,訂閱這個(gè)Topic,消費(fèi)者組里面的 某個(gè)消費(fèi)者 負(fù)責(zé) 消費(fèi) 某個(gè)Partition。 感覺 Topic 像是邏輯上的概念。
一般是訂閱了同一Topic的若干個(gè)Consumer 屬于某個(gè)ConsumerGroup。對(duì)于一個(gè)ConsumerGroup而言,其中的某個(gè)Consumer負(fù)責(zé)消費(fèi)某個(gè)Partition,則該P(yáng)artition中的消息就不會(huì)被其他的Consumer消費(fèi)了。如下圖:
?Topic T1有四個(gè)分區(qū),即TopicT1中的消息存儲(chǔ)在這四個(gè)分區(qū)中,它被ConsumerGroup1 這個(gè)組中的消費(fèi)者訂閱,其中Consumer1負(fù)責(zé)消費(fèi)Partition0和2,Consumer2負(fù)責(zé)消費(fèi)Partition1和3。正常情況下,Topic T1中被ConsumerGroup中的消費(fèi)者?消費(fèi)一次,也即:TopicT1中的某條消息被Consumer1消費(fèi)了,就不會(huì)被Consumer2消費(fèi)---對(duì)于ConsumerGroup組內(nèi)成員而言,Consumer1消費(fèi)了 消息A,Consumer2就不會(huì)消費(fèi) 消息A了。
若要想讓TopicT1中的消費(fèi)被多個(gè) 消費(fèi)者消費(fèi),可以再創(chuàng)建一個(gè) 消費(fèi)者組ConsumerGroup2,ConsumerGroup2中的消費(fèi)者 去訂閱TopicT1 即可。如下圖:TopicT1中的消息,都會(huì)被消費(fèi)2次,一次是ConsumerGroup1中的消費(fèi)者消費(fèi);另一次是被ConsumerGroup2中的消費(fèi)者消費(fèi)。
每個(gè)ConsumerGroup里面有個(gè) group leader。group leader一般是最先加入到該消費(fèi)者組的 消費(fèi)者。group leader從 group coordinator那里接受分區(qū)信息,然后分配給各個(gè)consumer去訂閱。
?
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator and it is responsible for assigning a subset of partitions to each consumer?
?ConsumerGroup中消費(fèi)者數(shù)量大于 Topic中的分區(qū)數(shù)量,則某個(gè)消費(fèi)者 將沒有 Partition 可消費(fèi)。如下圖:Consumer5,消費(fèi)不到 任何消息。
?
Partition rebalance:
從上面圖片中可看出,消息的消費(fèi)是以 Partition為單位的。若,ConsumerGroup新增了 幾個(gè)消費(fèi)者,或者減少了幾個(gè)消費(fèi)者,那么Kafka Broker就會(huì)重新分配Partition給Consumer。這個(gè)重新分配的過程就是 rebalance。比如說,ConsumerA 正在消費(fèi)PartitionA,某個(gè)原因ConsumerA掛了,PartitionA中的消息就沒有Consumer消費(fèi)了。因此Broker發(fā)現(xiàn)ConsumerA掛了之后,就要把PartitionA交給另外還存活的Consumer去消費(fèi)。
The event in which partition ownership is moved from one consumer to another is called a rebalancerebalance過程會(huì)有很多問題,比如:1,在 rebalance這個(gè)過程中,Conusmer是不能消費(fèi)消息的。
During a rebalance, consumers can’t consume messaged, so a rebalance is in effect a short window of unavailability on the entire consumer group?
2,會(huì)造成消息被重復(fù)消費(fèi)。比如ConsumerA 得到了 PartitionA 的幾條消息,進(jìn)行了一定的處理,然后還未 來得及 向Broker 確認(rèn)它消費(fèi)完了這幾條消息(未commit),它就掛了。Broker rebalance之后,把PartitionA 交給了ComsumerB訂閱,那么 ConsumerB 也會(huì)得到 ?ConsumerA 處理了 但未提交?的?那幾條消息。那這幾條消息 就被 重復(fù)消費(fèi)了。
3,Broker是如何發(fā)現(xiàn)Consumer掛了的呢?
這是通過KafkaConsumer 中的poll(long )方法實(shí)現(xiàn)的。
?
③KafkaConsumer 的 poll(long )方法
poll方法干了哪些事兒?coordination、分區(qū)平衡、consumer與broker之間心跳包 keep alive、獲取消息...
Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats and data fetching消費(fèi)者必須不停地 執(zhí)行 poll 方法,一是不斷地從kafka那里獲得消息,另一個(gè)是告訴kafka,我沒有發(fā)生故障,與 broker是 keep alive的。
consumers must keep polling Kafka or they will be considered dead and the partitions they are consuming will be handed to another consumer in the group to continue consuming.?
poll(long )方法有一個(gè) long 類型的參數(shù),這些參數(shù)受 consumer 參數(shù)配置的影響,也與具體的應(yīng)用 如何 處理消息 有關(guān)。
This specifies how long it will take poll to return, with or without data. The value is typically driven by application needs for quick responses - how fast do you want to return control to the thread that does the polling??
消費(fèi)者消費(fèi)完消息后,不再消費(fèi)了,要記得關(guān)閉。因?yàn)?#xff0c;consumer要離開了,那么就會(huì)造成 rebalance,consumer.close() 使得consumer主動(dòng) 通知 Group Coordinator 進(jìn)行 rebalance,而不是靠 GroupCoordinator去等待一段時(shí)間發(fā)現(xiàn) Consumer離開了(Consumer不再執(zhí)行poll方法了),然后再進(jìn)行 rebalance。
consumer.close();?
④Kafka 中的一些配置參數(shù)
Broker的配置參數(shù);Producer的配置參數(shù);Consumer的配置參數(shù)
auto.commit.interval.ms???The frequency in ms that the consumer offsets are committed to zookeeper.(consumer 隔多久提交 offsets --消費(fèi)指針)
group.id?? ?A unique string that identifies the Connect cluster group this worker belongs to.
heartbeat.interval.ms? ??
session.timeout.ms? ....這些參數(shù)的設(shè)置與具體的應(yīng)用相關(guān),也會(huì)影響 rebalance時(shí)機(jī),具體不是太了解。
具體的配置參數(shù)可參考:Kafka配置參數(shù)解釋。
參考文獻(xiàn):
書籍:Kafka_ The Definitive Guide
Zookeeper中Kafka元數(shù)據(jù)解釋
轉(zhuǎn)載于:https://www.cnblogs.com/cxhfuujust/p/9066354.html
總結(jié)
以上是生活随笔為你收集整理的KafkaConsumer 长时间地在poll(long )方法中阻塞的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。