日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python连接kafka-python连接kafka生产者,消费者脚本

發(fā)布時間:2024/1/23 python 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python连接kafka-python连接kafka生产者,消费者脚本 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

#-*- coding: utf-8 -*-

"""""

使用kafka-Python 1.3.3模塊

# pip install kafka==1.3.5

# pip install kafka-python==1.3.5"""

importsysimporttimeimportjsonfrom kafka importKafkaProducerfrom kafka importKafkaConsumerfrom kafka.errors importKafkaError

KAFAKA_HOST= "101.236.51.235"KAFAKA_PORT= 9092KAFAKA_TOPIC= "test"

classKafka_producer():"""""

生產(chǎn)模塊:根據(jù)不同的key,區(qū)分消息"""

def __init__(self, kafkahost,kafkaport, kafkatopic, key):

self.kafkaHost=kafkahost

self.kafkaPort=kafkaport

self.kafkatopic=kafkatopic

self.key=keyprint("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)

bootstrap_servers= "{kafka_host}:{kafka_port}".format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort

)print("boot svr:",bootstrap_servers)

self.producer= KafkaProducer(bootstrap_servers =bootstrap_servers

)defsendjsondata(self, params):try:

parmas_message= json.dumps(params,ensure_ascii=False)

producer=self.producerprint(parmas_message)

v= parmas_message.encode("utf-8")

k= key.encode("utf-8")print("send msg:(k,v)",k,v)

producer.send(self.kafkatopic, key=k, value=v)

producer.flush()exceptKafkaError as e:print(e)classKafka_consumer():"""""

消費模塊: 通過不同groupid消費topic里面的消息"""

def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):

self.kafkaHost=kafkahost

self.kafkaPort=kafkaport

self.kafkatopic=kafkatopic

self.groupid=groupid

self.key=key

self.consumer= KafkaConsumer(self.kafkatopic, group_id =self.groupid,

bootstrap_servers= "{kafka_host}:{kafka_port}".format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort )

)defconsume_data(self):try:for message inself.consumer:yieldmessageexceptKeyboardInterrupt as e:print(e)defmain(xtype, group, key):"""""

測試consumer和producer"""

if xtype == "p":#生產(chǎn)模塊

producer =Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)print ("===========> producer:", producer)for _id in range(100):

params= "{"msg" : "%s"}" %str(_id)

params=[{"msg0" :_id},{"msg1":_id}]

producer.sendjsondata(params)

time.sleep(1)if xtype == "c":#消費模塊

consumer =Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)print ("===========> consumer:", consumer)

message=consumer.consume_data()for msg inmessage:print ("msg---------------->k,v", msg.key,msg.value)print ("offset---------------->", msg.offset)if __name__ == "__main__":

xtype= sys.argv[1]

group= sys.argv[2]

key= sys.argv[3]

main(xtype, group, key)

總結(jié)

以上是生活随笔為你收集整理的python连接kafka-python连接kafka生产者,消费者脚本的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。