Spark Streaming事务
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
一:傳統(tǒng)事務(wù)概念
事務(wù)(Transaction)是訪問(wèn)并可能更新數(shù)據(jù)庫(kù)中各種數(shù)據(jù)項(xiàng)的一個(gè)程序執(zhí)行單元(unit)。事務(wù)通常由高級(jí)數(shù)據(jù)庫(kù)操縱語(yǔ)言或編程語(yǔ)言(如SQL,C++或Java)書(shū)寫的用戶程序的執(zhí)行所引起,并用形如begin transaction和end transaction語(yǔ)句(或函數(shù)調(diào)用)來(lái)界定。事務(wù)由事務(wù)開(kāi)始(begin transaction)和事務(wù)結(jié)束(end transaction)之間執(zhí)行的全體操作組成。
例如:在關(guān)系數(shù)據(jù)庫(kù)中,一個(gè)事務(wù)可以是一條SQL語(yǔ)句,一組SQL語(yǔ)句或整個(gè)程序。
?
特性:事務(wù)是恢復(fù)和并發(fā)控制的基本單位。
事務(wù)應(yīng)該具有4個(gè)屬性:原子性、一致性、隔離性、持久性。這四個(gè)屬性通常稱為ACID特性。
原子性(atomicity)。一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的諸操作要么都做,要么都不做。
一致性(consistency)。事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。
隔離性(isolation)。一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。
持久性(durability)。持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交,它對(duì)數(shù)據(jù)庫(kù)中數(shù)據(jù)的改變就應(yīng)該是永久性的。接下來(lái)的其他操作或故障不應(yīng)該對(duì)其有任何影響。
二:spark Streaming中的事務(wù)
1.?Exactly once容錯(cuò)
2.?數(shù)據(jù)輸出不重復(fù)
一.?事務(wù)場(chǎng)景 :
以銀行轉(zhuǎn)帳一次為例,A用戶轉(zhuǎn)賬給B用戶,如何保證事務(wù)的一致性,即A用戶能夠轉(zhuǎn)出且只能轉(zhuǎn)出一次,B用戶能夠收到且只能收到一次。
二.??Exactly once容錯(cuò):
事務(wù)處理中如何保證能夠處理且只能處理一次,數(shù)據(jù)能夠輸出且只能輸出一次。
數(shù)據(jù)丟失的主要場(chǎng)景如下:
在Receiver收到數(shù)據(jù)且通過(guò)Driver的調(diào)度,Executor開(kāi)始計(jì)算數(shù)據(jù)的時(shí)候如果Driver突然奔潰(導(dǎo)致Executor會(huì)被Kill掉),此時(shí)Executor會(huì)被Kill掉,那么Executor中的數(shù)據(jù)就會(huì)丟失。
1. 事務(wù)處理如下圖 :
事務(wù)處理過(guò)程解析?:?
01.??InputStream?: 輸入數(shù)據(jù)?
02.? Executor?: 通過(guò)Receiver接收數(shù)據(jù),當(dāng)接收到數(shù)據(jù)后向Driver?匯報(bào)?
03.? Driver?: 通過(guò)StreamingContext接收到數(shù)據(jù)會(huì)啟動(dòng)Job進(jìn)行操作?
2.??解決事務(wù)源數(shù)據(jù)接收的安全性?:
事務(wù)處理解析?:
01.? Executor?:?在Receiver接收來(lái)自Kafka數(shù)據(jù)首先通過(guò)BlockManager寫入內(nèi)存+磁盤或者通過(guò)WAL來(lái)保證數(shù)據(jù)的安全性;
02.??Executor??: 通過(guò)Replication完成后產(chǎn)生Ack信號(hào);
03.? Kafka?: 確定收信息并讀取下一條數(shù)據(jù),Kafka才會(huì)進(jìn)行updateOffsets操作?;
04.??通過(guò)WAL機(jī)制讓所有的數(shù)據(jù)通過(guò)類似HDFS的方式進(jìn)行安全性容錯(cuò)處理,從而解決Executor被Kill掉后導(dǎo)致數(shù)據(jù)丟失可以通過(guò)WAL機(jī)制恢復(fù)回來(lái)。
3.??解決Driver數(shù)據(jù)輸出的安全性?:
數(shù)據(jù)的處理怎么保證有且僅有被處理一次?
數(shù)據(jù)零丟失并不能保證Exactly Once,如果Receiver接收且保存起來(lái)后沒(méi)來(lái)得及更新updateOffsets時(shí),就會(huì)導(dǎo)致數(shù)據(jù)被重復(fù)處理。
01.??通過(guò)StreamingContext接收數(shù)據(jù)通過(guò)CheckPoint進(jìn)行容錯(cuò)?;
02. logging the updates?: 通過(guò)記錄跟蹤所有生成RDD的轉(zhuǎn)換(transformations)也就是記錄每個(gè)RDD的lineage(血統(tǒng))來(lái)重新計(jì)算生成丟失的分區(qū)數(shù)據(jù)?;
?4.? Exactly Once的事務(wù)處理?:
01、 數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來(lái)源和可靠的Receiver,且整個(gè)應(yīng)用程序的metadata必須進(jìn)行checkpoint,且通過(guò)WAL來(lái)保證數(shù)據(jù)安全;
02、Spark Streaming 1.3的時(shí)候?yàn)榱吮苊釽AL的性能損失和實(shí)現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲(chǔ)系統(tǒng)!!
03、此時(shí)兼具有流的優(yōu)勢(shì)和文件系統(tǒng)的優(yōu)勢(shì),Spark Streaming+Kafka就構(gòu)建了完美的流處理世界!!!
04、 數(shù)據(jù)不需要copy副本,不需要WAL性能損耗,不需要Receiver,所有的Executors直接通過(guò)kafka direct api直接消費(fèi)數(shù)據(jù),直接管理Offset,所以也不會(huì)重復(fù)消費(fèi)數(shù)據(jù);
三.? ?Spark Streaming數(shù)據(jù)輸出多次重寫及解決方案:
1、 為什么會(huì)有這個(gè)問(wèn)題,因?yàn)镾parkStreaming在計(jì)算的時(shí)候基于SparkCore,SparkCore天生會(huì)做以下事情導(dǎo)致SparkStreaming的結(jié)果(部分)重復(fù)輸出:
1、Task重試;
2、慢任務(wù)推測(cè);
3、Stage重復(fù);
4、Job重試;
等會(huì)導(dǎo)致數(shù)據(jù)的丟失。
2、 對(duì)應(yīng)的解決方案:
1、一個(gè)任務(wù)失敗就是job 失敗,設(shè)置spark.task.maxFailures次數(shù)為1;
2、設(shè)置spark.speculation為關(guān)閉狀態(tài)(因?yàn)槁蝿?wù)推測(cè)其實(shí)非常消耗性能,所以關(guān)閉后可以顯著的提高Spark Streaming處理性能)
3、Spark streaming on kafka的話,假如job失敗后可以設(shè)置kafka的auto.offset.reset為largest的方式會(huì)自動(dòng)恢復(fù)job的執(zhí)行。
最后再次強(qiáng)調(diào):?可以通過(guò)transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來(lái)實(shí)現(xiàn)數(shù)據(jù)不重復(fù)消費(fèi)和輸出不重復(fù)!這二個(gè)方法類似于spark streaming的后門,可以做任意想象的控制操作!
轉(zhuǎn)載于:https://my.oschina.net/u/1253652/blog/669374
總結(jié)
以上是生活随笔為你收集整理的Spark Streaming事务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: FreeMarker 一二事 - 静态模
- 下一篇: Navicat for Oracle实现