flink 作业提交流程
之前給大家介紹了DataStream API中 Environment 和 Transformation 連個體系的源代碼,今天來了小插曲,給大家宏觀介紹下 Flink 作業的提交流程,希望對大家有幫助。
一、DataStream 作業提交流程
1)、首先,先給大家展示下流程圖:
2)、提交流程說明:
FlinkCli 先創建一個 Flink 環境變量
然后將環境變量存入到ThreadLocal中
在啟動 Flink 作業jar包的 main 方法
Flink 應用程序通過 StreamExecutionEnvironment.getExecutionEnvironment() 獲取到相應的執行環境變量
Flink 應用程序將用戶編寫的作業轉換成 jobGraph 提交給Flink 集群
3)、Flink 作業以哪種方式提交,取決于 StreamExecutionEnvironment 的配置信息;
起到主要作用的配置參數是 execution.target;
execution.target 取值:
remote
local
yarn-per-job
yarn-session
kubernetes-session
yarn-application
kubernetes-application
StreamExecutionEnvironment 會根據 execution.target 配置的不同取值創建相應的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 創建相應的 PipelineExecutor, PipelineExecutor執行相應的作業提交工作;
源代碼探究:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute()
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph)
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (見 代碼 3-1)
ExecutorFactory 舉例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(見代碼 3-2)
代碼 3-1
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
// 通過 java SPI 技術加載 實現了 PipelineExecutorFactory 接口的類
final ServiceLoader loader =
ServiceLoader.load(PipelineExecutorFactory.class);
}
代碼 3-2
@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {
}
// 配置選項
public static final ConfigOption TARGET =
key(“execution.target”)
4)、FlinkCli 創建 Flink 環境變量相關流程:
org.apache.flink.client.cli.CliFrontend.main()
org.apache.flink.client.cli.CliFrontend.executeProgram()
org.apache.flink.client.ClientUtils.executeProgram()
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
// 設置流環境變量
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
}
5)、StreamExecutionEnvironment.getExecutionEnvironment() 獲取執行環境的邏輯:
先從 threadLocal 獲取環境變量
如果 threadLocal 中沒有相應的環境變量,則創建一個本地環境變量
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;
return Optional.ofNullable(factory);
}
二、Flink Table
1)、flink Sql 作業提交流程
2)、提交流程說明
TableEnvironmentImpl 在創建的過程中創建了 Executor , ExecutorBase 中包含了StreamExecutionEnvironment 的實例, StreamExecutionEnvironment 的實例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法創建。
TableEnvironmentImpl 作業的提交依賴 StreamExecutionEnvironment 的作業提交流程。
TableEnvironmentImpl 借助Parser組件將 SQL 語句轉換成 Operation,然后借助 Planner組件將Operation轉換成 List。
使用StreamExecutionEnvironment 將 List 轉換成 StreamGraph。
后續操作與DataStream提交流程一樣。
3)、 TableEnvironmentImpl .executeSql() 執行邏輯:
Sql 解析, 將Sql語句解析為 List 變量;
Transformation轉換,將 List 轉換為 List<Transformation<?>>
PipeLine轉換, 將List<Transformation<?>> 轉換為 PipeLine
4)、TableEnvironmentImpl 創建過程:
ModuleManager 的創建
CatalogManager 的創建
FunctionCatalog 的創建
Executor (執行環境)的創建, 先通過 java SPI 加載 Executor 工廠, 通過EnvironmentSettings.Builder.useBlinkPlanner() 指定為 org.apache.flink.table.planner.delegation.BlinkExecutorFactory
Planner的創建(包括Parser的構造),先通過 java SPI 加載 Planner 工廠,通過EnvironmentSettings.Builder.useBlinkPlanner() 指定為org.apache.flink.table.planner.delegation.BlinkPlannerFactory
構造TableEnvironmentImpl
5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner)
基本流程:
Sql語句解析成Sql 抽象語法樹
Planner對sql 語法樹進行驗證
將驗證過的語法樹轉換成關系代數樹
將關系代數樹封裝成Flink對應的Operation
public List parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
Calsite :Sql 解析框架
SqlNode 代表Sql 抽象語法樹中的節點,CalciteParser 內部使用 SqlParser 將Sql語句解析成Sql 抽象語法樹。
Operation (Flink Table API中抽象出來的概念) 代表任意類型的Sql操作行為,例如 Select 、Insert、Drop 等sql操作可以表示為QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl內部使用 Calsite 的 SqlToRelConverter 將驗證后的抽象語法樹轉換成關系代數樹。
6)、Operation 轉換為 Transformation 邏輯 (Blink Planner : StreamPlanner / BatchPlanner)
基本流程:
從Operation中 獲取到 關系代數樹
根據優化規則優化關系代數樹
生成物理執行計劃
將物理執行計劃轉換成 List<Transformation<?>>
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[]]
}
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}
總結
以上是生活随笔為你收集整理的flink 作业提交流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何理解几何分布与指数分布的无记忆性?
- 下一篇: 传统僵尸网络家族