Flink DataStream API 编程模型
Flink系列文章
- 第01講:Flink 的應(yīng)用場(chǎng)景和架構(gòu)模型
- 第02講:Flink 入門程序 WordCount 和 SQL 實(shí)現(xiàn)
- 第03講:Flink 的編程模型與其他框架比較
- 第04講:Flink 常用的 DataSet 和 DataStream API
- 第05講:Flink SQL & Table 編程和案例
- 第06講:Flink 集群安裝部署和 HA 配置
- 第07講:Flink 常見核心概念分析
- 第08講:Flink 窗口、時(shí)間和水印
- 第09講:Flink 狀態(tài)與容錯(cuò)
- 第10講:Flink Side OutPut 分流
- 第11講:Flink CEP 復(fù)雜事件處理
- 第12講:Flink 常用的 Source 和 Connector
- 第13講:如何實(shí)現(xiàn)生產(chǎn)環(huán)境中的 Flink 高可用配置
- 第14講:Flink Exactly-once 實(shí)現(xiàn)原理解析
- 第15講:如何排查生產(chǎn)環(huán)境中的反壓?jiǎn)栴}
- 第16講:如何處理Flink生產(chǎn)環(huán)境中的數(shù)據(jù)傾斜問(wèn)題
- 第17講:生產(chǎn)環(huán)境中的并行度和資源設(shè)置
- Flink系列文章
- Flink 架構(gòu)
- 流處理
- 示例
-
Data Sources
- 基本的stream source
-
DataStream Transformations
- 1. Map算子 DataStream => DataStream
- 2. FlatMap算子 DataStream => DataStream
- 3. Filter算子 DataStream => DataStream
- KeyBy算子 DataStream => KeyedStream
- Rich Functions
- Data Sinks
- Flink 中的 API
-
容錯(cuò)處理
- Checkpoint Storage
- 狀態(tài)快照如何工作?
- 確保精確一次(exactly once)
- 端到端精確一次
- Job 升級(jí)與擴(kuò)容
-
遲到的數(shù)據(jù)
- Event Time and Watermarks
本章教程對(duì) Apache Flink 的基本概念進(jìn)行了介紹,雖然省略了許多重要細(xì)節(jié),但是如果你掌握了本章內(nèi)容,就足以對(duì)Flink實(shí)現(xiàn)可擴(kuò)展并行度的 ETL、數(shù)據(jù)分析以及事件驅(qū)動(dòng)的流式應(yīng)用程序,有一個(gè)大致的了解。
Flink 架構(gòu)
Flink 是一個(gè)分布式系統(tǒng),需要有效分配和管理計(jì)算資源才能執(zhí)行流應(yīng)用程序。它集成了所有常見的集群資源管理器,例如Hadoop YARN,但也可以設(shè)置作為獨(dú)立集群甚至庫(kù)運(yùn)行。Flink 運(yùn)行時(shí)由兩種類型的進(jìn)程組成:一個(gè) JobManager 和一個(gè)或者多個(gè) TaskManager。
Client 不是運(yùn)行時(shí)和程序執(zhí)行的一部分,而是用于準(zhǔn)備數(shù)據(jù)流并將其發(fā)送給 JobManager。之后,客戶端可以斷開連接(分離模式),或保持連接來(lái)接收進(jìn)程報(bào)告(附加模式)。客戶端可以作為觸發(fā)執(zhí)行 Java/Scala 程序的一部分運(yùn)行,也可以在命令行進(jìn)程./bin/flink run ...中運(yùn)行。
可以通過(guò)多種方式啟動(dòng) JobManager 和 TaskManager:直接在機(jī)器上作為standalone 集群?jiǎn)?dòng)、在容器中啟動(dòng)、或者通過(guò)YARN等資源框架管理并啟動(dòng)。TaskManager 連接到 JobManagers,宣布自己可用,并被分配工作。
流處理
在自然環(huán)境中,數(shù)據(jù)的產(chǎn)生原本就是流式的。無(wú)論是來(lái)自 Web 服務(wù)器的事件數(shù)據(jù),證券交易所的交易數(shù)據(jù),還是來(lái)自工廠車間機(jī)器上的傳感器數(shù)據(jù),其數(shù)據(jù)都是流式的。但是當(dāng)你分析數(shù)據(jù)時(shí),可以圍繞 有界流(bounded)或 *流(unbounded)兩種模型來(lái)組織處理數(shù)據(jù),當(dāng)然,選擇不同的模型,程序的執(zhí)行和處理方式也都會(huì)不同。
Flink 程序看起來(lái)像一個(gè)轉(zhuǎn)換 DataStream 的常規(guī)程序。每個(gè)程序由相同的基本部分組成:
- 獲取一個(gè)執(zhí)行環(huán)境(execution environment);
- 加載/創(chuàng)建初始數(shù)據(jù);
- 指定數(shù)據(jù)相關(guān)的轉(zhuǎn)換;
- 指定計(jì)算結(jié)果的存儲(chǔ)位置;
- 觸發(fā)程序執(zhí)行。
通常,你只需要使用 getExecutionEnvironment() 即可,因?yàn)樵摲椒〞?huì)根據(jù)上下文做正確的處理:如果你在 IDE 中執(zhí)行你的程序或?qū)⑵渥鳛橐话愕?Java 程序執(zhí)行,那么它將創(chuàng)建一個(gè)本地環(huán)境,該環(huán)境將在你的本地機(jī)器上執(zhí)行你的程序。如果你基于程序創(chuàng)建了一個(gè) JAR 文件,并通過(guò)命令行運(yùn)行它,F(xiàn)link 集群管理器將執(zhí)行程序的 main 方法,同時(shí) getExecutionEnvironment() 方法會(huì)返回一個(gè)執(zhí)行環(huán)境以在集群上執(zhí)行你的程序。
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
示例
如下是一個(gè)完整的、可運(yùn)行的程序示例,它是基于流窗口的單詞統(tǒng)計(jì)應(yīng)用程序,計(jì)算 5 秒窗口內(nèi)來(lái)自 Web 套接字的單詞數(shù)。你可以復(fù)制并粘貼代碼以在本地運(yùn)行,需要的maven依賴地址。
package wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("192.168.20.130", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
dataStream.print();
System.out.println("parallelism -> " + env.getParallelism());
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
Linux安裝nc工具:yum install nc,并且在命令行鍵入數(shù)據(jù):
[root@hadoop-001 ~]# nc -lk 9999
flink flink spark
flink hadoop spark
程序執(zhí)行結(jié)果:
# IDEA執(zhí)行,默認(rèn)flink并行度是8,可以env.setParallelism來(lái)設(shè)置
parallelism -> 8
1> (spark,1)
7> (flink,2)
1> (spark,1)
8> (hadoop,1)
7> (flink,1)
兩個(gè)窗口的結(jié)果,可以看到,把flink spark hadoop三個(gè)單詞的總次數(shù)一個(gè)不漏的算出來(lái)了。需要注意打印結(jié)果,1>表示編號(hào)為1的task打印的,代碼的gitee地址 。
我們知道了一個(gè)Flink程序通常有source -> transform -> sink,即 讀取數(shù)據(jù)源,處理轉(zhuǎn)換數(shù)據(jù),結(jié)果保存 ,接下來(lái)將逐步介紹這些基本用法。
Data Sources
Source 是你的程序從中讀取其輸入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 將一個(gè) source 關(guān)聯(lián)到你的程序。Flink 自帶了許多預(yù)先實(shí)現(xiàn)的 source functions,不過(guò)你仍然可以通過(guò)實(shí)現(xiàn) SourceFunction 接口編寫自定義的非并行 source,也可以通過(guò)實(shí)現(xiàn) ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 類編寫自定義的并行 sources。通過(guò) StreamExecutionEnvironment 可以訪問(wèn)多種預(yù)定義的 stream source:
1 基于文件:
-
readTextFile(path) - 讀取文本文件,例如遵守 TextInputFormat 規(guī)范的文件,逐行讀取并將它們作為字符串返回。
-
readFile(fileInputFormat, path) - 按照指定的文件輸入格式讀取(一次)文件。
-
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是前兩個(gè)方法內(nèi)部調(diào)用的方法。它基于給定的 fileInputFormat 讀取路徑 path 上的文件。根據(jù)提供的 watchType 的不同,source 可能定期(每 interval 毫秒)監(jiān)控路徑上的新數(shù)據(jù)(watchType 為 FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次當(dāng)前路徑中的數(shù)據(jù)然后退出(watchType 為 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用戶可以進(jìn)一步排除正在處理的文件。
實(shí)現(xiàn):
在底層,F(xiàn)link 將文件讀取過(guò)程拆分為兩個(gè)子任務(wù),即 目錄監(jiān)控 和 數(shù)據(jù)讀取。每個(gè)子任務(wù)都由一個(gè)單獨(dú)的實(shí)體實(shí)現(xiàn)。監(jiān)控由單個(gè)非并行(并行度 = 1)任務(wù)實(shí)現(xiàn),而讀取由多個(gè)并行運(yùn)行的任務(wù)執(zhí)行。后者的并行度和作業(yè)的并行度相等。單個(gè)監(jiān)控任務(wù)的作用是掃描目錄(定期或僅掃描一次,取決于 watchType),找到要處理的文件,將它們劃分為 分片,并將這些分片分配給下游 reader。Reader 是將實(shí)際獲取數(shù)據(jù)的角色。每個(gè)分片只能被一個(gè) reader 讀取,而一個(gè) reader 可以一個(gè)一個(gè)地讀取多個(gè)分片。
重要提示:
-
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,當(dāng)一個(gè)文件被修改時(shí),它的內(nèi)容會(huì)被完全重新處理。這可能會(huì)打破 “精確一次” 的語(yǔ)義,因?yàn)樵谖募┪沧芳訑?shù)據(jù)將導(dǎo)致重新處理文件的所有內(nèi)容。
-
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE,source 掃描一次路徑然后退出,無(wú)需等待 reader 讀完文件內(nèi)容。當(dāng)然,reader 會(huì)繼續(xù)讀取數(shù)據(jù),直到所有文件內(nèi)容都讀完。關(guān)閉 source 會(huì)導(dǎo)致在那之后不再有檢查點(diǎn)。這可能會(huì)導(dǎo)致節(jié)點(diǎn)故障后恢復(fù)速度變慢,因?yàn)樽鳂I(yè)將從最后一個(gè)檢查點(diǎn)恢復(fù)讀取。
2 基于套接字:
- socketTextStream - 從套接字讀取。元素可以由分隔符分隔。
3 基于集合:
-
fromCollection(Collection) - 從 Java Java.util.Collection 創(chuàng)建數(shù)據(jù)流。集合中的所有元素必須屬于同一類型。
-
fromCollection(Iterator, Class) - 從迭代器創(chuàng)建數(shù)據(jù)流。class 參數(shù)指定迭代器返回元素的數(shù)據(jù)類型。
-
fromElements(T ...) - 從給定的對(duì)象序列中創(chuàng)建數(shù)據(jù)流。所有的對(duì)象必須屬于同一類型。
-
fromParallelCollection(SplittableIterator, Class) - 從迭代器并行創(chuàng)建數(shù)據(jù)流。class 參數(shù)指定迭代器返回元素的數(shù)據(jù)類型。
-
generateSequence(from, to) - 基于給定間隔內(nèi)的數(shù)字序列并行生成數(shù)據(jù)流。
4 自定義:
- addSource - 關(guān)聯(lián)一個(gè)新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 來(lái)從 Apache Kafka 獲取數(shù)據(jù)。更多詳細(xì)信息見連接器。
基本的stream source
這樣將簡(jiǎn)單的流放在一起是為了方便用于原型或測(cè)試。StreamExecutionEnvironment 上還有一個(gè) fromCollection(Collection) 方法。因此,你可以這樣做:
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
另一個(gè)獲取數(shù)據(jù)到流中的便捷方法是用 socket
DataStream<String> lines = env.socketTextStream("localhost", 9999)
public static void demo4() throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 1. linux安裝nc工具:yum install nc
* 2. 發(fā)送數(shù)據(jù): nc -lk 9999
*/
DataStream<Person> persons = senv.socketTextStream("192.168.20.130", 9999)
.map(line -> new Person(line.split(",")[0], Integer.valueOf(line.split(",")[1])));
persons.print();
senv.execute("DataSourceDemo");
}
或讀取文件
DataStream<String> lines = env.readTextFile("file:///path");
在真實(shí)的應(yīng)用中,最常用的數(shù)據(jù)源是那些支持低延遲,高吞吐并行讀取以及重復(fù)(高性能和容錯(cuò)能力為先決條件)的數(shù)據(jù)源,例如 Apache Kafka,Kinesis 和各種文件系統(tǒng),這將在后面的教程會(huì)經(jīng)常使用Kafka Source。REST API 和數(shù)據(jù)庫(kù)也經(jīng)常用于增強(qiáng)流處理的能力(stream enrichment)。
由于篇幅,這里不會(huì)列出所有的代碼,demo的gitee地址 。
DataStream Transformations
轉(zhuǎn)換主要常用的算子有map、flatMap、Filter、KeyBy、Window等,它們作用是對(duì)數(shù)據(jù)進(jìn)行清洗、轉(zhuǎn)換、分發(fā)等。這里列出幾個(gè)常用算子,在以后的Flink程序編寫中,這將是非常常用的。通常都需要用戶自定義Function,可以通過(guò)1)實(shí)現(xiàn)接口;2)匿名類;3)Java8 Lambdas表達(dá)式;
1. Map算子 DataStream => DataStream
輸入一個(gè)元素同時(shí)輸出一個(gè)元素。下面是將輸入流中元素?cái)?shù)值加倍的 map function:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
2. FlatMap算子 DataStream => DataStream
輸入一個(gè)元素同時(shí)產(chǎn)生零個(gè)、一個(gè)或多個(gè)元素。下面是將句子拆分為單詞的 flatmap function:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
3. Filter算子 DataStream => DataStream
為每個(gè)元素執(zhí)行一個(gè)布爾 function,并保留那些 function 輸出值為 true 的元素。下面是過(guò)濾掉零值的 filter:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
KeyBy算子 DataStream => KeyedStream
在邏輯上將流劃分為不相交的分區(qū)。具有相同 key 的記錄都分配到同一個(gè)分區(qū)。在內(nèi)部, keyBy() 是通過(guò)哈希分區(qū)實(shí)現(xiàn)的,有多種指定 key 的方式,以下是通過(guò)Java8 Lambdas表達(dá)式:
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);
還可以通過(guò)實(shí)現(xiàn)KeySelector接口,來(lái)指定key。
Rich Functions
至此,你已經(jīng)看到了 Flink 的幾種函數(shù)接口,包括 FilterFunction, MapFunction,和 FlatMapFunction。這些都是單一抽象方法模式。對(duì)其中的每一個(gè)接口,F(xiàn)link 同樣提供了一個(gè)所謂 “rich” 的變體,如 RichFlatMapFunction,其中增加了以下方法,包括:
-
open(Configuration c)
-
close()
-
getRuntimeContext()
open() 僅在算子初始化時(shí)調(diào)用一次。可以用來(lái)加載一些靜態(tài)數(shù)據(jù),或者建立外部服務(wù)的鏈接等,比如從數(shù)據(jù)庫(kù)讀取配置。
getRuntimeContext() 為整套潛在有趣的東西提供了一個(gè)訪問(wèn)途徑,最明顯的,它是你創(chuàng)建和訪問(wèn) Flink 狀態(tài)的途徑。
Data Sinks
Data sinks 使用 DataStream 并將它們轉(zhuǎn)發(fā)到文件、套接字、外部系統(tǒng)或打印它們。Flink 自帶了多種內(nèi)置的輸出格式,這些格式相關(guān)的實(shí)現(xiàn)封裝在 DataStreams 的算子里:
-
writeAsText() / TextOutputFormat - 將元素按行寫成字符串。通過(guò)調(diào)用每個(gè)元素的 toString() 方法獲得字符串。
-
writeAsCsv(...) / CsvOutputFormat - 將元組寫成逗號(hào)分隔值文件。行和字段的分隔符是可配置的。每個(gè)字段的值來(lái)自對(duì)象的 toString() 方法。
-
print() / printToErr() - 在標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯(cuò)誤流上打印每個(gè)元素的 toString() 值。 可選地,可以提供一個(gè)前綴(msg)附加到輸出。這有助于區(qū)分不同的 print 調(diào)用。如果并行度大于1,輸出結(jié)果將附帶輸出任務(wù)標(biāo)識(shí)符的前綴。
-
writeUsingOutputFormat() / FileOutputFormat - 自定義文件輸出的方法和基類。支持自定義 object 到 byte 的轉(zhuǎn)換。
-
writeToSocket - 根據(jù) SerializationSchema 將元素寫入套接字。
-
addSink - 調(diào)用自定義 sink function。Flink 捆綁了連接到其他系統(tǒng)(例如 Apache Kafka)的連接器,這些連接器被實(shí)現(xiàn)為 sink functions。
print() / printToErr() 主要是程序開發(fā)調(diào)試的時(shí)候,將一些中間結(jié)果打印到控制臺(tái),便于調(diào)試。
在實(shí)際業(yè)務(wù)開發(fā)中,通常會(huì)使用addSink ,里面?zhèn)魅胍粋€(gè)SinkFunction對(duì)象,將結(jié)果保存到mysql等外部存儲(chǔ)。
rows.addSink(new RichSinkFunction<Row>() {
private Connection conn = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if(conn == null) {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
conn = DriverManager.getConnection("jdbc:clickhouse://192.168.1.2:8123/test");
}
}
@Override
public void close() throws Exception {
super.close();
if(conn != null) {
conn.close();
}
}
@Override
public void invoke(Row row, Context context) throws Exception {
String sql = "";
PreparedStatement ps = null;
sql = "insert into table ...";
ps = conn.prepareStatement(sql);
ps.setInt(1, ...);
ps.execute();
if(ps != null) {
ps.close();
}
}
});
在sink里面拿到數(shù)據(jù)庫(kù)連接,通常在open()方法,并且組裝sql,invoke()將其寫入到數(shù)據(jù)庫(kù)。
Flink 中的 API
Flink 為流式/批式處理應(yīng)用程序的開發(fā)提供了不同級(jí)別的抽象。
-
Flink API 最底層的抽象為有狀態(tài)實(shí)時(shí)流處理。其抽象實(shí)現(xiàn)是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中來(lái)為我們使用。它允許用戶在應(yīng)用程序中*地處理來(lái)自單流或多流的事件(數(shù)據(jù)),并提供具有全局一致性和容錯(cuò)保障的狀態(tài)。此外,用戶可以在此層抽象中注冊(cè)事件時(shí)間(event time)和處理時(shí)間(processing time)回調(diào)方法,從而允許程序可以實(shí)現(xiàn)復(fù)雜計(jì)算。
-
Flink API 第二層抽象是 Core APIs。實(shí)際上,許多應(yīng)用程序不需要使用到上述最底層抽象的 API,而是可以使用 Core APIs 進(jìn)行編程:其中包含 DataStream API(應(yīng)用于有界/*數(shù)據(jù)流場(chǎng)景)。Core APIs 提供的流式 API(Fluent API)為數(shù)據(jù)處理提供了通用的模塊組件,例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等。此層 API 中處理的數(shù)據(jù)類型在每種編程語(yǔ)言中都有其對(duì)應(yīng)的類。
-
Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API 來(lái)實(shí)現(xiàn)自己的需求。DataSet API 還額外提供了一些原語(yǔ),比如循環(huán)/迭代(loop/iteration)操作。
-
Flink API 第三層抽象是 Table API。Table API 是以表(Table)為中心的聲明式編程(DSL)API,例如在流式數(shù)據(jù)場(chǎng)景下,它可以表示一張正在動(dòng)態(tài)改變的表。Table API 遵循(擴(kuò)展)關(guān)系模型:即表?yè)碛?schema(類似于關(guān)系型數(shù)據(jù)庫(kù)中的 schema),并且 Table API 也提供了類似于關(guān)系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以聲明的方式定義應(yīng)執(zhí)行的邏輯操作,而不是確切地指定程序應(yīng)該執(zhí)行的代碼。盡管 Table API 使用起來(lái)很簡(jiǎn)潔并且可以由各種類型的用戶自定義函數(shù)擴(kuò)展功能,但還是比 Core API 的表達(dá)能力差。此外,Table API 程序在執(zhí)行之前還會(huì)使用優(yōu)化器中的優(yōu)化規(guī)則對(duì)用戶編寫的表達(dá)式進(jìn)行優(yōu)化。
表和 DataStream/DataSet 可以進(jìn)行無(wú)縫切換,F(xiàn)link 允許用戶在編寫應(yīng)用程序時(shí)將 Table API 與 DataStream/DataSet API 混合使用。
- Flink API 最頂層抽象是 SQL。這層抽象在語(yǔ)義和程序表達(dá)式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達(dá)式。SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的,并且 SQL 查詢語(yǔ)句可以在 Table API 中定義的表上執(zhí)行。
容錯(cuò)處理
流式處理遇到程序中斷是很常見的異常,如何恢復(fù),這將是很關(guān)鍵的,那么Flink又是如何進(jìn)行容錯(cuò)的呢?
Checkpoint Storage
Flink 定期對(duì)每個(gè)算子的所有狀態(tài)進(jìn)行持久化快照,并將這些快照復(fù)制到更持久的地方,例如分布式文件系統(tǒng)hdfs。 如果發(fā)生故障,F(xiàn)link 可以恢復(fù)應(yīng)用程序的完整狀態(tài)并恢復(fù)處理,就好像沒有出現(xiàn)任何問(wèn)題一樣。
這些快照的存儲(chǔ)位置是通過(guò)作業(yè)_checkpoint storage_定義的。 有兩種可用檢查點(diǎn)存儲(chǔ)實(shí)現(xiàn):一種持久保存其狀態(tài)快照 到一個(gè)分布式文件系統(tǒng),另一種是使用 JobManager 的堆。
狀態(tài)快照如何工作?
Flink 使用 Chandy-Lamport algorithm 算法的一種變體,稱為異步 barrier 快照(asynchronous barrier snapshotting)。
當(dāng) checkpoint coordinator(job manager 的一部分)指示 task manager 開始 checkpoint 時(shí),它會(huì)讓所有 sources 記錄它們的偏移量,并將編號(hào)的 checkpoint barriers 插入到它們的流中。這些 barriers 流經(jīng) job graph,標(biāo)注每個(gè) checkpoint 前后的流部分。
Checkpoint n 將包含每個(gè) operator 的 state,這些 state 是對(duì)應(yīng)的 operator 消費(fèi)了嚴(yán)格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的狀態(tài)。
當(dāng) job graph 中的每個(gè) operator 接收到 barriers 時(shí),它就會(huì)記錄下其狀態(tài)。擁有兩個(gè)輸入流的 Operators(例如 CoProcessFunction)會(huì)執(zhí)行 barrier 對(duì)齊(barrier alignment) 以便當(dāng)前快照能夠包含消費(fèi)兩個(gè)輸入流 barrier 之前(但不超過(guò))的所有 events 而產(chǎn)生的狀態(tài)。
確保精確一次(exactly once)
當(dāng)流處理應(yīng)用程序發(fā)生錯(cuò)誤的時(shí)候,結(jié)果可能會(huì)產(chǎn)生丟失或者重復(fù)。Flink 根據(jù)你為應(yīng)用程序和集群的配置,可以產(chǎn)生以下結(jié)果:
-
Flink 不會(huì)從快照中進(jìn)行恢復(fù)(at most once)
-
沒有任何丟失,但是你可能會(huì)得到重復(fù)冗余的結(jié)果(at least once)
-
沒有丟失或冗余重復(fù)(exactly once)
Flink 通過(guò)回退和重新發(fā)送 source 數(shù)據(jù)流從故障中恢復(fù),當(dāng)理想情況被描述為精確一次時(shí),這并不意味著每個(gè)事件都將被精確一次處理。相反,這意味著 每一個(gè)事件都會(huì)影響 Flink 管理的狀態(tài)精確一次。
Barrier 只有在需要提供精確一次的語(yǔ)義保證時(shí)需要進(jìn)行對(duì)齊(Barrier alignment)。如果不需要這種語(yǔ)義,可以通過(guò)配置 CheckpointingMode.AT_LEAST_ONCE 關(guān)閉 Barrier 對(duì)齊來(lái)提高性能。
端到端精確一次
為了實(shí)現(xiàn)端到端的精確一次,以便 sources 中的每個(gè)事件都僅精確一次對(duì) sinks 生效,必須滿足以下條件:
-
你的 sources 必須是可重放的,并且
-
你的 sinks 必須是事務(wù)性的(或冪等的)
在Flink里面開啟checkpoint只需要:
Job 升級(jí)與擴(kuò)容
升級(jí) Flink 作業(yè)一般都需要兩步:第一,使用 Savepoint 優(yōu)雅地停止 Flink Job。 Savepoint 是整個(gè)應(yīng)用程序狀態(tài)的一次快照(類似于 checkpoint ),該快照是在一個(gè)明確定義的、全局一致的時(shí)間點(diǎn)生成的。第二,從 Savepoint 恢復(fù)啟動(dòng)待升級(jí)的 Flink Job。 在此,“升級(jí)”包含如下幾種含義:
-
配置升級(jí)(比如 Job 并行度修改)
-
Job 拓?fù)渖?jí)(比如添加或者刪除算子)
-
Job 的用戶自定義函數(shù)升級(jí)
Step 1: 停止 Job
要優(yōu)雅停止 Job,需要使用 JobID 通過(guò) CLI 或 REST API 調(diào)用 “stop” 命令。 JobID 可以通過(guò)獲取所有運(yùn)行中的 Job 接口或 Flink WebUI 界面獲取,拿到 JobID 后就可以繼續(xù)停止作業(yè)了:
bin/flink stop <job-id>
client 預(yù)期輸出
Suspending job "<job-id>" with a savepoint.
Suspended job "<job-id>" with a savepoint.
Savepoint 已保存在 state.savepoints.dir 指定的路徑中,該配置在 flink-conf.yaml 中定義,flink-conf.yaml 掛載在本機(jī)的 /tmp/flink-savepoints-directory/ 目錄下。 在下一步操作中我們會(huì)用到這個(gè) Savepoint 路徑,如果我們是通過(guò) REST API 操作的, 那么 Savepoint 路徑會(huì)隨著響應(yīng)結(jié)果一起返回,我們可以直接查看文件系統(tǒng)來(lái)確認(rèn) Savepoint 保存情況。
**Step 2: 重啟 Job (不作任何變更) **
如果代碼邏輯需要改變,現(xiàn)在你可以從這個(gè) Savepoint 重新啟動(dòng)待升級(jí)的 Job。
flink run -s <savepoint-path> -p 3 -c MainClass -yid app_id /opt/ClickCountJob.jar
預(yù)期輸出
Starting execution of program
Job has been submitted with JobID <job-id>
遲到的數(shù)據(jù)
對(duì)于數(shù)據(jù)延遲,F(xiàn)link又是怎么處理的呢?這里先介紹2個(gè)概念。
Event Time and Watermarks
Flink 明確支持以下三種時(shí)間語(yǔ)義:
-
事件時(shí)間(event time): 事件產(chǎn)生的時(shí)間,記錄的是設(shè)備生產(chǎn)(或者存儲(chǔ))事件的時(shí)間;
-
攝取時(shí)間(ingestion time): Flink 讀取事件時(shí)記錄的時(shí)間;
-
處理時(shí)間(processing time): Flink pipeline 中具體算子處理事件的時(shí)間;
為了獲得可重現(xiàn)的結(jié)果,例如在計(jì)算過(guò)去的特定一天里第一個(gè)小時(shí)股票的最高價(jià)格時(shí),我們應(yīng)該使用事件時(shí)間。這樣的話,無(wú)論什么時(shí)間去計(jì)算都不會(huì)影響輸出結(jié)果。然而如果使用處理時(shí)間的話,實(shí)時(shí)應(yīng)用程序的結(jié)果是由程序運(yùn)行的時(shí)間所決定。多次運(yùn)行基于處理時(shí)間的實(shí)時(shí)程序,可能得到的結(jié)果都不相同,也可能會(huì)導(dǎo)致再次分析歷史數(shù)據(jù)或者測(cè)試新代碼變得異常困難。
EventTime就是我們的數(shù)據(jù)時(shí)間,F(xiàn)link把每條數(shù)據(jù)稱為Event;Watermarks就是每條數(shù)據(jù)允許的最大延遲;
公司組織春游,規(guī)定周六早晨8:00 ~ 8:30清查人數(shù),人齊則發(fā)車出發(fā),可是總有那么個(gè)同學(xué)會(huì)睡懶覺遲到,這時(shí)候通常也會(huì)等待20分鐘,但是不能一直等下去,最多等到8:50,不會(huì)繼續(xù)等待了,直接出發(fā)。在這個(gè)例子中,最晚期限時(shí)間是8:50 - 20分鐘,watermark就是8:30對(duì)應(yīng)的時(shí)間戳。
在基于窗口的允許延遲的Flink程序中,窗口最大時(shí)間,減去允許延遲的時(shí)間,也就是watermark,如果watermark大于window 結(jié)束時(shí)間,則觸發(fā)計(jì)算。
總結(jié)
以上是生活随笔為你收集整理的Flink DataStream API 编程模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 你真的了解MySQL日期函数吗?
- 下一篇: 数组篇-其之一-数组的概念与一维数组