日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

發(fā)布時間:2023/12/2 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

之所以寫這個是因為前段時間自己的項目出現(xiàn)過這樣的一個問題:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".

跟著這問題在 Flink 的 Issue 列表里看到了一個類似的問題:https://issues.apache.org/jira/browse/FLINK-9056 ,看下面的評論差不多就是 TaskManager 的 slot 數(shù)量不足的原因,導(dǎo)致 job 提交失敗。在 Flink 1.63 中已經(jīng)修復(fù)了變成拋出異常了。

竟然知道了是因為 slot 不足的原因了,那么我們就要先了解下 slot 是什么東東呢?不過文章這里先介紹下 parallelism。

什么是 parallelism?

如翻譯這樣,parallelism 是并行的意思,在 Flink 里面代表每個任務(wù)的并行度,適當(dāng)?shù)奶岣卟⑿卸瓤梢源蟠筇岣?job 的執(zhí)行效率,比如你的 job 消費 kafka 數(shù)據(jù)過慢,適當(dāng)調(diào)大可能就消費正常了。

那么在 Flink 中怎么設(shè)置并行度呢?

如何設(shè)置 parallelism?

如上圖,在 flink 配置文件中可以查看到默認(rèn)并行度是 1,

cat flink-conf.yaml | grep parallelism# The parallelism used for programs that did not specify and other parallelism. parallelism.default: 1

所以你如何在你的 flink job 里面不設(shè)置任何的 parallelism 的話,那么他也會有一個默認(rèn)的 parallelism = 1。那也意味著你可以修改這個配置文件的默認(rèn)并行度。

如果你是用命令行啟動你的 Flink job,那么你也可以這樣設(shè)置并行度(使用 -p 并行度):

./bin/flink run -p 10 ../word-count.jar

你也可以通過這樣來設(shè)置你整個程序的并行度:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10);

注意:這樣設(shè)置的并行度是你整個程序的并行度,那么后面如果你的每個算子不單獨設(shè)置并行度覆蓋的話,那么后面每個算子的并行度就都是這里設(shè)置的并行度的值了。

如何給每個算子單獨設(shè)置并行度呢?

data

如上,就是在每個算子后面單獨的設(shè)置并行度,這樣的話,就算你前面設(shè)置了 env.setParallelism(10) 也是會被覆蓋的。

這也說明優(yōu)先級是:算子設(shè)置并行度 > env 設(shè)置并行度 > 配置文件默認(rèn)并行度

并行度講到這里應(yīng)該都懂了,下面 zhisheng 就繼續(xù)跟你講講 什么是 slot?

什么是 slot?

其實什么是 slot 這個問題之前在第一篇文章 《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹 中就介紹過了,這里再講細(xì)一點。

圖中 Task Manager 是從 Job Manager 處接收需要部署的 Task,任務(wù)的并行性由每個 Task Manager 上可用的 slot 決定。每個任務(wù)代表分配給任務(wù)槽的一組資源,slot 在 Flink 里面可以認(rèn)為是資源組,Flink 將每個任務(wù)分成子任務(wù)并且將這些子任務(wù)分配到 slot 來并行執(zhí)行程序。

例如,如果 Task Manager 有四個 slot,那么它將為每個 slot 分配 25% 的內(nèi)存。 可以在一個 slot 中運行一個或多個線程。 同一 slot 中的線程共享相同的 JVM。 同一 JVM 中的任務(wù)共享 TCP 連接和心跳消息。Task Manager 的一個 Slot 代表一個可用線程,該線程具有固定的內(nèi)存,注意 Slot 只對內(nèi)存隔離,沒有對 CPU 隔離。默認(rèn)情況下,Flink 允許子任務(wù)共享 Slot,即使它們是不同 task 的 subtask,只要它們來自相同的 job。這種共享可以有更好的資源利用率。

文字說的比較干,zhisheng 這里我就拿下面的圖片來講解:

上面圖片中有兩個 Task Manager,每個 Task Manager 有三個 slot,這樣我們的算子最大并行度那么就可以達(dá)到 6 個,在同一個 slot 里面可以執(zhí)行 1 至多個子任務(wù)。

那么再看上面的圖片,source/map/keyby/window/apply 最大可以有 6 個并行度,sink 只用了 1 個并行。

每個 Flink TaskManager 在集群中提供 slot。 slot 的數(shù)量通常與每個 TaskManager 的可用 CPU 內(nèi)核數(shù)成比例。一般情況下你的 slot 數(shù)是你每個 TaskManager 的 cpu 的核數(shù)。

但是 flink 配置文件中設(shè)置的 task manager 默認(rèn)的 slot 是 1。

slot 和 parallelism

下面給出官方的圖片來更加深刻的理解下 slot:

1、slot 是指 taskmanager 的并發(fā)執(zhí)行能力

taskmanager.numberOfTaskSlots:3

每一個 taskmanager 中的分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。

2、parallelism 是指 taskmanager 實際使用的并發(fā)能力

parallelism.default:1

運行程序默認(rèn)的并行度為 1,9 個 TaskSlot 只用了 1 個,有 8 個空閑。設(shè)置合適的并行度才能提高效率。

3、parallelism 是可配置、可指定的

上圖中 example2 每個算子設(shè)置的并行度是 2, example3 每個算子設(shè)置的并行度是 9。

example4 除了 sink 是設(shè)置的并行度為 1,其他算子設(shè)置的并行度都是 9。

好了,既然并行度和 slot zhisheng 都帶大家過了一遍了,那么再來看文章開頭的問題:slot 資源不夠。

問題原因

現(xiàn)在這個問題的答案其實就已經(jīng)很明顯了,就是我們設(shè)置的并行度 parallelism 超過了 Task Manager 能提供的最大 slot 數(shù)量,所以才會報這個錯誤。

再來拿我的代碼來看吧,當(dāng)時我就是只設(shè)置了整個項目的并行度:

env.setParallelism(15);

為什么要設(shè)置 15 呢,因為我項目消費的 Kafka topic 有 15 個 parttion,就想著讓一個并行去消費一個 parttion,沒曾想到 Flink 資源的不夠,稍微降低下 并行度為 10 后就沒出現(xiàn)這個錯誤了。

總結(jié)

本文由自己項目生產(chǎn)環(huán)境的一個問題來講解了自己對 Flink parallelism 和 slot 的理解,并告訴大家如何去設(shè)置這兩個參數(shù),最后也指出了問題的原因所在。

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/ , 未經(jīng)允許禁止轉(zhuǎn)載。

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客

相關(guān)文章

1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹

2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運行簡單程序入門

3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解

4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)

9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch

12、《從0到1學(xué)習(xí)Flink》—— Flink 項目如何運行?

13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka

14、《從0到1學(xué)習(xí)Flink》—— Flink JobManager 高可用性配置

15、《從0到1學(xué)習(xí)Flink》—— Flink parallelism 和 Slot 介紹

16、《從0到1學(xué)習(xí)Flink》—— Flink 讀取 Kafka 數(shù)據(jù)批量寫入到 MySQL

17、《從0到1學(xué)習(xí)Flink》—— Flink 讀取 Kafka 數(shù)據(jù)寫入到 RabbitMQ

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。