转:Kafka事务使用和编程示例/实例
Kafka事務(wù)使用和編程示例/實例_JobShow裁員加班實況-微信小程序-CSDN博客一、概述? Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費者提交偏移量的操作在一個事務(wù)中,或者說是一個原子操作,生產(chǎn)消息和提交偏移量同時成功或者失敗。注意:kafka事務(wù)和DB事務(wù)。在理解消息的事務(wù)時,一直處于一個錯誤理解是,把操作db的業(yè)務(wù)邏輯跟操作消息當成是一個事務(wù),如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:讀取消息或生產(chǎn)消息 kafkaOperation(); // 2.db操作 dbOperation()https://blog.csdn.net/u010002184/article/details/113933973
一、概述
? Kafka事務(wù)特性是指一系列的生產(chǎn)者生產(chǎn)消息和消費者提交偏移量的操作在一個事務(wù)中,或者說是一個原子操作,生產(chǎn)消息和提交偏移量同時成功或者失敗。
-
注意:kafka事務(wù)和DB事務(wù)。
在理解消息的事務(wù)時,一直處于一個錯誤理解是,把操作db的業(yè)務(wù)邏輯跟操作消息當成是一個事務(wù),如下所示:
void kakfa_in_tranction(){// 1.kafa的操作:讀取消息或生產(chǎn)消息kafkaOperation();// 2.db操作dbOperation(); }操作DB數(shù)據(jù)庫的數(shù)據(jù)源是DB,消息數(shù)據(jù)源是kfaka,這是完全不同兩個數(shù)據(jù)。一種數(shù)據(jù)源(如mysql,kafka)對應(yīng)一個事務(wù),所以它們是兩個獨立的事務(wù)。kafka事務(wù)指kafka一系列 生產(chǎn)、消費消息等操作組成一個原子操作,db事務(wù)是指操作數(shù)據(jù)庫的一系列增刪改操作組成一個原子操作。
二、事務(wù)的使用
Kafka中的事務(wù)特性主要用于以下兩種場景:
-
生產(chǎn)者發(fā)送多條消息可以封裝在一個事務(wù)中,形成一個原子操作。多條消息要么都發(fā)送成功,要么都發(fā)送失敗。
-
read-process-write模式:將消息消費和生產(chǎn)封裝在一個事務(wù)中,形成一個原子操作。在一個**流式處理**的應(yīng)用中,常常一個服務(wù)需要從上游接收消息,然后經(jīng)過處理后送達到下游,這就對應(yīng)著消息的消費和生成。
當事務(wù)中僅僅存在Consumer消費消息的操作時,它和Consumer手動提交Offset并沒有區(qū)別。因此單純的消費消息并不是Kafka引入事務(wù)機制的原因,單純的消費消息也沒有必要存在于一個事務(wù)中。
三、事務(wù)相關(guān)的API
1, api
/*** 初始化事務(wù)*/public void initTransactions();/*** 開啟事務(wù)*/public void beginTransaction() throws ProducerFencedException ;/*** 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量*/public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) ;/*** 提交事務(wù)*/public void commitTransaction() throws ProducerFencedException;/*** 丟棄事務(wù)*/public void abortTransaction() throws ProducerFencedException ;2,事務(wù)配置
2,1 生產(chǎn)者
需要設(shè)置transactional.id屬性。
設(shè)置了transactional.id屬性后,enable.idempotence屬性會自動設(shè)置為true。
2.2 消費者
?需要設(shè)置isolation.level = read_committed,這樣Consumer只會讀取已經(jīng)提交了事務(wù)的消息。另外,需要設(shè)置enable.auto.commit = false來關(guān)閉自動提交Offset功能。
四、事務(wù)使用示例
1, 需求
在Kafka的topic:ods_user中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:
?
姓名,性別,出生日期
張三,1,1980-10-09
李四,0,1985-11-01
?
我們需要編寫程序,將用戶的性別轉(zhuǎn)換為男、女(1-男,0-女),轉(zhuǎn)換后將數(shù)據(jù)寫入到topic:dwd_user中。要求使用事務(wù)保障,要么消費了數(shù)據(jù)同時寫入數(shù)據(jù)到 topic,提交offset。要么全部失敗。
2,控制臺模擬數(shù)據(jù)
# 創(chuàng)建名為ods_user和dwd_user的主題 bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic ods_user --partitions 3 --replication-factor 2 bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic dwd_user --partitions 3 --replication-factor 2 # 生產(chǎn)數(shù)據(jù)到 ods_user bin/kafka-console-producer.sh --broker-list node-1:9092 --topic ods_user # 從dwd_user消費數(shù)據(jù) bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic dwd_user --from-beginning3, 詳細代碼
public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事務(wù)producer.initTransactions();while(true) {try {// 1. 開啟事務(wù)producer.beginTransaction();// 2. 定義Map結(jié)構(gòu),用于保存分區(qū)對應(yīng)的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 進行轉(zhuǎn)換處理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生產(chǎn)消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事務(wù)producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事務(wù)producer.commitTransaction();} catch (Exception e) {// 8. 放棄事務(wù)producer.abortTransaction();}}}// 1. 創(chuàng)建消費者public static Consumer<String, String> createConsumer() {// 1. 創(chuàng)建Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 創(chuàng)建Kafka消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 訂閱要消費的主題consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 創(chuàng)建生產(chǎn)者public static Producer<String, String> createProduceer() {// 1. 創(chuàng)建生產(chǎn)者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 創(chuàng)建生產(chǎn)者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}4,異常模擬
// 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 4. 進行轉(zhuǎn)換處理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女"; String message = fields[0] + "," + fields[1] + "," + fields[2];// 模擬異常 int i = 1/0;// 5. 生產(chǎn)消息到dwd_user producer.send(new ProducerRecord<>("dwd_user", message));?我們發(fā)現(xiàn),可以消費到消息,但如果中間出現(xiàn)異常的話,offset是不會被提交的,除非消費、生產(chǎn)消息都成功,才會提交事務(wù)。
轉(zhuǎn)自:
kafka事務(wù)使用和編程示例_青眼酷白龍的博客-CSDN博客_kafka事務(wù)使用
-------------------------------------------
一、事務(wù)場景
二、幾個關(guān)鍵概念和推導
1.因為producer發(fā)送消息可能是分布式事務(wù),所以引入了常用的2PC,所以有事務(wù)協(xié)調(diào)者(Transaction Coordinator)。Transaction Coordinator和之前為了解決腦裂和驚群問題引入的Group Coordinator在選舉和failover上面類似。
2.事務(wù)管理中事務(wù)日志是必不可少的,kafka使用一個內(nèi)部topic來保存事務(wù)日志,這個設(shè)計和之前使用內(nèi)部topic保存位點的設(shè)計保持一致。事務(wù)日志是Transaction Coordinator管理的狀態(tài)的持久化,因為不需要回溯事務(wù)的歷史狀態(tài),所以事務(wù)日志只用保存最近的事務(wù)狀態(tài)。
3.因為事務(wù)存在commit和abort兩種操作,而客戶端又有read committed和read uncommitted兩種隔離級別,所以消息隊列必須能標識事務(wù)狀態(tài),這個被稱作Control Message。
4.producer掛掉重啟或者漂移到其它機器需要能關(guān)聯(lián)的之前的未完成事務(wù)所以需要有一個唯一標識符來進行關(guān)聯(lián),這個就是TransactionalId,一個producer掛了,另一個有相同TransactionalId的producer能夠接著處理這個事務(wù)未完成的狀態(tài)。注意不要把TransactionalId和數(shù)據(jù)庫事務(wù)中常見的transaction id搞混了,kafka目前沒有引入全局序,所以也沒有transaction id,這個TransactionalId是用戶提前配置的。
5. TransactionalId能關(guān)聯(lián)producer,也需要避免兩個使用相同TransactionalId的producer同時存在,所以引入了producer epoch來保證對應(yīng)一個TransactionalId只有一個活躍的producer epoch
三、事務(wù)語義
2.1.? 多分區(qū)原子寫入
事務(wù)能夠保證Kafka topic下每個分區(qū)的原子寫入。事務(wù)中所有的消息都將被成功寫入或者丟棄。例如,處理過程中發(fā)生了異常并導致事務(wù)終止,這種情況下,事務(wù)中的消息都不會被Consumer讀取。現(xiàn)在我們來看下Kafka是如何實現(xiàn)原子的“讀取-處理-寫入”過程的。
首先,我們來考慮一下原子“讀取-處理-寫入”周期是什么意思。簡而言之,這意味著如果某個應(yīng)用程序在某個topic tp0的偏移量X處讀取到了消息A,并且在對消息A進行了一些處理(如B = F(A))之后將消息B寫入topic tp1,則只有當消息A和B被認為被成功地消費并一起發(fā)布,或者完全不發(fā)布時,整個讀取過程寫入操作是原子的。
現(xiàn)在,只有當消息A的偏移量X被標記為消耗時,消息A才被認為是從topic tp0消耗的,消費到的數(shù)據(jù)偏移量(record offset)將被標記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個名為__consumer_offsets topic的內(nèi)部Kafka topic來記錄offset commit。消息僅在其offset被提交給__consumer_offsets topic時才被認為成功消費。
由于offset commit只是對Kafkatopic的另一次寫入,并且由于消息僅在提交偏移量時被視為成功消費,所以跨多個主題和分區(qū)的原子寫入也啟用原子“讀取-處理-寫入”循環(huán):提交偏移量X到offset topic和消息B到tp1的寫入將是單個事務(wù)的一部分,所以整個步驟都是原子的。
kafka系列九、kafka事務(wù)原理、事務(wù)API和使用場景 - 小人物的奮斗 - 博客園
總結(jié)
以上是生活随笔為你收集整理的转:Kafka事务使用和编程示例/实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 荒漠屠夫天赋符文(荒漠屠夫天赋符文推荐)
- 下一篇: spring-kafka整合:Defau