

Python Kafka Usage Examples

Published: 2025/3/15

I. Basic Concepts

  • Topic: a label identifying a stream of messages;
  • Producer: produces data and sends messages to a specified Topic;
  • Consumer: consumes data from a specified Topic;
  • Group: consumer group; a group may contain several consumers, and within a group each message is delivered to only one consumer;
  • Partition: to sustain throughput, a Topic can be split into multiple partitions. Within a group, a partition is consumed by only one consumer.
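A rough illustration of how keys and partitions interact (a pure-Python sketch with a toy hash, not the real client's murmur2 partitioner; the keys and messages are made up):

```python
# Sketch of key-based partitioning: the same key always maps to the same
# partition, so per-key ordering is preserved within that partition.
def assign_partition(key: bytes, num_partitions: int) -> int:
    # Toy stand-in for the real client's hash(key) % num_partitions.
    return sum(key) % num_partitions

partitions = {0: [], 1: []}  # a topic with two partitions
for key, value in [(b"user-a", "m1"), (b"user-b", "m2"), (b"user-a", "m3")]:
    partitions[assign_partition(key, 2)].append(value)

# Both "user-a" messages land in the same partition, in send order.
```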

II. Local Installation and Startup (Docker-based)

    1. Pull the zookeeper and kafka images

    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka

    2. Start zookeeper locally

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

    3. Start kafka locally

    docker run -d --name kafka --publish 9092:9092 --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=localhost \
      --env KAFKA_ADVERTISED_PORT=9092 \
      wurstmeister/kafka:latest

    Note: the command above exposes kafka on port 9092.

    4. Enter the kafka container's bash

    docker exec -it kafka bash
    cd /opt/kafka/bin

    5. Create a Topic named 'kafka_demo' with 2 partitions

    kafka-topics.sh --create --zookeeper zookeeper:2181 \
      --replication-factor 1 --partitions 2 --topic kafka_demo

    6. List all current topics

    kafka-topics.sh --zookeeper zookeeper:2181 --list

    7. Install kafka-python

    pip install kafka-python
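The demo later in this article hands a `value_serializer`/`value_deserializer` pair to the kafka-python client so message values travel as JSON. The round-trip itself needs no broker; a minimal local sketch:

```python
import json

# The same serializer/deserializer lambdas the demo passes to
# KafkaProducer / KafkaConsumer, exercised locally.
serialize = lambda m: json.dumps(m).encode("ascii")
deserialize = lambda m: json.loads(m.decode("ascii"))

payload = {"test": 1}
wire = serialize(payload)  # bytes as they would appear on the wire
assert deserialize(wire) == payload
```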

    III. Producer and Consumer

    A personal wrapper

    A simple demo of the producer and the consumer, shown together:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    import json
    import traceback

    from kafka import KafkaConsumer, KafkaProducer, TopicPartition


    class KProducer(object):
        """Kafka producer"""

        def __init__(self, bootstrap_servers):
            """
            :param bootstrap_servers: broker address(es)
            """
            # JSON-serialize outgoing message values
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda m: json.dumps(m).encode("ascii"),
                # compression_type="gzip"  # compress messages before sending
            )

        def sync_producer(self, topic, data):
            """
            Send data synchronously.
            :param topic: topic
            :param data: data to send
            """
            future = self.producer.send(topic, data)
            record_metadata = future.get(timeout=10)  # block until acknowledged
            partition = record_metadata.partition     # partition the record landed in
            offset = record_metadata.offset           # offset within that partition
            print("save success, partition: {}, offset: {}".format(partition, offset))

        def asyn_producer(self, topic, data):
            """
            Send data asynchronously.
            :param topic: topic
            :param data: data to send
            """
            self.producer.send(topic, data)
            self.producer.flush()  # flush batched messages

        def asyn_producer_callback(self, topic, data):
            """
            Send data asynchronously, with send-status callbacks.
            :param topic: topic
            :param data: data to send
            """
            self.producer.send(topic, data) \
                .add_callback(self.send_success) \
                .add_errback(self.send_error)
            self.producer.flush()  # flush batched messages

        def send_success(self, *args, **kwargs):
            """Callback on a successful async send."""
            print('save success')

        def send_error(self, *args, **kwargs):
            """Callback on a failed async send."""
            print('save error')

        def close_producer(self):
            try:
                self.producer.close()
            except Exception:
                pass


    class PConsumers(object):
        """Kafka consumer"""

        def __init__(self, bootstrap_servers, group_id):
            """
            :param bootstrap_servers: broker address(es)
            :param group_id: consumer group id
            """
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id

        def get_message(self, topic, count=1):
            """
            Fetch a fixed number of messages (adapt this for continuous consumption).
            :param topic: topic
            :param count: number of messages to fetch
            :return: list of message values, or None on error
            """
            counter = 0
            msg = []
            try:
                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    value_deserializer=lambda m: json.loads(m.decode("ascii")),  # decode JSON values
                    auto_offset_reset="earliest",
                )
                for message in consumer:
                    print("%s:%d:%d: key=%s value=%s header=%s" % (
                        message.topic, message.partition, message.offset,
                        message.key, message.value, message.headers))
                    msg.append(message.value)
                    counter += 1
                    if count == counter:
                        break
                consumer.close()
            except Exception as e:
                print("{0}, {1}".format(e, traceback.print_exc()))
                return None
            return msg

        def get_count(self, topic):
            """
            Report how many messages this group still has to consume (the lag).
            :param topic: topic
            :return: lag, or None on error
            """
            try:
                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                )
                partitions = [TopicPartition(topic, p)
                              for p in consumer.partitions_for_topic(topic)]
                # latest offset per partition
                toff = consumer.end_offsets(partitions)
                toff = [(key.partition, toff[key]) for key in toff.keys()]
                toff.sort()
                # committed offset per partition
                coff = [(x.partition, consumer.committed(x)) for x in partitions]
                coff.sort()
                # total minus committed = remaining
                toff_sum = sum([x[1] for x in toff])
                cur_sum = sum([x[1] for x in coff if x[1] is not None])
                left_sum = toff_sum - cur_sum
                consumer.close()
            except Exception as e:
                print("{0}, {1}".format(e, traceback.print_exc()))
                return None
            return left_sum


    if __name__ == "__main__":
        send_data_li = {"test": 1}
        kp = KProducer(bootstrap_servers="1.1.1.1:9092")
        # synchronous send
        # kp.sync_producer(topic="test", data=send_data_li)
        # asynchronous send
        # kp.asyn_producer(topic="test", data=send_data_li)
        # asynchronous send with callbacks
        kp.asyn_producer_callback(topic="test", data=send_data_li)
        kp.close_producer()

        cp = PConsumers(bootstrap_servers="1.1.1.1:9092", group_id="boxer")
        # print(cp.get_count(topic="test"))
        print(cp.get_message(topic="test"))
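The `get_count()` method above boils down to simple arithmetic: the broker's latest offset per partition, minus the group's committed offset, summed over partitions. A broker-free sketch with made-up offsets:

```python
# Hypothetical offsets for a two-partition topic.
end_offsets = {0: 120, 1: 95}   # latest offset per partition (end_offsets)
committed = {0: 100, 1: None}   # committed offset per partition; None = never committed

total = sum(end_offsets.values())
consumed = sum(v for v in committed.values() if v is not None)
lag = total - consumed  # messages the group has yet to consume: 215 - 100 = 115
```

As in `get_count()`, a partition the group has never committed on (`None`) contributes its full end offset to the lag.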


    Summary

    That covers these Python Kafka examples; hopefully they help you solve the problems you ran into.