日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

阿里云kafka使用记录(python版本)

發(fā)布時間:2023/12/20 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阿里云kafka使用记录(python版本) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
kafka端 consumer vpc版代碼 import socket from kafka import KafkaConsumer from kafka.errors import KafkaError# context.check_hostname = True consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],group_id='xx',api_version = (0,10))print('consumer start to consuming...') consumer.subscribe(('xx',)) for message in consumer:print(message.topic)print(message.offset)print(message.key)print(message.value)print(message.partition)

?

producer vpc版代碼

#!/usr/bin/env python # encoding: utf-8import socket from kafka import KafkaProducer from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],api_version = (0,10),retries=5)partitions = producer.partitions_for('xx') print('Topic下分區(qū): %s' % partitions)try:future = producer.send(topic='xx', value=b'hello aliyun-kafka!')future.get()print('send message succeed.') except KafkaError as e:print('send message failed.')print(e)

consumer公網(wǎng)版代碼

import ssl import socket from kafka import KafkaConsumer from kafka.errors import KafkaErrorcontext = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert")consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],group_id='xxx',sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),sasl_plain_username='xxx',sasl_plain_password='1234567890')print('consumer start to consuming...') consumer.subscribe(('xxx', )) for message in consumer:print(message.topic)print(message.offset)print(message.value)break

?

producer 公網(wǎng)版代碼 #!/usr/bin/env python # encoding: utf-8import ssl import socket from kafka import KafkaProducer from kafka.errors import KafkaErrorcontext = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert") #這個文件參考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),retries=5,sasl_plain_username='xx',sasl_plain_password='1234567890'#注意是access-key的最后十位) partitions = producer.partitions_for('xxx') print ('Topic下分區(qū): %s' % partitions)try:future = producer.send('xxx', b'hello aliyun-kafka!')future.get()print('send message succeed.') except KafkaError as e:print('send message failed.')print(e)

?

?

?

?

從阿里云控臺獲得連接信息

?

?

轉(zhuǎn)載于:https://www.cnblogs.com/castlevania/p/10370803.html

總結(jié)

以上是生活随笔為你收集整理的阿里云kafka使用记录(python版本)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。