Flink 运维与调优
轉(zhuǎn)載-flink優(yōu)化_黃瓜燉啤酒鴨的博客-CSDN博客
1.1 內(nèi)存設(shè)置?
1.2 并行度設(shè)置?
1.2.1 最優(yōu)并行度計(jì)算?
1.2.2 Source 端并行度的配置?
1.2.3 Transform端并行度的配置?
1.2.4 Sink 端并行度的配置?
1.3 RocksDB大狀態(tài)調(diào)優(yōu)?
1.4 Checkpoint設(shè)置?
1.5 使用 Flink ParameterTool 讀取配置?
1.5.1 讀取運(yùn)行參數(shù)?
1.5.2 讀取系統(tǒng)屬性?
1.5.3 讀取配置文件?
1.5.4 注冊(cè)全局參數(shù)?
1.6 壓測(cè)方式?
2 反壓處理?
2.1 反壓現(xiàn)象及定位?
2.1.1 利用 Flink Web UI 定位產(chǎn)生反壓的位置?
2.1.2 利用Metrics定位反壓位置?
2.2 反壓的原因及處理?
2.2.1 系統(tǒng)資源?
2.2.2 垃圾收集(GC)?
2.2.3 CPU/線程瓶頸?
2.2.4 線程競(jìng)爭(zhēng)?
2.2.5 負(fù)載不平衡?
2.2.6 外部依賴?
3 數(shù)據(jù)傾斜?
3.1 判斷是否存在數(shù)據(jù)傾斜?
3.2 數(shù)據(jù)傾斜的解決?
3.2.1 keyBy 后的聚合操作存在數(shù)據(jù)傾斜?
3.2.2 keyBy 之前發(fā)生數(shù)據(jù)傾斜?
3.2.3 keyBy 后的窗口聚合操作存在數(shù)據(jù)傾斜?
4 KafkaSource調(diào)優(yōu)?
4.1 動(dòng)態(tài)發(fā)現(xiàn)分區(qū)?
4.2 從kafka數(shù)據(jù)源生成watermark?
4.3 設(shè)置空閑等待?
4.4 Kafka的offset消費(fèi)策略?
5 FlinkSQL調(diào)優(yōu)?
5.1 Group Aggregate優(yōu)化?
5.1.1 開啟MiniBatch(提升吞吐)?
5.1.2 開啟LocalGlobal(解決常見數(shù)據(jù)熱點(diǎn)問(wèn)題)?
5.1.3 開啟Split Distinct(解決COUNT DISTINCT熱點(diǎn)問(wèn)題)?
5.1.4 改寫為AGG WITH FILTER語(yǔ)法(提升大量COUNT DISTINCT場(chǎng)景性能)?
5.2 TopN優(yōu)化?
5.2.1 使用最優(yōu)算法?
5.2.2 無(wú)排名優(yōu)化(解決數(shù)據(jù)膨脹問(wèn)題)?
5.2.3 增加TopN的Cache大小?
5.2.4 PartitionBy的字段中要有時(shí)間類字段?
5.2.5 優(yōu)化后的SQL示例?
5.3 高效去重方案?
5.3.1 保留首行的去重策略(Deduplicate Keep FirstRow)?
5.3.2 保留末行的去重策略(Deduplicate Keep LastRow)?
5.4 高效的內(nèi)置函數(shù)?
5.4.1 使用內(nèi)置函數(shù)替換自定義函數(shù)?
5.4.2 LIKE操作注意事項(xiàng)?
5.4.3 慎用正則函數(shù)(REGEXP)?
5.5 指定時(shí)區(qū)?
5.6 設(shè)置參數(shù)總結(jié)?
資源配置調(diào)優(yōu)
Flink性能調(diào)優(yōu)的第一步,就是為任務(wù)分配合適的資源,在一定范圍內(nèi),增加資源的分配與性能的提升是成正比的,實(shí)現(xiàn)了最優(yōu)的資源配置后,在此基礎(chǔ)上再考慮進(jìn)行后面論述的性能調(diào)優(yōu)策略。
提交方式主要是yarn-per-job,資源的分配在使用腳本提交Flink任務(wù)時(shí)進(jìn)行指定。
- 標(biāo)準(zhǔn)的Flink任務(wù)提交腳本(Generic CLI?模式)
從1.11開始,增加了通用客戶端模式,參數(shù)使用-D <property=value>指定
bin/flink run \
-t?yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進(jìn)程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個(gè)TM的總進(jìn)程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個(gè)TM的slot數(shù)
-c com.at.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
參數(shù)列表:
Apache Flink 1.12 Documentation: Configuration
內(nèi)存設(shè)置
生產(chǎn)資源配置:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=2048mb \ JM2~4G足夠
-Dtaskmanager.memory.process.size=6144mb \ 單個(gè)TM2~8G足夠
-Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數(shù)1core:1slot或1core:2slot
-c com.at.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
Flink是實(shí)時(shí)流處理,關(guān)鍵在于資源情況能不能抗住高峰時(shí)期每秒的數(shù)據(jù)量,通常用QPS/TPS來(lái)描述數(shù)據(jù)情況。
最優(yōu)并行度計(jì)算
開發(fā)完成后,先進(jìn)行壓測(cè)。任務(wù)并行度給10以下,測(cè)試單個(gè)并行度的處理上限。然后 總QPS/單并行度的處理能力 = 并行度
不能只從QPS去得出并行度,因?yàn)橛行┳侄紊佟⑦壿嫼?jiǎn)單的任務(wù),單并行度一秒處理幾萬(wàn)條數(shù)據(jù)。而有些數(shù)據(jù)字段多,處理邏輯復(fù)雜,單并行度一秒只能處理1000條數(shù)據(jù)。
最好根據(jù)高峰期的QPS壓測(cè),并行度*1.2倍,富余一些資源。
Source 端并行度的配置
數(shù)據(jù)源端是 Kafka,Source的并行度設(shè)置為Kafka對(duì)應(yīng)Topic的分區(qū)數(shù)。
如果已經(jīng)等于 Kafka 的分區(qū)數(shù),消費(fèi)速度仍跟不上數(shù)據(jù)生產(chǎn)速度,考慮下Kafka 要擴(kuò)大分區(qū),同時(shí)調(diào)大并行度等于分區(qū)數(shù)。
Flink 的一個(gè)并行度可以處理一至多個(gè)分區(qū)的數(shù)據(jù),如果并行度多于 Kafka 的分區(qū)數(shù),那么就會(huì)造成有的并行度空閑,浪費(fèi)資源。
Transform端并行度的配置- Keyby之前的算子
一般不會(huì)做太重的操作,都是比如map、filter、flatmap等處理較快的算子,并行度可以和source保持一致。
- Keyby之后的算子
如果并發(fā)較大,建議設(shè)置并行度為 2 的整數(shù)次冪,例如:128、256、512;
小并發(fā)任務(wù)的并行度不一定需要設(shè)置成 2 的整數(shù)次冪;
大并發(fā)任務(wù)如果沒(méi)有 KeyBy,并行度也無(wú)需設(shè)置為 2 的整數(shù)次冪;
Sink 端并行度的配置
Sink 端是數(shù)據(jù)流向下游的地方,可以根據(jù) Sink 端的數(shù)據(jù)量及下游的服務(wù)抗壓能力進(jìn)行評(píng)估。如果Sink端是Kafka,可以設(shè)為Kafka對(duì)應(yīng)Topic的分區(qū)數(shù)。
Sink 端的數(shù)據(jù)量小,比較常見的就是監(jiān)控告警的場(chǎng)景,并行度可以設(shè)置的小一些。
Source 端的數(shù)據(jù)量是最小的,拿到 Source 端流過(guò)來(lái)的數(shù)據(jù)后做了細(xì)粒度的拆分,數(shù)據(jù)量不斷的增加,到 Sink 端的數(shù)據(jù)量就非常大。那么在 Sink 到下游的存儲(chǔ)中間件的時(shí)候就需要提高并行度。
另外 Sink 端要與下游的服務(wù)進(jìn)行交互,并行度還得根據(jù)下游的服務(wù)抗壓能力來(lái)設(shè)置,如果在 Flink Sink 這端的數(shù)據(jù)量過(guò)大的話,且 Sink 處并行度也設(shè)置的很大,但下游的服務(wù)完全撐不住這么大的并發(fā)寫入,可能會(huì)造成下游服務(wù)直接被寫掛,所以最終還是要在 Sink 處的并行度做一定的權(quán)衡。
RocksDB 是基于 LSM Tree 實(shí)現(xiàn)的(類似HBase),寫數(shù)據(jù)都是先緩存到內(nèi)存中,所以RocksDB 的寫請(qǐng)求效率比較高。RocksDB 使用內(nèi)存結(jié)合磁盤的方式來(lái)存儲(chǔ)數(shù)據(jù),每次獲取數(shù)據(jù)時(shí),先從內(nèi)存中 blockcache 中查找,如果內(nèi)存中沒(méi)有再去磁盤中查詢。優(yōu)化后差不多單并行度 TPS 5000 record/s,性能瓶頸主要在于 RocksDB 對(duì)磁盤的讀請(qǐng)求,所以當(dāng)處理性能不夠時(shí),僅需要橫向擴(kuò)展并行度即可提高整個(gè)Job 的吞吐量。以下幾個(gè)調(diào)優(yōu)參數(shù):
- 設(shè)置本地 RocksDB 多目錄
在flink-conf.yaml 中配置:
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
注意:不要配置單塊磁盤的多個(gè)目錄,務(wù)必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來(lái)分擔(dān)壓力。當(dāng)設(shè)置多個(gè) RocksDB 本地磁盤目錄時(shí),Flink 會(huì)隨機(jī)選擇要使用的目錄,所以就可能存在三個(gè)并行度共用同一目錄的情況。如果服務(wù)器磁盤數(shù)較多,一般不會(huì)出現(xiàn)該情況,但是如果任務(wù)重啟后吞吐量較低,可以檢查是否發(fā)生了多個(gè)并行度共用同一塊磁盤的情況。
當(dāng)一個(gè) TaskManager 包含 3 個(gè) slot 時(shí),那么單個(gè)服務(wù)器上的三個(gè)并行度都對(duì)磁盤造成頻繁讀寫,從而導(dǎo)致三個(gè)并行度的之間相互爭(zhēng)搶同一個(gè)磁盤 io,這樣務(wù)必導(dǎo)致三個(gè)并行度的吞吐量都會(huì)下降。設(shè)置多目錄實(shí)現(xiàn)三個(gè)并行度使用不同的硬盤從而減少資源競(jìng)爭(zhēng)。
如下所示是測(cè)試過(guò)程中磁盤的 IO 使用率,可以看出三個(gè)大狀態(tài)算子的并行度分別對(duì)應(yīng)了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其他磁盤的 IO 平均使用率相對(duì)低很多。由此可見使用 RocksDB 做為狀態(tài)后端且有大狀態(tài)的頻繁讀取時(shí), 對(duì)磁盤IO性能消耗確實(shí)比較大。
如下圖所示,其中兩個(gè)并行度共用了 sdb 磁盤,一個(gè)并行度使用 sdj磁盤。可以看到 sdb 磁盤的 IO 使用率已經(jīng)達(dá)到了 91.6%,就會(huì)導(dǎo)致 sdb 磁盤對(duì)應(yīng)的兩個(gè)并行度吞吐量大大降低,從而使得整個(gè) Flink 任務(wù)吞吐量降低。如果每個(gè)服務(wù)器上有一兩塊 SSD,強(qiáng)烈建議將 RocksDB 的本地磁盤目錄配置到 SSD 的目錄下,從 HDD 改為 SSD 對(duì)于性能的提升可能比配置 10 個(gè)優(yōu)化參數(shù)更有效。
- state.backend.incremental:開啟增量檢查點(diǎn),默認(rèn)false,改為true。
- state.backend.rocksdb.predefined-options:SPINNING_DISK_OPTIMIZED_HIGH_MEM設(shè)置為機(jī)械硬盤+內(nèi)存模式,有條件上SSD,指定為FLASH_SSD_OPTIMIZED
- state.backend.rocksdb.block.cache-size: 整個(gè) RocksDB 共享一個(gè) block cache,讀數(shù)據(jù)時(shí)內(nèi)存的 cache 大小,該參數(shù)越大讀數(shù)據(jù)時(shí)緩存命中率越高,默認(rèn)大小為 8 MB,建議設(shè)置到 64 ~ 256 MB。
- state.backend.rocksdb.thread.num: 用于后臺(tái) flush 和合并 sst 文件的線程數(shù),默認(rèn)為 1,建議調(diào)大,機(jī)械硬盤用戶可以改為?4 等更大的值。
- state.backend.rocksdb.writebuffer.size: RocksDB 中,每個(gè) State 使用一個(gè) Column Family,每個(gè) Column Family 使用獨(dú)占的 write buffer,建議調(diào)大,例如:32M
- state.backend.rocksdb.writebuffer.count: 每個(gè) Column Family 對(duì)應(yīng)的 writebuffer 數(shù)目,默認(rèn)值是 2,對(duì)于機(jī)械磁盤來(lái)說(shuō),如果內(nèi)存?夠大,可以調(diào)大到 5 左右
- state.backend.rocksdb.writebuffer.number-to-merge: 將數(shù)據(jù)從 writebuffer 中 flush 到磁盤時(shí),需要合并的 writebuffer 數(shù)量,默認(rèn)值為 1,可以調(diào)成3。
- state.backend.local-recovery: 設(shè)置本地恢復(fù),當(dāng) Flink 任務(wù)失敗時(shí),可以基于本地的狀態(tài)信息進(jìn)行恢復(fù)任務(wù),可能不需要從 hdfs 拉取數(shù)據(jù)
Checkpoint設(shè)置
一般我們的 Checkpoint 時(shí)間間隔可以設(shè)置為分鐘級(jí)別,例如 1 分鐘、3 分鐘,對(duì)于狀態(tài)很大的任務(wù)每次 Checkpoint 訪問(wèn) HDFS 比較耗時(shí),可以設(shè)置為 5~10 分鐘一次Checkpoint,并且調(diào)大兩次 Checkpoint 之間的暫停間隔,例如設(shè)置兩次Checkpoint 之間至少暫停 4或8 分鐘。
如果 Checkpoint 語(yǔ)義配置為 EXACTLY_ONCE,那么在 Checkpoint 過(guò)程中還會(huì)存在 barrier 對(duì)齊的過(guò)程,可以通過(guò) Flink Web UI 的 Checkpoint 選項(xiàng)卡來(lái)查看 Checkpoint 過(guò)程中各階段的耗時(shí)情況,從而確定到底是哪個(gè)階段導(dǎo)致 Checkpoint 時(shí)間過(guò)長(zhǎng)然后針對(duì)性的解決問(wèn)題。
RocksDB相關(guān)參數(shù)在1.3中已說(shuō)明,可以在flink-conf.yaml指定,也可以在Job的代碼中調(diào)用API單獨(dú)指定,這里不再列出。
?// 使? RocksDBStateBackend 做為狀態(tài)后端,并開啟增量 Checkpoint
?RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
?env.setStateBackend(rocksDBStateBackend);
?// 開啟Checkpoint,間隔為 3 分鐘
?env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
?// 配置 Checkpoint
?CheckpointConfig checkpointConf = env.getCheckpointConfig();
?checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
?// 最小間隔 4分鐘
?checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
?// 超時(shí)時(shí)間 10分鐘
?checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
?// 保存checkpoint
?checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
使用 Flink ParameterTool 讀取配置
在實(shí)際開發(fā)中,有各種環(huán)境(開發(fā)、測(cè)試、預(yù)發(fā)、生產(chǎn)),作業(yè)也有很多的配置:算子的并行度配置、Kafka 數(shù)據(jù)源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否開啟、狀態(tài)后端存儲(chǔ)路徑、數(shù)據(jù)庫(kù)地址、用戶名和密碼等各種各樣的配置,可能每個(gè)環(huán)境的這些配置對(duì)應(yīng)的值都是不一樣的。
如果你是直接在代碼??寫死的配置,每次換個(gè)環(huán)境去運(yùn)行測(cè)試作業(yè),都要重新去修改代碼中的配置,然后編譯打包,提交運(yùn)行,這樣就要花費(fèi)很多時(shí)間在這些重復(fù)的勞動(dòng)力上了。在 Flink 中可以通過(guò)使用 ParameterTool 類讀取配置,它可以讀取環(huán)境變量、運(yùn)行參數(shù)、配置文件。
ParameterTool 是可序列化的,所以你可以將它當(dāng)作參數(shù)進(jìn)行傳遞給算子的自定義函數(shù)類。
讀取運(yùn)行參數(shù)
我們可以在Flink的提交腳本添加運(yùn)行參數(shù),格式:
- --參數(shù)名 參數(shù)值
- -參數(shù)名 參數(shù)值
在 Flink 程序中可以直接使用 ParameterTool.fromArgs(args) 獲取到所有的參數(shù),也可以通過(guò) parameterTool.get("username") 方法獲取某個(gè)參數(shù)對(duì)應(yīng)的值。
舉例:通過(guò)運(yùn)行參數(shù)指定jobname
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進(jìn)程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個(gè)TM的總進(jìn)程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個(gè)TM的slot數(shù)
-c com.at.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
--jobname dwd-LogBaseApp ?//參數(shù)名自己隨便起,代碼里對(duì)應(yīng)上即可
在代碼里獲取參數(shù)值:
????????ParameterTool parameterTool = ParameterTool.fromArgs(args);
????????String myJobname = parameterTool.get("jobname"); ?//參數(shù)名對(duì)應(yīng)
????????env.execute(myJobname);
ParameterTool 還?持通過(guò) ParameterTool.fromSystemProperties()?方法讀取系統(tǒng)屬性。做個(gè)打印:
ParameterTool parameterTool = ParameterTool.fromSystemProperties();
System.out.println(parameterTool.toMap().toString());
可以得到全面的系統(tǒng)屬性,部分結(jié)果
讀取配置文件
可以使用ParameterTool.fromPropertiesFile("/application.properties") 讀取 properties 配置文件。可以將所有要配置的地方(比如并行度和一些 Kafka、MySQL 等配置)都寫成可配置的,然后其對(duì)應(yīng)的 key 和 value 值都寫在配置文件中,最后通過(guò) ParameterTool 去讀取配置文件獲取對(duì)應(yīng)的值。
在 ExecutionConfig 中可以將 ParameterTool 注冊(cè)為全作業(yè)參數(shù)的參數(shù),這樣就可以被 JobManager 的web 端以及用戶?定義函數(shù)中以配置值的形式訪問(wèn)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
可以不用將ParameterTool當(dāng)作參數(shù)傳遞給算子的自定義函數(shù),直接在用戶?定義的 Rich 函數(shù)中直接獲取到參數(shù)值了。
env.addSource(new RichSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
?while (true) {
ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
???}
??}
??@Override
??public void cancel() {
??}
})
壓測(cè)的方式很簡(jiǎn)單,先在kafka中積壓數(shù)據(jù),之后開啟Flink任務(wù),出現(xiàn)反壓,就是處理瓶頸。相當(dāng)于水庫(kù)先積水,一下子泄洪。數(shù)據(jù)可以是自己造的模擬數(shù)據(jù),也可以是生產(chǎn)中的部分?jǐn)?shù)據(jù)。
反壓處理
反壓(BackPressure)通常產(chǎn)生于這樣的場(chǎng)景:短時(shí)間的負(fù)載高峰導(dǎo)致系統(tǒng)接收數(shù)據(jù)的速率遠(yuǎn)高于它處理數(shù)據(jù)的速率。許多日常問(wèn)題都會(huì)導(dǎo)致反壓,例如,垃圾回收停頓可能會(huì)導(dǎo)致流入的數(shù)據(jù)快速堆積,或遇到大促、秒殺活動(dòng)導(dǎo)致流量陡增。反壓如果不能得到正確的處理,可能會(huì)導(dǎo)致資源耗盡甚至系統(tǒng)崩潰。
反壓機(jī)制是指系統(tǒng)能夠自己檢測(cè)到被阻塞的 Operator,然后自適應(yīng)地降低源頭或上游數(shù)據(jù)的發(fā)送速率,從而維持整個(gè)系統(tǒng)的穩(wěn)定。Flink 任務(wù)一般運(yùn)行在多個(gè)節(jié)點(diǎn)上,數(shù)據(jù)從上游算子發(fā)送到下游算子需要網(wǎng)絡(luò)傳輸,若系統(tǒng)在反壓時(shí)想要降低數(shù)據(jù)源頭或上游算子數(shù)據(jù)的發(fā)送速率,那么肯定也需要網(wǎng)絡(luò)傳輸。所以下面先來(lái)了解一下 Flink 的網(wǎng)絡(luò)流控(Flink 對(duì)網(wǎng)絡(luò)數(shù)據(jù)流量的控制)機(jī)制。
Flink 的反壓太過(guò)于天然了,導(dǎo)致無(wú)法簡(jiǎn)單地通過(guò)監(jiān)控 BufferPool 的使用情況來(lái)判斷反壓狀態(tài)。Flink 通過(guò)對(duì)運(yùn)行中的任務(wù)進(jìn)行采樣來(lái)確定其反壓,如果一個(gè) Task 因?yàn)榉磯簩?dǎo)致處理速度降低了,那么它肯定會(huì)卡在向 LocalBufferPool 申請(qǐng)內(nèi)存塊上。那么該 Task 的 stack trace 應(yīng)該是這樣:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) [...]
監(jiān)控對(duì)正常的任務(wù)運(yùn)行有一定影響,因此只有當(dāng) Web 頁(yè)面切換到 Job 的 BackPressure 頁(yè)面時(shí),JobManager 才會(huì)對(duì)該 Job 觸發(fā)反壓監(jiān)控。默認(rèn)情況下,JobManager 會(huì)觸發(fā) 100 次 stack trace 采樣,每次間隔 50ms 來(lái)確定反壓。Web 界面看到的比率表示在內(nèi)部方法調(diào)用中有多少 stack trace 被卡在LocalBufferPool.requestBufferBlocking(),例如: 0.01 表示在 100 個(gè)采樣中只有 1 個(gè)被卡在LocalBufferPool.requestBufferBlocking()。采樣得到的比例與反壓狀態(tài)的對(duì)應(yīng)關(guān)系如下:
- OK: 0 <= 比例 <= 0.10
- LOW: 0.10 < 比例 <= 0.5
- HIGH: 0.5 < 比例 <= 1
Task 的狀態(tài)為 OK 表示沒(méi)有反壓,HIGH 表示這個(gè) Task 被反壓。
在 Flink Web UI 中有 BackPressure 的頁(yè)面,通過(guò)該頁(yè)面可以查看任務(wù)中 subtask 的反壓狀態(tài),如下兩圖所示,分別展示了狀態(tài)是 OK 和 HIGH 的場(chǎng)景。排查的時(shí)候,先把operator chain禁用,方便定位。
當(dāng)某個(gè) Task 吞吐量下降時(shí),基于 Credit 的反壓機(jī)制,上游不會(huì)給該 Task 發(fā)送數(shù)據(jù),所以該 Task 不會(huì)頻繁卡在向 Buffer Pool 去申請(qǐng) Buffer。反壓監(jiān)控實(shí)現(xiàn)原理就是監(jiān)控 Task 是否卡在申請(qǐng) buffer 這一步,所以遇到瓶頸的 Task 對(duì)應(yīng)的反壓??必然會(huì)顯示 OK,即表示沒(méi)有受到反壓。
如果該 Task 吞吐量下降,造成該Task 上游的 Task 出現(xiàn)反壓時(shí),必然會(huì)存在:該 Task 對(duì)應(yīng)的 InputChannel 變滿,已經(jīng)申請(qǐng)不到可用的Buffer 空間。如果該 Task 的 InputChannel 還能申請(qǐng)到可用 Buffer,那么上游就可以給該 Task 發(fā)送數(shù)據(jù),上游 Task 也就不會(huì)被反壓了,所以說(shuō)遇到瓶頸且導(dǎo)致上游 Task 受到反壓的 Task 對(duì)應(yīng)的 InputChannel 必然是滿的(這?不考慮?絡(luò)遇到瓶頸的情況)。從這個(gè)思路出發(fā),可以對(duì)該 Task 的 InputChannel 的使用情況進(jìn)行監(jiān)控,如果 InputChannel 使用率 100%,那么該 Task 就是我們要找的反壓源。Flink 1.9 及以上版本inPoolUsage 表示 inputFloatingBuffersUsage 和inputExclusiveBuffersUsage 的總和。
反壓時(shí),可以看到遇到瓶頸的該Task的inPoolUage為1。
先檢查基本原因,然后再深入研究更復(fù)雜的原因,最后找出導(dǎo)致瓶頸的原因。下面列出從最基本到比較復(fù)雜的一些反壓潛在原因。
注意:反壓可能是暫時(shí)的,可能是由于負(fù)載高峰、CheckPoint 或作業(yè)重啟引起的數(shù)據(jù)積壓而導(dǎo)致反壓。如果反壓是暫時(shí)的,應(yīng)該忽略它。另外,請(qǐng)記住,斷斷續(xù)續(xù)的反壓會(huì)影響我們分析和解決問(wèn)題。
檢查涉及服務(wù)器基本資源的使用情況,如CPU、網(wǎng)絡(luò)或磁盤I/O,目前 Flink 任務(wù)使用最主要的還是內(nèi)存和 CPU 資源,本地磁盤、依賴的外部存儲(chǔ)資源以及網(wǎng)卡資源一般都不會(huì)是瓶頸。如果某些資源被充分利用或大量使用,可以借助分析工具,分析性能瓶頸(JVM Profiler+ FlameGraph生成火焰圖)。
如何生成火焰圖:如何生成 Flink 作業(yè)的交互式火焰圖? | zhisheng的博客
如何讀懂火焰圖:如何讀懂火焰圖? - 知乎
- 針對(duì)特定的資源調(diào)優(yōu)Flink
- 通過(guò)增加并行度或增加集群中的服務(wù)器數(shù)量來(lái)橫向擴(kuò)展
- 減少瓶頸算子上游的并行度,從而減少瓶頸算子接收的數(shù)據(jù)量(不建議,可能造成整個(gè)Job數(shù)據(jù)延遲增大)
垃圾收集(GC)
長(zhǎng)時(shí)間GC暫停會(huì)導(dǎo)致性能問(wèn)題。可以通過(guò)打印調(diào)試GC日志(通過(guò)-XX:+PrintGCDetails)或使用某些內(nèi)存或 GC 分析器(GCViewer工具)來(lái)驗(yàn)證是否處于這種情況。
- 在Flink提交腳本中,設(shè)置JVM參數(shù),打印GC日志:
bin/flink run \
-t?yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊(duì)列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進(jìn)程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個(gè)TM的總進(jìn)程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個(gè)TM的slot數(shù)
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
-c com.at.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
- 下載GC日志的方式:
因?yàn)槭莖n yarn模式,運(yùn)行的節(jié)點(diǎn)一個(gè)一個(gè)找比較麻煩。可以打開WebUI,選擇JobManager或者TaskManager,點(diǎn)擊Stdout,即可看到GC日志,點(diǎn)擊下載按鈕即可將GC日志通過(guò)HTTP的方式下載下來(lái)。
- 分析GC日志:
通過(guò) GC 日志分析出單個(gè) Flink Taskmanager 堆總大小、年輕代、老年代分配的內(nèi)存空間、Full GC 后老年代剩余大小等,相關(guān)指標(biāo)定義可以去 Github 具體查看。
GCViewer地址:https://github.com/chewiebug/GCViewer
擴(kuò)展:最重要的指標(biāo)是Full GC 后,老年代剩余大小這個(gè)指標(biāo),按照《Java 性能優(yōu)化權(quán)威指南》這本書 Java 堆大小計(jì)算法則,設(shè) Full GC 后老年代剩余大小空間為 M,那么堆的大小建議 3 ~ 4倍 M,新生代為 1 ~ 1.5 倍 M,老年代應(yīng)為 2 ~ 3 倍 M。
CPU/線程瓶頸
有時(shí),一個(gè)或幾個(gè)線程導(dǎo)致 CPU 瓶頸,而整個(gè)機(jī)器的CPU使用率仍然相對(duì)較低,則可能無(wú)法看到 CPU 瓶頸。例如,48核的服務(wù)器上,單個(gè) CPU 瓶頸的線程僅占用 2%的 CPU 使用率,就算單個(gè)線程發(fā)生了 CPU 瓶頸,我們也看不出來(lái)。可以考慮使用2.2.1提到的分析工具,它們可以顯示每個(gè)線程的 CPU 使用情況來(lái)識(shí)別熱線程。
線程競(jìng)爭(zhēng)
與上?的 CPU/線程瓶頸問(wèn)題類似,subtask 可能會(huì)因?yàn)楣蚕碣Y源上高負(fù)載線程的競(jìng)爭(zhēng)而成為瓶頸。同樣,可以考慮使用2.2.1提到的分析工具,考慮在用戶代碼中查找同步開銷、鎖競(jìng)爭(zhēng),盡管避免在用戶代碼中添加同步。
如果瓶頸是由數(shù)據(jù)傾斜引起的,可以嘗試通過(guò)將數(shù)據(jù)分區(qū)的 key 進(jìn)行加鹽或通過(guò)實(shí)現(xiàn)本地預(yù)聚合來(lái)減輕數(shù)據(jù)傾斜的影響。(關(guān)于數(shù)據(jù)傾斜的詳細(xì)解決方案,會(huì)在下一章節(jié)詳細(xì)討論)
如果發(fā)現(xiàn)我們的 Source 端數(shù)據(jù)讀取性能比較低或者 Sink 端寫入性能較差,需要檢查第三方組件是否遇到瓶頸。例如,Kafka 集群是否需要擴(kuò)容,Kafka 連接器是否并行度較低,HBase 的 rowkey 是否遇到熱點(diǎn)問(wèn)題。關(guān)于第三方組件的性能問(wèn)題,需要結(jié)合具體的組件來(lái)分析。
數(shù)據(jù)傾斜
判斷是否存在數(shù)據(jù)傾斜
相同 Task 的多個(gè) Subtask 中,個(gè)別Subtask 接收到的數(shù)據(jù)量明顯大于其他 Subtask 接收到的數(shù)據(jù)量,通過(guò) Flink Web UI 可以精確地看到每個(gè) Subtask 處理了多少數(shù)據(jù),即可判斷出 Flink 任務(wù)是否存在數(shù)據(jù)傾斜。通常,數(shù)據(jù)傾斜也會(huì)引起反壓。
keyBy 后的聚合操作存在數(shù)據(jù)傾斜
使用LocalKeyBy的思想:在 keyBy 上游算子數(shù)據(jù)發(fā)送之前,首先在上游算子的本地
對(duì)數(shù)據(jù)進(jìn)行聚合后再發(fā)送到下游,使下游接收到的數(shù)據(jù)量大大減少,從而使得 keyBy 之后的聚合操作不再是任務(wù)的瓶頸。類似MapReduce 中 Combiner 的思想,但是這要求聚合操作必須是多條數(shù)據(jù)或者一批數(shù)據(jù)才能聚合,單條數(shù)據(jù)沒(méi)有辦法通過(guò)聚合來(lái)減少數(shù)據(jù)量。從Flink LocalKeyBy 實(shí)現(xiàn)原理來(lái)講,必然會(huì)存在一個(gè)積攢批次的過(guò)程,在上游算子中必須攢夠一定的數(shù)據(jù)量,對(duì)這些數(shù)據(jù)聚合后再發(fā)送到下游。
注意:Flink是實(shí)時(shí)流處理,如果keyby之后的聚合操作存在數(shù)據(jù)傾斜,且沒(méi)有開窗口的情況下,簡(jiǎn)單的認(rèn)為使用兩階段聚合,是不能解決問(wèn)題的。因?yàn)檫@個(gè)時(shí)候Flink是來(lái)一條處理一條,且向下游發(fā)送一條結(jié)果,對(duì)于原來(lái)keyby的維度(第二階段聚合)來(lái)講,數(shù)據(jù)量并沒(méi)有減少,且結(jié)果重復(fù)計(jì)算(非FlinkSQL,未使用回撤流),如下圖所示:
- 實(shí)現(xiàn)方式:以計(jì)算PV為例,keyby之前,使用flatMap實(shí)現(xiàn)LocalKeyby
class LocalKeyByFlatMap extends RichFlatMapFunction<String, Tuple2<String,
?//Checkpoint 時(shí)為了保證 Exactly Once,將 buffer 中的數(shù)據(jù)保存到該 ListState 中
?private ListState<Tuple2<String, Long>> localPvStatListState;
?//本地 buffer,存放 local 端緩存的 app 的 pv 信息
?private HashMap<String, Long> localPvStat;
?//緩存的數(shù)據(jù)量大小,即:緩存多少數(shù)據(jù)再向下游發(fā)送
?private int batchSize;
?//計(jì)數(shù)器,獲取當(dāng)前批次接收的數(shù)據(jù)量
?private AtomicInteger currentSize;
?//構(gòu)造器,批次大小傳參
?LocalKeyByFlatMap(int batchSize){
? this.batchSize = batchSize;
?}
?@Override
?public void flatMap(String in, Collector collector) throws Exception {
? // 將新來(lái)的數(shù)據(jù)添加到 buffer 中
? Long pv = localPvStat.getOrDefault(in, 0L);
? localPvStat.put(in, pv + 1);
? // 如果到達(dá)設(shè)定的批次,則將 buffer 中的數(shù)據(jù)發(fā)送到下游
? if(currentSize.incrementAndGet() >= batchSize){
? // 遍歷 Buffer 中數(shù)據(jù),發(fā)送到下游
? for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
? collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
? }
? // Buffer 清空,計(jì)數(shù)器清零
? localPvStat.clear();
? currentSize.set(0);
? }
?}
?@Override
?public void snapshotState(FunctionSnapshotContext functionSnapshotConte
? // 將 buffer 中的數(shù)據(jù)保存到狀態(tài)中,來(lái)保證 Exactly Once
? localPvStatListState.clear();
? for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
? localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
? }
?}
?@Override
?public void initializeState(FunctionInitializationContext context) {
? // 從狀態(tài)中恢復(fù) buffer 中的數(shù)據(jù)
? localPvStatListState = context.getOperatorStateStore().getListState
? new ListStateDescriptor<>("localPvStat",
? TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
? localPvStat = new HashMap();
? if(context.isRestored()) {
? // 從狀態(tài)中恢復(fù)數(shù)據(jù)到 localPvStat 中
? for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
? // 如果出現(xiàn) pv != 0,說(shuō)明改變了并行度,
? // ListState 中的數(shù)據(jù)會(huì)被均勻分發(fā)到新的 subtask中
? // 所以單個(gè) subtask 恢復(fù)的狀態(tài)中可能包含兩個(gè)相同的 app 的數(shù)據(jù)
? localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
? }
? // 從狀態(tài)恢復(fù)時(shí),默認(rèn)認(rèn)為 buffer 中數(shù)據(jù)量達(dá)到了 batchSize,需要向下游發(fā)
? currentSize = new AtomicInteger(batchSize);
? } else {
? currentSize = new AtomicInteger(0);
? }
?}
}
keyBy 之前發(fā)生數(shù)據(jù)傾斜
如果 keyBy 之前就存在數(shù)據(jù)傾斜,上游算子的某些實(shí)例可能處理的數(shù)據(jù)較多,某些實(shí)例可能處理的數(shù)據(jù)較少,產(chǎn)生該情況可能是因?yàn)閿?shù)據(jù)源的數(shù)據(jù)本身就不均勻,例如由于某些原因 Kafka 的 topic 中某些 partition 的數(shù)據(jù)量較大,某些 partition 的數(shù)據(jù)量較少。對(duì)于不存在 keyBy 的 Flink 任務(wù)也會(huì)出現(xiàn)該情況。
這種情況,需要讓 Flink 任務(wù)強(qiáng)制進(jìn)行shuffle。使用shuffle、rebalance 或 rescale算子即可將數(shù)據(jù)均勻分配,從而解決數(shù)據(jù)傾斜的問(wèn)題。
因?yàn)槭褂昧舜翱?#xff0c;變成了有界數(shù)據(jù)的處理(3.2.1已分析過(guò)),窗口默認(rèn)是觸發(fā)時(shí)才會(huì)輸出一條結(jié)果發(fā)往下游,所以可以使用兩階段聚合的方式:
實(shí)現(xiàn)思路:
- 第一階段聚合:key拼接隨機(jī)數(shù)前綴或后綴,進(jìn)行keyby、開窗、聚合
注意:聚合完不再是WindowedStream,要獲取WindowEnd作為窗口標(biāo)記作為第二階段分組依據(jù),避免不同窗口的結(jié)果聚合到一起)
- 第二階段聚合:去掉隨機(jī)數(shù)前綴或后綴,按照原來(lái)的key及windowEnd作keyby、聚合
KafkaSource調(diào)優(yōu)
動(dòng)態(tài)發(fā)現(xiàn)分區(qū)
當(dāng) FlinkKafkaConsumer 初始化時(shí),每個(gè) subtask 會(huì)訂閱一批 partition,但是當(dāng) Flink 任務(wù)運(yùn)行過(guò)程中,如果被訂閱的 topic 創(chuàng)建了新的 partition,FlinkKafkaConsumer 如何實(shí)現(xiàn)動(dòng)態(tài)發(fā)現(xiàn)新創(chuàng)建的 partition 并消費(fèi)呢?
在使用 FlinkKafkaConsumer 時(shí),可以開啟 partition 的動(dòng)態(tài)發(fā)現(xiàn)。通過(guò) Properties指定參數(shù)開啟(單位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS?
該參數(shù)表示間隔多久檢測(cè)一次是否有新創(chuàng)建的 partition。默認(rèn)值是Long的最小值,表示不開啟,大于0表示開啟。開啟時(shí)會(huì)啟動(dòng)一個(gè)線程根據(jù)傳入的interval定期獲取Kafka最新的元數(shù)據(jù),新 partition 對(duì)應(yīng)的那一個(gè) subtask 會(huì)自動(dòng)發(fā)現(xiàn)并從earliest 位置開始消費(fèi),新創(chuàng)建的 partition 對(duì)其他 subtask 并不會(huì)產(chǎn)生影響。
代碼如下所示:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + "");
Kafka單分區(qū)內(nèi)有序,多分區(qū)間無(wú)序。在這種情況下,可以使用 Flink 中可識(shí)別 Kafka 分區(qū)的 watermark 生成機(jī)制。使用此特性,將在 Kafka 消費(fèi)端內(nèi)部針對(duì)每個(gè) Kafka 分區(qū)生成 watermark,并且不同分區(qū) watermark 的合并方式與在數(shù)據(jù)流 shuffle 時(shí)的合并方式相同。
在單分區(qū)內(nèi)有序的情況下,使用時(shí)間戳單調(diào)遞增按分區(qū)生成的 watermark 將生成完美的全局 watermark。
可以不使用?TimestampAssigner,直接用 Kafka 記錄自身的時(shí)間戳:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
????????????????"flinktest",
????????????????new SimpleStringSchema(),
????????????????properties
????????);
kafkaSourceFunction.assignTimestampsAndWatermarks(
????????????????WatermarkStrategy
????????????????????????.forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction)
如果數(shù)據(jù)源中的某一個(gè)分區(qū)/分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問(wèn)題。比如Kafka的Topic中,由于某些原因,造成個(gè)別Partition一直沒(méi)有新的數(shù)據(jù)。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化,導(dǎo)致窗口、定時(shí)器等不會(huì)被觸發(fā)。
為了解決這個(gè)問(wèn)題,你可以使用 WatermarkStrategy 來(lái)檢測(cè)空閑輸入并將其標(biāo)記為空閑狀態(tài)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
????????????????"flinktest",
????????????????new SimpleStringSchema(),
????????????????properties
????????);
kafkaSourceFunction.assignTimestampsAndWatermarks(
????????????????WatermarkStrategy
????????????????????????.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction)
Kafka的offset消費(fèi)策略
FlinkKafkaConsumer可以調(diào)用以下API,注意與”auto.offset.reset”區(qū)分開:
- setStartFromGroupOffsets():默認(rèn)消費(fèi)策略,默認(rèn)讀取上次保存的offset信息,如果是應(yīng)用第一次啟動(dòng),讀取不到上次的offset信息,則會(huì)根據(jù)這個(gè)參數(shù)auto.offset.reset的值來(lái)進(jìn)行消費(fèi)數(shù)據(jù)。建議使用這個(gè)。
- setStartFromEarliest():從最早的數(shù)據(jù)開始進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
- setStartFromLatest():從最新的數(shù)據(jù)進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
- setStartFromSpecificOffsets(Map):從指定位置進(jìn)行消費(fèi)
- setStartFromTimestamp(long):從topic中指定的時(shí)間點(diǎn)開始消費(fèi),指定時(shí)間點(diǎn)之前的數(shù)據(jù)忽略
- 當(dāng)checkpoint機(jī)制開啟的時(shí)候,KafkaConsumer會(huì)定期把kafka的offset信息還有其他operator的狀態(tài)信息一塊保存起來(lái)。當(dāng)job失敗重啟的時(shí)候,Flink會(huì)從最近一次的checkpoint中進(jìn)行恢復(fù)數(shù)據(jù),重新從保存的offset消費(fèi)kafka中的數(shù)據(jù)(也就是說(shuō),上面幾種策略,只有第一次啟動(dòng)的時(shí)候起作用)。
- 為了能夠使用支持容錯(cuò)的kafka Consumer,需要開啟checkpoint
FlinkSQL調(diào)優(yōu)
FlinkSQL官網(wǎng)配置參數(shù):
開啟MiniBatch(提升吞吐)
MiniBatch是微批處理,原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對(duì)State的訪問(wèn),從而提升吞吐并減少數(shù)據(jù)的輸出量。MiniBatch主要依靠在每個(gè)Task上注冊(cè)的Timer線程來(lái)觸發(fā)微批,需要消耗一定的線程調(diào)度性能。
- MiniBatch默認(rèn)關(guān)閉,開啟方式如下:
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時(shí)間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為2萬(wàn)條
configuration.setString("table.exec.mini-batch.size", "20000");
- FlinkSQL參數(shù)配置列表:
Apache Flink 1.12 Documentation: Configuration
- 適用場(chǎng)景
微批處理通過(guò)增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對(duì)于聚合的場(chǎng)景,微批處理可以顯著的提升系統(tǒng)性能,建議開啟。
- 注意事項(xiàng):
1)目前,key-value 配置項(xiàng)僅被?Blink planner 支持。
2)1.12之前的版本有bug,開啟miniBatch,不會(huì)清理過(guò)期狀態(tài),也就是說(shuō)如果設(shè)置狀態(tài)的TTL,無(wú)法清理過(guò)期狀態(tài)。1.12版本才修復(fù)這個(gè)問(wèn)題。
參考ISSUE:
LocalGlobal優(yōu)化將原先的Aggregate分成Local+Global兩階段聚合,即MapReduce模型中的Combine+Reduce處理模式。第一階段在上游節(jié)點(diǎn)本地?cái)€一批數(shù)據(jù)進(jìn)行聚合(localAgg),并輸出這次微批的增量值(Accumulator)。第二階段再將收到的Accumulator合并(Merge),得到最終的結(jié)果(GlobalAgg)。
LocalGlobal本質(zhì)上能夠靠LocalAgg的聚合篩除部分傾斜數(shù)據(jù),從而降低GlobalAgg的熱點(diǎn),提升性能。結(jié)合下圖理解LocalGlobal如何解決數(shù)據(jù)傾斜的問(wèn)題。
由上圖可知:
- LocalGlobal開啟方式:
1)LocalGlobal優(yōu)化需要先開啟MiniBatch,依賴于MiniBatch的參數(shù)。
2)table.optimizer.agg-phase-strategy: 聚合策略。默認(rèn)AUTO,支持參數(shù)AUTO、TWO_PHASE(使用LocalGlobal兩階段聚合)、ONE_PHASE(僅使用Global一階段聚合)。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時(shí)間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為2萬(wàn)條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
- 判斷是否生效
觀察最終生成的拓?fù)鋱D的節(jié)點(diǎn)名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。
- 適用場(chǎng)景
LocalGlobal適用于提升如SUM、COUNT、MAX、MIN和AVG等普通聚合的性能,以及解決這些場(chǎng)景下的數(shù)據(jù)熱點(diǎn)問(wèn)題。
- 注意事項(xiàng):
1)需要先開啟MiniBatch
2)開啟LocalGlobal需要UDAF實(shí)現(xiàn)Merge方法。
LocalGlobal優(yōu)化針對(duì)普通聚合(例如SUM、COUNT、MAX、MIN和AVG)有較好的效果,對(duì)于COUNT DISTINCT收效不明顯,因?yàn)镃OUNT DISTINCT在Local聚合時(shí),對(duì)于DISTINCT KEY的去重率不高,導(dǎo)致在Global節(jié)點(diǎn)仍然存在熱點(diǎn)。
之前,為了解決COUNT DISTINCT的熱點(diǎn)問(wèn)題,通常需要手動(dòng)改寫為兩層聚合(增加按Distinct Key取模的打散層)。
從Flink1.9.0版本開始,提供了COUNT DISTINCT自動(dòng)打散功能,不需要手動(dòng)重寫。Split Distinct和LocalGlobal的原理對(duì)比參見下圖。
舉例:統(tǒng)計(jì)一天的UV
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果手動(dòng)實(shí)現(xiàn)兩階段聚合:
SELECT day, SUM(cnt)
FROM (
????SELECT day, COUNT(DISTINCT user_id) as cnt
????FROM T
????GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
第一層聚合: 將Distinct Key打散求COUNT DISTINCT。
第二層聚合: 對(duì)打散去重后的數(shù)據(jù)進(jìn)行SUM匯總。
- Split Distinct開啟方式
默認(rèn)不開啟,使用參數(shù)顯式開啟:
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
- 判斷是否生效
觀察最終生成的拓?fù)鋱D的節(jié)點(diǎn)名中是否包含Expand節(jié)點(diǎn),或者原來(lái)一層的聚合變成了兩層的聚合。
- 適用場(chǎng)景
使用COUNT DISTINCT,但無(wú)法滿足聚合節(jié)點(diǎn)性能要求。
- 注意事項(xiàng):
1)目前不能在包含UDAF的Flink SQL中使用Split Distinct優(yōu)化方法。
2)拆分出來(lái)的兩個(gè)GROUP聚合還可參與LocalGlobal優(yōu)化。
3)從Flink1.9.0版本開始,提供了COUNT DISTINCT自動(dòng)打散功能,不需要手動(dòng)重寫(不用像上面的例子去手動(dòng)實(shí)現(xiàn))。
改寫為AGG WITH FILTER語(yǔ)法(提升大量COUNT DISTINCT場(chǎng)景性能)
在某些場(chǎng)景下,可能需要從不同維度來(lái)統(tǒng)計(jì)UV,如Android中的UV,iPhone中的UV,Web中的UV和總UV,這時(shí),可能會(huì)使用如下CASE WHEN語(yǔ)法。
SELECT
?day,
?COUNT(DISTINCT user_id) AS total_uv,
?COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
?COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
在這種情況下,建議使用FILTER語(yǔ)法, 目前的Flink SQL優(yōu)化器可以識(shí)別同一唯一鍵上的不同F(xiàn)ILTER參數(shù)。如,在上面的示例中,三個(gè)COUNT DISTINCT都作用在user_id列上。此時(shí),經(jīng)過(guò)優(yōu)化器識(shí)別后,Flink可以只使用一個(gè)共享狀態(tài)實(shí)例,而不是三個(gè)狀態(tài)實(shí)例,可減少狀態(tài)的大小和對(duì)狀態(tài)的訪問(wèn)。
將上邊的CASE WHEN替換成FILTER后,如下所示:
SELECT
?day,
?COUNT(DISTINCT user_id) AS total_uv,
?COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
?COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
TopN優(yōu)化
使用最優(yōu)算法
當(dāng)TopN的輸入是非更新流(例如Source),TopN只有一種算法AppendRank。當(dāng)TopN的輸入是更新流時(shí)(例如經(jīng)過(guò)了AGG/JOIN計(jì)算),TopN有2種算法,性能從高到低分別是:UpdateFastRank 和RetractRank。算法名字會(huì)顯示在拓?fù)鋱D的節(jié)點(diǎn)名字上。
注意:apache社區(qū)版的Flink1.12目前還沒(méi)有UnaryUpdateRank,阿里云實(shí)時(shí)計(jì)算版Flink才有
- UpdateFastRank :最優(yōu)算法
需要具備2個(gè)條件:
1)輸入流有PK(Primary Key)信息,例如ORDER BY AVG。
2)排序字段的更新是單調(diào)的,且單調(diào)方向與排序方向相反。例如,ORDER BY COUNT/COUNT_DISTINCT/SUM(正數(shù))DESC。
如果要獲取到優(yōu)化Plan,則您需要在使用ORDER BY SUM DESC時(shí),添加SUM為正數(shù)的過(guò)濾條件。
- AppendFast:結(jié)果只追加,不更新
- RetractRank:普通算法,性能差
不建議在生產(chǎn)環(huán)境使用該算法。請(qǐng)檢查輸入流是否存在PK信息,如果存在,則可進(jìn)行UpdateFastRank優(yōu)化。
- 無(wú)排名優(yōu)化(解決數(shù)據(jù)膨脹問(wèn)題)
- TopN語(yǔ)法:
SELECT *
FROM (
??SELECT *,
????ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
????ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
??FROM table_name)
WHERE rownum <= N [AND conditions]
- 數(shù)據(jù)膨脹問(wèn)題:
根據(jù)TopN的語(yǔ)法,rownum字段會(huì)作為結(jié)果表的主鍵字段之一寫入結(jié)果表。但是這可能導(dǎo)致數(shù)據(jù)膨脹的問(wèn)題。例如,收到一條原排名9的更新數(shù)據(jù),更新后排名上升到1,則從1到9的數(shù)據(jù)排名都發(fā)生變化了,需要將這些數(shù)據(jù)作為更新都寫入結(jié)果表。這樣就產(chǎn)生了數(shù)據(jù)膨脹,導(dǎo)致結(jié)果表因?yàn)槭盏搅颂嗟臄?shù)據(jù)而降低更新速度。
- 使用方式
TopN的輸出結(jié)果無(wú)需要顯示rownum值,僅需在最終前端顯式時(shí)進(jìn)行1次排序,極大地減少輸入結(jié)果表的數(shù)據(jù)量。只需要在外層查詢中將rownum字段裁剪掉即可
// 最外層的字段,不寫 rownum
SELECT col1, col2, col3
FROM (
?SELECT col1, col2, col3
???ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
???ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
?FROM table_name)
WHERE rownum <= N [AND conditions]
在無(wú)rownum的場(chǎng)景中,對(duì)于結(jié)果表主鍵的定義需要特別小心。如果定義有誤,會(huì)直接導(dǎo)致TopN結(jié)果的不正確。 無(wú)rownum場(chǎng)景中,主鍵應(yīng)為TopN上游GROUP BY節(jié)點(diǎn)的KEY列表。
增加TopN的Cache大小
TopN為了提升性能有一個(gè)State Cache層,Cache層能提升對(duì)State的訪問(wèn)效率。TopN的Cache命中率的計(jì)算公式為。
cache_hit = cache_size*parallelism/top_n/partition_key_num
例如,Top100配置緩存10000條,并發(fā)50,當(dāng)PatitionBy的key維度較大時(shí),例如10萬(wàn)級(jí)別時(shí),Cache命中率只有10000*50/100/100000=5%,命中率會(huì)很低,導(dǎo)致大量的請(qǐng)求都會(huì)擊中State(磁盤),性能會(huì)大幅下降。因此當(dāng)PartitionKey維度特別大時(shí),可以適當(dāng)加大TopN的CacheS ize,相對(duì)應(yīng)的也建議適當(dāng)加大TopN節(jié)點(diǎn)的Heap Memory。
- 使用方式
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 默認(rèn)10000條,調(diào)整TopN cahce到20萬(wàn),那么理論命中率能達(dá)200000*50/100/100000 = 100%
configuration.setString("table.exec.topn.cache-size", "200000");
注意:目前源碼中標(biāo)記為實(shí)驗(yàn)項(xiàng),官網(wǎng)中未列出該
例如每天的排名,要帶上Day字段。否則TopN的結(jié)果到最后會(huì)由于State ttl有錯(cuò)亂。
insert
??into print_test
SELECT
??cate_id,
??seller_id,
??stat_date,
??pay_ord_amt ?--不輸出rownum字段,能減小結(jié)果表的輸出量(無(wú)排名優(yōu)化)
FROM (
????SELECT
??????*,
??????ROW_NUMBER () OVER (
????????PARTITION BY cate_id,
????????stat_date ?--注意要有時(shí)間字段,否則state過(guò)期會(huì)導(dǎo)致數(shù)據(jù)錯(cuò)亂(分區(qū)字段優(yōu)化)
????????ORDER
??????????BY pay_ord_amt DESC ?--根據(jù)上游sum結(jié)果排序。排序字段的更新是單調(diào)的,且單調(diào)方向與排序方向相反(走最優(yōu)算法)
??????) as rownum ?
????FROM (
????????SELECT
??????????cate_id,
??????????seller_id,
??????????stat_date,
??????????--重點(diǎn)。聲明Sum的參數(shù)都是正數(shù),所以Sum的結(jié)果是單調(diào)遞增的,因此TopN能使用優(yōu)化算法,只獲取前100個(gè)數(shù)據(jù)(走最優(yōu)算法)
??????????sum (total_fee) filter (
????????????where
??????????????total_fee >= 0
??????????) as pay_ord_amt
????????FROM
??????????random_test
????????WHERE
??????????total_fee >= 0
????????GROUP
??????????BY cate_name,
??????????seller_id,
??????????stat_date
??????) a
????WHERE
??????rownum <= 100
??);
由于SQL上沒(méi)有直接支持去重的語(yǔ)法,還要靈活的保留第一條或保留最后一條。因此我們使用了SQL的ROW_NUMBER OVER WINDOW功能來(lái)實(shí)現(xiàn)去重語(yǔ)法。去重本質(zhì)上是一種特殊的TopN。
保留KEY下第一條出現(xiàn)的數(shù)據(jù),之后出現(xiàn)該KEY下的數(shù)據(jù)會(huì)被丟棄掉。因?yàn)镾TATE中只存儲(chǔ)了KEY數(shù)據(jù),所以性能較優(yōu),示例如下:
SELECT *
FROM (
??SELECT *,
????ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
??FROM T
)
WHERE rowNum = 1
以上示例是將T表按照b字段進(jìn)行去重,并按照系統(tǒng)時(shí)間保留第一條數(shù)據(jù)。Proctime在這里是源表T中的一個(gè)具有Processing Time屬性的字段。如果按照系統(tǒng)時(shí)間去重,也可以將Proctime字段簡(jiǎn)化PROCTIME()函數(shù)調(diào)用,可以省略Proctime字段的聲明。
保留KEY下最后一條出現(xiàn)的數(shù)據(jù)。保留末行的去重策略性能略優(yōu)于LAST_VALUE函數(shù),示例如下:
SELECT *
FROM (
??SELECT *,
????ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
??FROM T
)
WHERE rowNum = 1
以上示例是將T表按照b和d字段進(jìn)行去重,并按照業(yè)務(wù)時(shí)間保留最后一條數(shù)據(jù)。Rowtime在這里是源表T中的一個(gè)具有Event Time屬性的字段。
使用內(nèi)置函數(shù)替換自定義函數(shù)
Flink的內(nèi)置函數(shù)在持續(xù)的優(yōu)化當(dāng)中,請(qǐng)盡量使用內(nèi)部函數(shù)替換自定義函數(shù)。使用內(nèi)置函數(shù)好處:
1)優(yōu)化數(shù)據(jù)序列化和反序列化的耗時(shí)。
2)新增直接對(duì)字節(jié)單位進(jìn)行操作的功能。
支持的系統(tǒng)內(nèi)置函數(shù):
LIKE操作注意事項(xiàng)- 如果需要進(jìn)行StartWith操作,使用LIKE 'xxx%'。
- 如果需要進(jìn)行EndWith操作,使用LIKE '%xxx'。
- 如果需要進(jìn)行Contains操作,使用LIKE '%xxx%'。
- 如果需要進(jìn)行Equals操作,使用LIKE 'xxx',等價(jià)于str = 'xxx'。
- 如果需要匹配 _ 字符,請(qǐng)注意要完成轉(zhuǎn)義LIKE '%seller/id%' ESCAPE '/'。_在SQL中屬于單字符通配符,能匹配任何字符。如果聲明為 LIKE '%seller_id%',則不單會(huì)匹配seller_id還會(huì)匹配seller#id、sellerxid或seller1id 等,導(dǎo)致結(jié)果錯(cuò)誤。
慎用正則函數(shù)(REGEXP)
正則表達(dá)式是非常耗時(shí)的操作,對(duì)比加減乘除通常有百倍的性能開銷,而且正則表達(dá)式在某些極端情況下可能會(huì)進(jìn)入無(wú)限循環(huán),導(dǎo)致作業(yè)阻塞。建議使用LIKE。正則函數(shù)包括:
- REGEXP
- REGEXP_EXTRACT
- REGEXP_REPLACE
指定時(shí)區(qū)
本地時(shí)區(qū)定義了當(dāng)前會(huì)話時(shí)區(qū)id。當(dāng)本地時(shí)區(qū)的時(shí)間戳進(jìn)行轉(zhuǎn)換時(shí)使用。在內(nèi)部,帶有本地時(shí)區(qū)的時(shí)間戳總是以UTC時(shí)區(qū)表示。但是,當(dāng)轉(zhuǎn)換為不包含時(shí)區(qū)的數(shù)據(jù)類型時(shí)(例如TIMESTAMP, TIME或簡(jiǎn)單的STRING),會(huì)話時(shí)區(qū)在轉(zhuǎn)換期間被使用。為了避免時(shí)區(qū)錯(cuò)亂的問(wèn)題,可以參數(shù)指定時(shí)區(qū)。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 指定時(shí)區(qū)
configuration.setString("table.local-time-zone", "Asia/Shanghai");
設(shè)置參數(shù)總結(jié)
總結(jié)以上的調(diào)優(yōu)參數(shù),代碼如下:
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對(duì)象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設(shè)置參數(shù):
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時(shí)間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為2萬(wàn)條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數(shù)目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的緩存條數(shù)
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定時(shí)區(qū)
configuration.setString("table.local-time-zone", "Asia/Shanghai");
總結(jié)
以上是生活随笔為你收集整理的Flink 运维与调优的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: IP地址绕过 . 拦截
- 下一篇: 月入2万的10个小生意项目