日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka生产者发送消息的三种方式

發布時間:2024/4/14 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka生产者发送消息的三种方式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka是一種分布式的基于發布/訂閱的消息系統,它的高吞吐量、靈活的offset是其它消息系統所沒有的。

Kafka發送消息主要有三種方式:

1.發送并忘記 2.同步發送 3.異步發送+回調函數

?

下面以單節點的方式分別用三種方法發送1w條消息測試:

方式一:發送并忘記(不關心消息是否正常到達,對返回結果不做任何判斷處理)

發送并忘記的方式本質上也是一種異步的方式,只是它不會獲取消息發送的返回結果,這種方式的吞吐量是最高的,但是無法保證消息的可靠性:

1 import pickle 2 import time 3 from kafka import KafkaProducer 4 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'], 6 key_serializer=lambda k: pickle.dumps(k), 7 value_serializer=lambda v: pickle.dumps(v)) 8 9 start_time = time.time() 10 for i in range(0, 10000): 11 print('------{}---------'.format(i)) 12 future = producer.send('test_topic', key='num', value=i, partition=0) 13 14 # 將緩沖區的全部消息push到broker當中 15 producer.flush() 16 producer.close() 17 18 end_time = time.time() 19 time_counts = end_time - start_time 20 print(time_counts)

?測試結果:1.88s

?

方式二:同步發送(通過get方法等待Kafka的響應,判斷消息是否發送成功)

以同步的方式發送消息時,一條一條的發送,對每條消息返回的結果判斷, 可以明確地知道每條消息的發送情況,但是由于同步的方式會阻塞,只有當消息通過get返回future對象時,才會繼續下一條消息的發送:

?

1 import pickle 2 import time 3 from kafka import KafkaProducer 4 from kafka.errors import kafka_errors 5 6 producer = KafkaProducer( 7 bootstrap_servers=['192.168.33.11:9092'], 8 key_serializer=lambda k: pickle.dumps(k), 9 value_serializer=lambda v: pickle.dumps(v) 10 ) 11 12 start_time = time.time() 13 for i in range(0, 10000): 14 print('------{}---------'.format(i)) 15 future = producer.send(topic="test_topic", key="num", value=i) 16 # 同步阻塞,通過調用get()方法進而保證一定程序是有序的. 17 try: 18 record_metadata = future.get(timeout=10) 19 # print(record_metadata.topic) 20 # print(record_metadata.partition) 21 # print(record_metadata.offset) 22 except kafka_errors as e: 23 print(str(e)) 24 25 end_time = time.time() 26 time_counts = end_time - start_time 27 print(time_counts)

?

測試結果:16s

?

方式三:異步發送+回調函數(消息以異步的方式發送,通過回調函數返回消息發送成功/失敗)

在調用send方法發送消息的同時,指定一個回調函數,服務器在返回響應時會調用該回調函數,通過回調函數能夠對異常情況進行處理,當調用了回調函數時,只有回調函數執行完畢生產者才會結束,否則一直會阻塞:

1 import pickle 2 import time 3 from kafka import KafkaProducer 4 5 producer = KafkaProducer( 6 bootstrap_servers=['192.168.33.11:9092'], 7 key_serializer=lambda k: pickle.dumps(k), 8 value_serializer=lambda v: pickle.dumps(v) 9 ) 10 11 12 def on_send_success(*args, **kwargs): 13 """ 14 發送成功的回調函數 15 :param args: 16 :param kwargs: 17 :return: 18 """ 19 return args 20 21 22 def on_send_error(*args, **kwargs): 23 """ 24 發送失敗的回調函數 25 :param args: 26 :param kwargs: 27 :return: 28 """ 29 30 return args 31 32 33 start_time = time.time() 34 for i in range(0, 10000): 35 print('------{}---------'.format(i)) 36 # 如果成功,傳進record_metadata,如果失敗,傳進Exception. 37 producer.send( 38 topic="test_topic", key="num", value=i 39 ).add_callback(on_send_success).add_errback(on_send_error) 40 41 producer.flush() 42 producer.close() 43 44 end_time = time.time() 45 time_counts = end_time - start_time 46 print(time_counts)

測試結果:2.15s

?

三種方式雖然在時間上有所差別,但并不是說時間越快的越好,具體要看業務的應用場景:

場景1:如果業務要求消息必須是按順序發送的,那么可以使用同步的方式,并且只能在一個partation上,結合參數設置retries的值讓發送失敗時重試,設置max_in_flight_requests_per_connection=1,可以控制生產者在收到服務器晌應之前只能發送1個消息,從而控制消息順序發送;

場景2:如果業務只關心消息的吞吐量,容許少量消息發送失敗,也不關注消息的發送順序,那么可以使用發送并忘記的方式,并配合參數acks=0,這樣生產者不需要等待服務器的響應,以網絡能支持的最大速度發送消息;

場景3:如果業務需要知道消息發送是否成功,并且對消息的順序不關心,那么可以用異步+回調的方式來發送消息,配合參數retries=0,并將發送失敗的消息記錄到日志文件中;

轉載于:https://www.cnblogs.com/FG123/p/10091478.html

總結

以上是生活随笔為你收集整理的Kafka生产者发送消息的三种方式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。