日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Kafka事务特性详解

發(fā)布時(shí)間:2025/3/20 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka事务特性详解 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Kafka在0.11版本中除了引入了Exactly Once語(yǔ)義,還引入了事務(wù)特性。Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費(fèi)者提交偏移量的操作在一個(gè)事務(wù)中,或者說是一個(gè)原子操作,生產(chǎn)消息和提交偏移量同時(shí)成功或者失敗。

1. Kafka事務(wù)的使用

Kafka中的事務(wù)特性主要用于以下兩種場(chǎng)景:

  • 生產(chǎn)者發(fā)送多條消息可以封裝在一個(gè)事務(wù)中,形成一個(gè)原子操作。多條消息要么都發(fā)送成功,要么都發(fā)送失敗。
  • read-process-write模式:將消息消費(fèi)和生產(chǎn)封裝在一個(gè)事務(wù)中,形成一個(gè)原子操作。在一個(gè)流式處理的應(yīng)用中,常常一個(gè)服務(wù)需要從上游接收消息,然后經(jīng)過處理后送達(dá)到下游,這就對(duì)應(yīng)著消息的消費(fèi)和生成。

當(dāng)事務(wù)中僅僅存在Consumer消費(fèi)消息的操作時(shí),它和Consumer手動(dòng)提交Offset并沒有區(qū)別。因此單純的消費(fèi)消息并不是Kafka引入事務(wù)機(jī)制的原因,單純的消費(fèi)消息也沒有必要存在于一個(gè)事務(wù)中。

Kafka producer API提供了以下接口用于事務(wù)操作:

?

/*** 初始化事務(wù)*/public void initTransactions();/*** 開啟事務(wù)*/public void beginTransaction() throws ProducerFencedException ;/*** 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量*/public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;/*** 提交事務(wù)*/public void commitTransaction() throws ProducerFencedException;/*** 丟棄事務(wù)*/public void abortTransaction() throws ProducerFencedException ;

下面是使用Kafka事務(wù)特性的例子,這段代碼Producer開啟了一個(gè)事務(wù),然后在這個(gè)事務(wù)中發(fā)送了兩條消息。這兩條消息要么都發(fā)送成功,要么都失敗。

?

KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id”, “my-transactional-id");producer.initTransactions(); producer.beginTransaction(); producer.send("outputTopic", "message1"); producer.send("outputTopic", "message2"); producer.commitTransaction();

下面這段代碼即為read-process-write模式,在一個(gè)Kafka事務(wù)中,同時(shí)涉及到了生產(chǎn)消息和消費(fèi)消息。

?

KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id", "my-transactional-id");KafkaConsumer consumer = createKafkaConsumer("bootstrap.servers", "localhost:9092","group.id", "my-group-id","isolation.level", "read_committed");consumer.subscribe(singleton("inputTopic"));producer.initTransactions();while (true) {ConsumerRecords records = consumer.poll(Long.MAX_VALUE);producer.beginTransaction();for (ConsumerRecord record : records)producer.send(producerRecord(“outputTopic”, record));producer.sendOffsetsToTransaction(currentOffsets(consumer), group); producer.commitTransaction(); }

注意:在理解消息的事務(wù)時(shí),一直處于一個(gè)錯(cuò)誤理解是,把操作db的業(yè)務(wù)邏輯跟操作消息當(dāng)成是一個(gè)事務(wù),如下所示:

?

void kakfa_in_tranction(){// 1.kafa的操作:讀取消息或生產(chǎn)消息kafkaOperation();// 2.db操作dbOperation(); }

其實(shí)這個(gè)是有問題的。操作DB數(shù)據(jù)庫(kù)的數(shù)據(jù)源是DB,消息數(shù)據(jù)源是kfaka,這是完全不同兩個(gè)數(shù)據(jù)。一種數(shù)據(jù)源(如mysql,kafka)對(duì)應(yīng)一個(gè)事務(wù),所以它們是兩個(gè)獨(dú)立的事務(wù)。kafka事務(wù)指kafka一系列 生產(chǎn)、消費(fèi)消息等操作組成一個(gè)原子操作,db事務(wù)是指操作數(shù)據(jù)庫(kù)的一系列增刪改操作組成一個(gè)原子操作。

2. Kafka事務(wù)配置

  • 對(duì)于Producer,需要設(shè)置transactional.id屬性,這個(gè)屬性的作用下文會(huì)提到。設(shè)置了transactional.id屬性后,enable.idempotence屬性會(huì)自動(dòng)設(shè)置為true。
  • 對(duì)于Consumer,需要設(shè)置isolation.level = read_committed,這樣Consumer只會(huì)讀取已經(jīng)提交了事務(wù)的消息。另外,需要設(shè)置enable.auto.commit = false來關(guān)閉自動(dòng)提交Offset功能。

更多關(guān)于配置的信息請(qǐng)參考我的文章:Kafka消息送達(dá)語(yǔ)義詳解

3. Kafka事務(wù)特性

Kafka的事務(wù)特性本質(zhì)上代表了三個(gè)功能:原子寫操作拒絕僵尸實(shí)例(Zombie fencing)和讀事務(wù)消息

3.1 原子寫

Kafka的事務(wù)特性本質(zhì)上是支持了Kafka跨分區(qū)和Topic的原子寫操作。在同一個(gè)事務(wù)中的消息要么同時(shí)寫入成功,要么同時(shí)寫入失敗。我們知道,Kafka中的Offset信息存儲(chǔ)在一個(gè)名為_consumed_offsets的Topic中,因此read-process-write模式,除了向目標(biāo)Topic寫入消息,還會(huì)向_consumed_offsets中寫入已經(jīng)消費(fèi)的Offsets數(shù)據(jù)。因此read-process-write本質(zhì)上就是跨分區(qū)和Topic的原子寫操作。Kafka的事務(wù)特性就是要確保跨分區(qū)的多個(gè)寫操作的原子性。

3.2 拒絕僵尸實(shí)例(Zombie fencing)

在分布式系統(tǒng)中,一個(gè)instance的宕機(jī)或失聯(lián),集群往往會(huì)自動(dòng)啟動(dòng)一個(gè)新的實(shí)例來代替它的工作。此時(shí)若原實(shí)例恢復(fù)了,那么集群中就產(chǎn)生了兩個(gè)具有相同職責(zé)的實(shí)例,此時(shí)前一個(gè)instance就被稱為“僵尸實(shí)例(Zombie Instance)”。在Kafka中,兩個(gè)相同的producer同時(shí)處理消息并生產(chǎn)出重復(fù)的消息(read-process-write模式),這樣就嚴(yán)重違反了Exactly Once Processing的語(yǔ)義。這就是僵尸實(shí)例問題。

Kafka事務(wù)特性通過transaction-id屬性來解決僵尸實(shí)例問題。所有具有相同transaction-id的Producer都會(huì)被分配相同的pid,同時(shí)每一個(gè)Producer還會(huì)被分配一個(gè)遞增的epoch。Kafka收到事務(wù)提交請(qǐng)求時(shí),如果檢查當(dāng)前事務(wù)提交者的epoch不是最新的,那么就會(huì)拒絕該P(yáng)roducer的請(qǐng)求。從而達(dá)成拒絕僵尸實(shí)例的目標(biāo)。

3.3 讀事務(wù)消息

為了保證事務(wù)特性,Consumer如果設(shè)置了isolation.level = read_committed,那么它只會(huì)讀取已經(jīng)提交了的消息。在Producer成功提交事務(wù)后,Kafka會(huì)將所有該事務(wù)中的消息的Transaction Marker從uncommitted標(biāo)記為committed狀態(tài),從而所有的Consumer都能夠消費(fèi)。

4. Kafka事務(wù)原理

Kafka為了支持事務(wù)特性,引入一個(gè)新的組件:Transaction Coordinator。主要負(fù)責(zé)分配pid,記錄事務(wù)狀態(tài)等操作。下面時(shí)Kafka開啟一個(gè)事務(wù)到提交一個(gè)事務(wù)的流程圖:

KafkaTransaction.png

主要分為以下步驟:

1. 查找Tranaction Corordinator

Producer向任意一個(gè)brokers發(fā)送 FindCoordinatorRequest請(qǐng)求來獲取Transaction Coordinator的地址。

2. 初始化事務(wù) initTransaction

Producer發(fā)送InitpidRequest給Transaction Coordinator,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關(guān)系。另外,它還會(huì)做兩件事:

  • 恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)
  • 對(duì)PID對(duì)應(yīng)的epoch進(jìn)行遞增,這樣可以保證同一個(gè)app的不同實(shí)例對(duì)應(yīng)的PID是一樣,而epoch是不同的。

只要開啟了冪等特性即必須執(zhí)行InitpidRequest,而無須考慮該P(yáng)roducer是否開啟了事務(wù)特性。

3. 開始事務(wù)beginTransaction

執(zhí)行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個(gè)transaction的狀態(tài)為開始狀態(tài)。這個(gè)操作并沒有通知Transaction Coordinator,因?yàn)門ransaction Coordinator只有在Producer發(fā)送第一條消息后才認(rèn)為事務(wù)已經(jīng)開啟。

4. read-process-write流程

一旦Producer開始發(fā)送消息,Transaction Coordinator會(huì)將該<Transaction, Topic, Partition>存于Transaction Log內(nèi),并將其狀態(tài)置為BEGIN。另外,如果該<Topic, Partition>為該事務(wù)中第一個(gè)<Topic, Partition>,Transaction Coordinator還會(huì)啟動(dòng)對(duì)該事務(wù)的計(jì)時(shí)(每個(gè)事務(wù)都有自己的超時(shí)時(shí)間)。

在注冊(cè)<Transaction, Topic, Partition>到Transaction Log后,生產(chǎn)者發(fā)送數(shù)據(jù),雖然沒有還沒有執(zhí)行commit或者abort,但是此時(shí)消息已經(jīng)保存到Broker上了。即使后面執(zhí)行abort,消息也不會(huì)刪除,只是更改狀態(tài)字段標(biāo)識(shí)消息為abort狀態(tài)。

5. 事務(wù)提交或終結(jié) commitTransaction/abortTransaction

在Producer執(zhí)行commitTransaction/abortTransaction時(shí),Transaction Coordinator會(huì)執(zhí)行一個(gè)兩階段提交:

  • 第一階段,將Transaction Log內(nèi)的該事務(wù)狀態(tài)設(shè)置為PREPARE_COMMIT或PREPARE_ABORT
  • 第二階段,將Transaction Marker寫入該事務(wù)涉及到的所有消息(即將消息標(biāo)記為committed或aborted)。這一步驟Transaction Coordinator會(huì)發(fā)送給當(dāng)前事務(wù)涉及到的每個(gè)<Topic, Partition>的Leader,Broker收到該請(qǐng)求后,會(huì)將對(duì)應(yīng)的Transaction Marker控制信息寫入日志。

一旦Transaction Marker寫入完成,Transaction Coordinator會(huì)將最終的COMPLETE_COMMIT或COMPLETE_ABORT狀態(tài)寫入Transaction Log中以標(biāo)明該事務(wù)結(jié)束。

5. 總結(jié)

  • Transaction Marker與PID提供了識(shí)別消息是否應(yīng)該被讀取的能力,從而實(shí)現(xiàn)了事務(wù)的隔離性。
  • Offset的更新標(biāo)記了消息是否被讀取,從而將對(duì)讀操作的事務(wù)處理轉(zhuǎn)換成了對(duì)寫(Offset)操作的事務(wù)處理。
  • Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對(duì)應(yīng)的消息與一組讀操作(如果有)對(duì)應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(Transaction Marker)來實(shí)現(xiàn)事務(wù)中涉及的所有讀寫操作同時(shí)對(duì)外可見或同時(shí)對(duì)外不可見。
  • Kafka只提供對(duì)Kafka本身的讀寫操作的事務(wù)性,不提供包含外部系統(tǒng)的事務(wù)性。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的Kafka事务特性详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。