當前位置:
首頁 >
python读写kafka集群(转载+自己验证)
發布時間:2023/12/31
35
豆豆
生活随笔
收集整理的這篇文章主要介紹了
python读写kafka集群(转载+自己验证)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
###############################版本信息#########################################
| 組件/系統 | 版本 |
| Python | 3.6 |
| Kafka | 2.12-2.5.0 |
| Ubuntu | 19.10 |
##############################準備工作##############################################
apt-get install libsnappy-dev
pip install python-snappy
pip install kafka-python
################################具體操作###########################################
python producer.py
python consumer.py
##############################附錄##############################################
producer.py
from kafka import KafkaProducer from time import sleepdef start_producer():producer = KafkaProducer(bootstrap_servers= ['Desktop:9091', 'Laptop:9092','Laptop:9093'])for i in range(0,100000):msg = 'msg is ' + str(i)producer.send('my_favorite_topic2', msg.encode('utf-8'))sleep(3)if __name__ == '__main__':start_producer()consumer.py
from kafka import KafkaConsumer import timedef start_consumer():consumer = KafkaConsumer('my_favorite_topic2', bootstrap_servers = ['Desktop:9091', 'Laptop:9092','Laptop:9093'])for msg in consumer:print(msg)print("topic = %s" % msg.topic) # topic default is stringprint("partition = %d" % msg.offset)print("value = %s" % msg.value.decode()) # bytes to stringprint("timestamp = %d" % msg.timestamp)print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )if __name__ == '__main__':start_consumer()?
總結
以上是生活随笔為你收集整理的python读写kafka集群(转载+自己验证)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 火绒剑获取IP位置
- 下一篇: SecureCRT出现 libpytho