spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍
【README】
0,為啥要看? DefaultKafkaProducerFactory? 最近在基于 springboot 開發kafka模塊,發現 kafakTemplate構造器傳入了 DefaultKafkaProducerFactory實例, kafkaTemplate內部使用了 很多 DefaultKafkaProducerFactory的方法; 所以把 DefaultKafkaProducerFactory的重點方法分析處理出來,以便于查看 KafkaTemplate的內部邏輯;?
1, 本文涉及的 kafka操作,不涉及事務和消費者,所以本文忽略了有關kafka事務,消費者的描述; kafka事務, refer2? 轉:Kafka事務使用和編程示例/實例_PacosonSWJTU的博客-CSDN博客Kafka事務使用和編程示例/實例_JobShow裁員加班實況-微信小程序-CSDN博客一、概述? Kafka事務特性是指一系列的生產者生產消息和消費者提交偏移量的操作在一個事務中,或者說是一個原子操作,生產消息和提交偏移量同時成功或者失敗。注意:kafka事務和DB事務。在理解消息的事務時,一直處于一個錯誤理解是,把操作db的業務邏輯跟操作消息當成是一個事務,如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:讀取消息或生產消息 kafkaOperation(); /https://blog.csdn.net/PacosonSWJTU/article/details/1213058842,本文結合了 api doc 對 DefaultKafkaProducerFactory-默認kafka生產者工廠的重點方法進行介紹;??
3,DefaultKafkaProducerFactory 類代碼結構包括(小結):
【1】 類描述
類描述:
單例共享 Producer 實例的 ProducerFactory 實現。
此實現將為每次 createProducer() 調用時提供的 Map 配置和可選的 Serializer 實現返回相同的 Producer 實例(如果未啟用事務)。
如果您使用的序列化器沒有參數構造函數并且不需要設置,那么最簡單的方法是在傳遞給 DefaultKafkaProducerFactory 構造函數的配置中針對 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG 和 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG 鍵指定序列化器類。
如果這是不可能的,但您確定以下至少一項是正確的:
- 1 只有一個生產者會使用序列化程序。
- 2 您正在使用可以在 Producer 實例之間共享的序列化程序(特別是它們的 close() 方法是無操作的)。
- 3 您確定沒有任何單個 Producer 被關閉的風險,而其他具有相同序列化程序的 Producer 實例正在使用中。
然后您可以為鍵和值序列化程序之一或兩者傳入 Serializer 實例。
如果以上都不是真的,那么您可以為一個或兩個序列化程序提供一個供應商函數,每次工廠創建生產者時,該函數將用于獲取序列化程序。
Producer 被包裝,并且在調用 Producer.close() 時實際上并未關閉底層的 KafkaProducer 實例。當調用 DisposableBean.destroy() 或應用程序上下文發布 ContextStoppedEvent 時,KafkaProducer 物理關閉。你也可以調用reset()。
設置 setTransactionIdPrefix(String) 啟用事務;在這種情況下,會維護生產者的緩存;關閉生產者將其返回到緩存。當工廠被銷毀、應用程序上下文停止或調用 reset() 方法時,生產者將關閉并清除緩存。
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactoryimplements ProducerFactory<K, V>, ApplicationContextAware,BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {?【2】構造器
使用提供的配置和序列化程序供應商構建工廠。
如果提供,還可以把 transactionIdPrefix 配置為ProducerConfig.TRANSACTIONAL_ID_CONFIG 的值。
此配置將被目標 Producer 實例的后綴覆蓋。
public DefaultKafkaProducerFactory(Map<String, Object> configs,@Nullable Supplier<Serializer<K>> keySerializerSupplier,@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {this.configs = new ConcurrentHashMap<>(configs);this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;// clientId 表示kafka生產者idif (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);}// 是否開啟事務String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);if (StringUtils.hasText(txId)) {setTransactionIdPrefix(txId);this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);}this.configs.put("internal.auto.downgrade.txn.commit", true); }【特別注意】我們可以不傳入 key序列化器,value序列化器 對象到 DefaultKafkaProducerFactory構造器(即設置為null) , 而把序列化器全限定類名設置在 configs 屬性里面,因為原生kafka生產者的構造器可以讀取配置中的序列化器類,如下:
new DefaultKafkaProducerFactory<>(0 (Map) PPKafkaClusterManager.INSTANCE.getKafkaClusterProps(kafkaClusterName));// 不傳入key value的序列化器,默認為null?kafka 原生構造器如下:
// 通過反射創建序列化器對象 public <T> T getConfiguredInstance(String key, Class<T> t) {Class<?> c = getClass(key);if (c == null)return null;Object o = Utils.newInstance(c);if (!t.isInstance(o))throw new KafkaException(c.getName() + " is not an instance of " + t.getName());if (o instanceof Configurable)((Configurable) o).configure(originals());return t.cast(o);}【3】方法介紹
【3.1】設置物理關閉生產者超時時間
public void setPhysicalCloseTimeout(int physicalCloseTimeout) {this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout); }通過工廠物理關閉生產者而不是producer自身關閉的等待時間(當 {@link #reset()}、{@link #destroy() #closeProducerFor(String)} 或 {@link #closeThreadBoundProducer( )} 被調用)。 以秒為單位; 默認{@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}。
【3.2】設置事務id 前綴(開啟事務)
public final void setTransactionIdPrefix(String transactionIdPrefix) {Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");this.transactionIdPrefix = transactionIdPrefix;enableIdempotentBehaviour(); } // 事務可用? @Override public boolean transactionCapable() {return this.transactionIdPrefix != null; }為 ProducerConfig.TRANSACTIONAL_ID_CONFIG 配置設置前綴。 默認情況下,來自配置的 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值用作目標生產者配置中的前綴。
【3.3】 設置是否每個線程產生一個生產者
public void setProducerPerThread(boolean producerPerThread) {this.producerPerThread = producerPerThread; }設置為 true 為每個線程創建一個生產者,而不是由所有客戶端共享的單例。 當不再需要生產者時,客戶端必須調用 closeThreadBoundProducer() 來物理關閉生產者。 這些生產者不會被 destroy() 或 reset() 關閉。?
【3.4】創建kafka生產者方法(重點)*
3個外觀方法
@Override public Producer<K, V> createProducer() {return createProducer(this.transactionIdPrefix); }@Override public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;return doCreateProducer(txIdPrefix); }@Override public Producer<K, V> createNonTransactionalProducer() {return doCreateProducer(null); }底層方法
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {if (txIdPrefix != null) { // 使用kafka事務if (this.producerPerConsumerPartition) {return createTransactionalProducerForPartition(txIdPrefix);}else {return createTransactionalProducer(txIdPrefix);}}if (this.producerPerThread) { // 每個線程一個生產者 return getOrCreateThreadBoundProducer();}synchronized (this) { // 我們走這里,synchronized同步代碼塊 if (this.producer != null && expire(this.producer)) {this.producer = null;}if (this.producer == null) {this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));}return this.producer;} }【代碼解說】 以上方法有4個分支,包括創建分區帶事務的生產者, 帶事務的生產者,每個線程一個生產者,普通生產者;
【3.4.1】 創建分區帶事務的生產者 createTransactionalProducerForPartition(txIdPrefix)
因為方法過于復雜,放在文末說明
【3.4.2】 帶事務的生產者 createTransactionalProducer(txIdPrefix)
因為方法過于復雜,放在文末說明
【3.4.3】 每個線程一個生產者 getOrCreateThreadBoundProducer
該方法就是把 生產者放入了線程級變量 ThreadLocal; 僅此而已
private Producer<K, V> getOrCreateThreadBoundProducer() {// 從線程級變量獲取CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {closeThreadBoundProducer();tlProducer = null;}if (tlProducer == null) {tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());for (Listener<K, V> listener : this.listeners) {listener.producerAdded(tlProducer.clientId, tlProducer);}this.threadBoundProducers.set(tlProducer); // 放入線程級遍歷 }return tlProducer; }threadBoundProducers 就是線程級遍歷
【3.4.4】 普通生產者*(本文所關注的方法)
synchronized塊中 可以保證所有客戶端線程復用同一個 kafka生產者,只要這個kafka沒有過期;即便過期,它會重新創建一個 kafka生產者;
synchronized中調用了expire(this.producer)判斷是否過期
private boolean expire(CloseSafeProducer<K, V> producer) {boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;if (expired) {producer.closeDelegate(this.physicalCloseTimeout, this.listeners);}return expired; }接著 創建了 CloseSafeProducer -關閉安全的生產者, 這是一個內部類;
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {在 調用CloseSafeProducer 構造器時傳入了 kafka生產者, 由 createKafkaProducer() 獲取; createKafkaProducer() 方法如下:
protected Producer<K, V> createKafkaProducer() {Map<String, Object> newConfigs;if (this.clientIdPrefix == null) {// 是否有生產者客戶端id ,這個值可以為空,newConfigs = new HashMap<>(this.configs);}else {newConfigs = new HashMap<>(this.configs);newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());}checkBootstrap(newConfigs);return createRawProducer(newConfigs); }// 創建原生kafka生產者 protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {Producer<K, V> kafkaProducer =new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());for (ProducerPostProcessor<K, V> pp : this.postProcessors) {kafkaProducer = pp.apply(kafkaProducer);}return kafkaProducer; }?子類必須返回一個原生生產者(kafka生產者),該生產者將被包裝在 DefaultKafkaProducerFactory.CloseSafeProducer 中。
【4】內部類-保證關閉安全的kafka生產者-CloseSafeProducer
1, 該類實現了 接口? Producer;
【4.1】構造器
上文調用? CloseSafeProducer構造器創建 生產者對象,
?其中 removeProducer 使用了 java8語法的方法引用,如下:
構造器有1個外觀
CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,String factoryName, int epoch) {this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch); }底層構造器
CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,Duration closeTimeout, String factoryName, int epoch) {Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");this.delegate = delegate; // 原生kafka生產者 this.removeProducer = removeProducer; // 移除生產者的方法this.txIdPrefix = txIdPrefix; // 事務id前綴 this.closeTimeout = closeTimeout; // 關閉超時時間 Map<MetricName, ? extends Metric> metrics = delegate.metrics(); // 指標Iterator<MetricName> metricIterator = metrics.keySet().iterator();// 指標迭代器String id;if (metricIterator.hasNext()) {id = metricIterator.next().tags().get("client-id");}else {id = "unknown";}this.clientId = factoryName + "." + id; // 客戶端idthis.created = System.currentTimeMillis(); // 創建時間為當前時間(毫秒)this.epoch = epoch; // 副本時代版本號LOGGER.debug(() -> "Created new Producer: " + this); }【代碼解說】
delegate,表示代理,這里指 原生kafka生產者;
很顯然, 構造器沒有啥復雜邏輯,就是賦值而已;
【4.2】發送消息
發送消息有2個重載方法?
方法1, 直接發送 ProducerRecord
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record); }方法2,帶回調的發送方法,消息發送完成后的回調(成功或失敗),回調方法中可以獲取發送消息所在的偏移量和分區等信息(metadata) ;
這個方法先執行了 原生kafka發送消息分發的回調, 回調執行完成后,在執行 外層傳入的回調;
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception instanceof OutOfOrderSequenceException) {CloseSafeProducer.this.producerFailed = exception;close(CloseSafeProducer.this.closeTimeout);}callback.onCompletion(metadata, exception);}}); }【4.3】flush 刷新緩沖區
直接調用 原生kakfa生產者delegate的flush 方法;?
public void flush() {LOGGER.trace(() -> toString() + " flush()");this.delegate.flush(); }【4.4】kafka事務操作方法
初始化事務, initTransactions;
開啟事務, beginTransaction ;
發送偏移量到事務, sendOffsetsToTransaction ;
提交事務, commitTransaction ;
中斷事務, abortTransaction ;
【4.5】關閉生產者方法(非常重要*)
有2個方法關閉生產者;
【4.5.1】 close(Duration timeout )
public void close(@Nullable Duration timeout) {LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")");if (!this.closed) {if (this.producerFailed != null) {LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);this.closed = true;this.removeProducer.test(this, this.producerFailed instanceof TimeoutException? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT: timeout);}else {this.closed = this.removeProducer.test(this, timeout);}} }調用 remoteProducer() 移除生產者
// 刪除單個共享生產者和線程綁定實例(如果存在)。 protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove , Duration timeout) {if (producerToRemove.closed) {if (producerToRemove.equals(this.producer)) {this.producer = null;producerToRemove.closeDelegate(timeout, this.listeners);}this.threadBoundProducers.remove();return true;}else {return false;} }?
也就是說,不是每一次發送消息完成,都關閉kafka生產者;
【總結】 何時關閉kafka生產者 ?(干貨——非常重要) *
當發送消息拋出異常時,關閉kafka生產者;?
上述代碼還是調用了? producerToRemove.closeDelegate(timeout, this.listeners);
其中 producerToRemove 就是 producer(CloseSafeProducer )的 closeDelegate(),下文所述;
?【4.5.2】 CloseSafeProducer.closeDelegate
調用了 原生kafka生產者的close 方法;
把生產者從監聽器中移除;
void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {this.delegate.close(timeout == null ? this.closeTimeout : timeout);listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));this.closed = true; }【補充】創建生產者
【代碼解說】 以上方法有4個分支,包括創建分區帶事務的生產者, 帶事務的生產者,每個線程一個生產者,普通生產者;
【3.4.1】 創建分區帶事務的生產者 createTransactionalProducerForPartition(txIdPrefix)
protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {String suffix = TransactionSupport.getTransactionIdSuffix();if (suffix == null) {return createTransactionalProducer(txIdPrefix);}else {synchronized (this.consumerProducers) {CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);if (consumerProducer == null || expire(consumerProducer)) {CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,this::removeConsumerProducer);this.consumerProducers.put(suffix, newProducer);return newProducer;}else {return consumerProducer;}}} } private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();- 1) 如果后綴為空,調用? createTransactionalProducer 創建生產者 ; 先從緩存中的阻塞隊列中獲取,若獲取不到,則 調用 doCreateTxProducer? 創建;
- ?2,不為空, 調用 doCreateTxProducer 創建生產者; 它也調用了? createRawProducer 根據原生 kafka生產者 創建? CloseSafeProducer
【3.4.2】 帶事務的生產者 createTransactionalProducer(txIdPrefix)
refer2【3.4.1】分支1。
總結
以上是生活随笔為你收集整理的spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 联通联网设置(联通宽带联网设置)
- 下一篇: spring-kafka整合:Kafka