日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spring-kafka整合:KafkaTemplate-kafka模板类介绍

發布時間:2023/12/3 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spring-kafka整合:KafkaTemplate-kafka模板类介绍 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【README】

1,本文主要關注 KafkaTemplate的重點方法,并非全部方法;

2,KafkaTemplate? 底層依賴于 DefaultKafkaProducerFactory , 關于 DefaultKafkaProducerFactory 的介紹,refer2?

spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客【1】 類描述類描述:單例共享 Producer 實例的 ProducerFactory 實現。此實現將為每次 createProducer() 調用時提供的 Map 配置和可選的 Serializer 實現返回相同的 Producer 實例(如果未啟用事務)。如果您使用的序列化器沒有參數構造函數并且不需要設置,那么最簡單的方法是在傳遞給 DefaultKafkaProducerFactory 構造函數的配置中針對 ProducerConfig.KEY_SERIALIZER_CLASS_Chttps://blog.csdn.net/PacosonSWJTU/article/details/121306370


【1】KafkaTemplate 類說明

用于執行高級操作的模板。 當與 DefaultKafkaProducerFactory 一起使用時,模板是線程安全的。 生產者工廠和 org.apache.kafka.clients.producer.KafkaProducer 確保這一點;

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,ApplicationListener<ContextStoppedEvent>, DisposableBean {

?【1.1】構造方法

使用提供的生產者工廠和 autoFlush 設置創建一個實例。


如果您已將生產者的 linger.ms 配置為非默認值并希望立即在此模板上發送操作,無論該設置如何, 又或者您希望阻塞直到服務器根據acs屬性確認已收到消息, 需要把autoFlush設置為true

如果 configOverrides 不為 null 或不為空,則將使用合并的生產者屬性創建一個新的 DefaultKafkaProducerFactory,這些屬性在提供的工廠屬性之后進行覆蓋。

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,@Nullable Map<String, Object> configOverrides) {Assert.notNull(producerFactory, "'producerFactory' cannot be null");this.autoFlush = autoFlush;this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;// 是否自定義生產者工廠 this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;if (this.customProducerFactory) {Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties()); // 覆蓋工廠屬性 configs.putAll(configOverrides); // 創建新的 DefaultKafkaProducerFactoryDefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs, producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier()); // 設置物理關閉生產者的超時時間 newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds()); // 設置是否分區 newFactory.setProducerPerConsumerPartition(producerFactory.isProducerPerConsumerPartition()); // 設置是否 每個線程創建一個 生產者; newFactory.setProducerPerThread(producerFactory.isProducerPerThread()); // 新工廠賦值this.producerFactory = newFactory;} else {this.producerFactory = producerFactory;} // 是否開啟kafka事務 this.transactional = this.producerFactory.transactionCapable(); }

【1.2】發送消息方法(非常重要)

發送消息有很多方法,大致分為兩類;

  • send();
  • doSend();

【1.2.1】send() 發送消息

有4個外觀方法,使用的都是默認topic;

@Override public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {return send(this.defaultTopic, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {return send(this.defaultTopic, key, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {return send(this.defaultTopic, partition, key, data); }@Override public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {return send(this.defaultTopic, partition, timestamp, key, data); } @Override public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord); }

可以看到,最后還是調用了 底層的 doSend() 方法;


【1.2.2】doSend() 方法

5個 doSend() 方法的外觀方法 ,這5個方法對 topic ,分區, 消息key,時間戳,消息value? 進行了重載

@Override public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);return doSend(producerRecord); }@Override public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {Assert.notNull(record, "'record' cannot be null");return doSend(record); }

底層 doSend() 定義如下:

protected ListenableFuture<SendResult<K, V>>

????????doSend(final ProducerRecord<K, V> producerRecord)

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {// 獲取生產者 final Producer<K, V> producer = getTheProducer(producerRecord.topic());this.logger.trace(() -> "Sending: " + producerRecord);final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();Object sample = null;if (this.micrometerEnabled && this.micrometerHolder == null) {this.micrometerHolder = obtainMicrometerHolder();}if (this.micrometerHolder != null) {sample = this.micrometerHolder.start();}// 發送消息 Future<RecordMetadata> sendFuture =producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));// May be an immediate failure (注意,這里可能馬上失敗,或有運行時異常拋出)if (sendFuture.isDone()) { try {sendFuture.get(); // 這里調用get會阻塞,如果發送沒有完成的話 }catch (InterruptedException e) {Thread.currentThread().interrupt();throw new KafkaException("Interrupted", e);}catch (ExecutionException e) {throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace}}if (this.autoFlush) { // 自動刷新 flush();}this.logger.trace(() -> "Sent: " + producerRecord);return future; }

【代碼解說】

step1, 調用了 getTheProducer() 獲取生產者 ;

關于 DefaultKafkaProducerFactory.createProducer() 可以參見 以下博文,因篇幅,本文不再贅述;

spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客

step2,producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)) ; 調用了 buildCallback(...) 構建回調對象;

?

非事務模式,則關閉生產者

由 DefaultKafkaProducerFactory 可知, 生產者是? CloseSafeProducer, 其包裹了 原生 kafka生產者; 所以 調用了 CloseSafeProducer.close() 方法;

?

?step3,自動刷新緩存 flush();?

如果 ProducerFactory 提供單例生產者(例如 DefaultKafkaProducerFactory),則調用此方法才有意義。

public void flush() {Producer<K, V> producer = getTheProducer();try {producer.flush();}finally {closeProducer(producer, inTransaction());} }protected void closeProducer(Producer<K, V> producer, boolean inTx) {if (!inTx) { // 非事務才關閉 producer.close(this.closeTimeout);} }


【2】KafkaTemplate 發送消息與生產者復用?

?我們再次 follow了 DefaultKafkaProducerFactory的 doCreateProducer() 方法;

第1次因為發送消息 新建了 producer;

第2次再發送消息時,因為producer 不為null;所以直接取走;

同時 synchronized同步塊可以避免并發問題;

發送消息后,是否關閉生產者,可以參考 【小結】


【小結】

通過分析 KafkaTemplate.doSend() 消息發送分發, 我們可以看到,

每發送一條消息,如果拋出異常的話,則會關閉kafka生產者,否則不會關閉生產者;原因參見??

spring-kafka整合:DefaultKafkaProducerFactory默認kafka生產者工廠介紹_PacosonSWJTU的博客-CSDN博客https://blog.csdn.net/PacosonSWJTU/article/details/121306370中的章節 【4.5.1】;

總結

以上是生活随笔為你收集整理的spring-kafka整合:KafkaTemplate-kafka模板类介绍的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。