日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...

發布時間:2024/10/8 python 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

環境:services:

zookeeper:

image: wurstmeister/zookeeper

ports:

- 2181

kafka:

image: wurstmeister/kafka

ports:

- 9092:9092

#- 8004:8004

links:

- zookeeper

environment:

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

KAFKA_ADVERTISED_HOST_NAME: kafka

KAFKA_ADVERTISED_PORT: 9092

KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

KAFKA_CREATE_TOPICS: "foo:10:1"

# JMX_PORT: 8004

clickhouse-01:

image: yandex/clickhouse-server

hostname: clickhouse-01

container_name: clickhouse-01

ports:

- 9001:9000

volumes:

- ./config/config.xml:/etc/clickhouse-server/config.xml

- ./config/metrika.xml:/etc/clickhouse-server/metrika.xml

- ./config/macros/macros-01.xml:/etc/clickhouse-server/config.d/macros.xml

ulimits:

nofile:

soft: 262144

hard: 262144

depends_on:

- "zookeeper"

clickhouse-02:

image: yandex/clickhouse-server

hostname: clickhouse-02

container_name: clickhouse-02

ports:

- 9002:9000

volumes:

- ./config/config.xml:/etc/clickhouse-server/config.xml

- ./config/metrika.xml:/etc/clickhouse-server/metrika.xml

- ./config/macros/macros-02.xml:/etc/clickhouse-server/config.d/macros.xml

ulimits:

nofile:

soft: 262144

hard: 262144

depends_on:

- "zookeeper"

clickhouse-03:

image: yandex/clickhouse-server

hostname: clickhouse-03

container_name: clickhouse-03

ports:

- 9003:9000

volumes:

- ./config/config.xml:/etc/clickhouse-server/config.xml

- ./config/metrika.xml:/etc/clickhouse-server/metrika.xml

- ./config/macros/macros-03.xml:/etc/clickhouse-server/config.d/macros.xml

ulimits:

nofile:

soft: 262144

hard: 262144

depends_on:

- "zookeeper"

通過Zookeeper容器查詢Kafka:

^{pr2}$

在zookeeper容器中得到Netstat結果:root@0a5f9a441da3:/opt/zookeeper-3.4.13# netstat

Active Internet connections (w/o servers)

Proto Recv-Q Send-Q Local Address Foreign Address State

tcp 0 0 0a5f9a441da3:2181 kafka_1:58622 ESTABLISHED

tcp 0 0 0a5f9a441da3:2181 clickhouse-02.cli:60728 ESTABLISHED

tcp 0 0 0a5f9a441da3:2181 clickhouse-01.cli:56448 ESTABLISHED

tcp 0 0 0a5f9a441da3:2181 clickhouse-03.cli:39656 ESTABLISHED

從運行kafka python的容器Telnet到broker:root@f10fe1b58fa9:~# telnet kafka 9092

Trying 172.18.0.8...

Connected to kafka.

Escape character is '^]'.

來自telnet的Kafka錯誤:kafka_1 | [2019-06-23 13:38:05,350] WARN [SocketServer brokerId=1019] Unexpected error from /172.18.0.5; closing connection (org.apache.kafka.common.network.Selector)

kafka_1 | org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1903520116 larger than 104857600)

kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)

kafka_1 | at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)

kafka_1 | at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)

kafka_1 | at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)

kafka_1 | at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)

kafka_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:483)

kafka_1 | at kafka.network.Processor.poll(SocketServer.scala:830)

kafka_1 | at kafka.network.Processor.run(SocketServer.scala:730)

kafka_1 | at java.lang.Thread.run(Thread.java:748)

嘗試使用python向kafka主題發送數據時出錯:>>> from kafka import KafkaProducer

>>> producer = KafkaProducer(bootstrap_servers=['kafka:9092'])

>>> producer

>>> producer.send('foo', b'raw_bytes')

Traceback (most recent call last):

File "", line 1, in

File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 564, in send

self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)

File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 691, in _wait_on_metadata

"Failed to update metadata after %.1f secs." % (max_wait,))

kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

我在網上上下翻了幾遍試圖找到解決辦法。我從確保容器中的KAFKA_ADVERTISED_HOST_NAME是正確的開始,并嘗試更改它,但是沒有任何結果。當我更改bootstrap_servers=['kafka:9092']條目的端點時,我得到一個錯誤:>>> consumer = KafkaConsumer('foo',

... group_id='test-group',

... bootstrap_servers=['localhost:9092'])

Traceback (most recent call last):

File "", line 3, in

File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 353, in __init__

self._client = KafkaClient(metrics=self._metrics, **self.config)

File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 239, in __init__

self.config['api_version'] = self.check_version(timeout=check_timeout)

File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 865, in check_version

raise Errors.NoBrokersAvailable()

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

所以看起來我可能在建立聯系,但可能根本上誤解了我試圖向制作人提出的請求。在

編輯:我已經成功地從我們的產品kafka環境返回了消息,該環境使用消費者在裸機上運行。在

總結

以上是生活随笔為你收集整理的python调用kafka拉取数据失败_无法使用kafkapython从另一个容器向Kafka容器发出请求...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。