Setting Parallelism in Flink SQL: A Stream-Mode Example of Flink-Hive Integration
01
Background
Building on the earlier article "Flink 集成 Hive: batch-mode example" (knowfarhhy, WeChat official account: 大數(shù)據(jù)摘文), we continue with a stream-mode example.
02
Reading Hive in Stream Mode
```java
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(bsSettings);

// Add Hive support
String name = "myhive";
String defaultDatabase = "dim";
String version = "1.2.1";
String hiveConfDir = System.getenv("HIVE_CONF_DIR");
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

log.info("Registering catalog");
tableEnv.registerCatalog(name, hive);
log.info("Catalog registered");
log.info("Switching catalog");
tableEnv.useCatalog(name);
log.info("Switching database");
tableEnv.useDatabase(defaultDatabase);

tableEnv.sqlQuery("SELECT name,age,score,dt FROM myhive.dim.dim_flink_test").printSchema();

String[] fields = new String[4];
fields[0] = "name";
fields[1] = "age";
fields[2] = "score";
fields[3] = "dt";
TypeInformation[] fieldType = new TypeInformation[4];
fieldType[0] = Types.STRING;
fieldType[1] = Types.INT;
fieldType[2] = Types.LONG;
fieldType[3] = Types.STRING;

PrintTableUpsertSink sink = new PrintTableUpsertSink(fields, fieldType, true);
tableEnv.registerTableSink("inserttable", sink);
tableEnv.sqlUpdate("INSERT INTO inserttable SELECT name,age,score,dt FROM myhive.dim.dim_flink_test");
tableEnv.execute("stream_read_hive");
```

03
Job Topology
The figure above shows the topology of the test job from Section 02. Note that the streaming job's Hive operators end in a Finished state rather than staying in Running. Hive support in Flink 1.10 is still incomplete: it cannot truly read Hive data in real time, nor detect changes to Hive tables. The table is read exactly once when the job starts, after which the Hive-related operator tasks finish. If you want to work seriously with Hive, Flink 1.11 or later is recommended; its Hive functionality is far more complete.
To make the Finished state in the streaming job easier to spot, here is the topology of another streaming job for comparison:
Proper support for reading Hive as a stream (Hive Streaming) arrived in Flink 1.11. Here are a few reference articles:
Related Hive Streaming articles
"Flink 1.11 新特性之 SQL Hive Streaming 簡(jiǎn)單示例" — LittleMagic, WeChat official account: Flink 中文社區(qū)

"Flink x Zeppelin, Hive Streaming 實(shí)戰(zhàn)解析" — 狄杰@蘑菇街, WeChat official account: Flink 中文社區(qū)

"Flink SQL FileSystem Connector 分區(qū)提交與自定義小文件合并策略" — LittleMagic, WeChat official account: Flink 中文社區(qū)

"Flink 1.11 SQL 使用攻略" — 李勁松, WeChat official account: Flink 中文社區(qū)

04
Caveats
Job runtime environment:
(1) The job's default parallelism is set to 2
(2) Running on Kubernetes, with one JobManager and one TaskManager
(3) The TaskManager is configured with 8 slots
In the topology above, the first operator runs with parallelism 8 and the second with parallelism 2. The job runs normally, but only because extra settings were added to make it fit the cluster.
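The article does not spell out which extra settings were used; one plausible way (an assumption on my part, using the options discussed in the analysis below) to keep the source within the 8 available slots is to set them from the SQL CLI:

```sql
-- Hypothetical settings (not quoted from the original article):
SET table.exec.resource.default-parallelism=2;
-- Either disable inference so the configured default is used for the Hive source...
SET table.exec.hive.infer-source-parallelism=false;
-- ...or keep inference but cap it at the slot count:
SET table.exec.hive.infer-source-parallelism.max=8;
```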
Without them, you may run into the following situation:
The first operator has parallelism 10 and the second parallelism 2. Since the cluster has only 8 slots available, there are not enough resources: the job sits in the CREATED state and eventually fails with a timeout.
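With slot sharing (Flink's default), a job needs at least as many slots as its highest operator parallelism. A minimal sketch of that arithmetic, in plain Java and independent of any Flink API:

```java
public class SlotCheck {
    /** With slot sharing, required slots = max operator parallelism across the job. */
    static int requiredSlots(int... operatorParallelisms) {
        int max = 1;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        int availableSlots = 8; // one TaskManager with 8 slots

        // Working case: source parallelism 8, sink parallelism 2 -> fits.
        System.out.println(requiredSlots(8, 2) <= availableSlots);  // true

        // Failing case: inferred source parallelism 10 exceeds the 8 slots,
        // so the job stays in CREATED and eventually times out.
        System.out.println(requiredSlots(10, 2) <= availableSlots); // false
    }
}
```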
Problem analysis:
The code below shows how the parallelism of HiveTableSource is set:
```java
int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
    int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
    if (max < 1) {
        throw new IllegalConfigurationException(
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
                        + " cannot be less than 1");
    }
    int splitNum;
    try {
        long nano1 = System.nanoTime();
        splitNum = inputFormat.createInputSplits(0).length;
        long nano2 = System.nanoTime();
        LOG.info(
                "Hive source({}}) createInputSplits use time: {} ms",
                tablePath,
                (nano2 - nano1) / 1_000_000);
    } catch (IOException e) {
        throw new FlinkHiveException(e);
    }
    parallelism = Math.min(splitNum, max);
}
parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
parallelism = Math.max(1, parallelism);
source.setParallelism(parallelism);
```

The configuration options involved:
```java
public class HiveOptions {
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
            key("table.exec.hive.fallback-mapred-reader")
                    .defaultValue(false)
                    .withDescription(
                            "If it is false, using flink native vectorized reader to read orc files; "
                                    + "If it is true, using hadoop mapred record reader to read orc files.");

    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
            key("table.exec.hive.infer-source-parallelism")
                    .defaultValue(true)
                    .withDescription(
                            "If is false, parallelism of source are set by config.\n"
                                    + "If is true, source parallelism is inferred according to splits number.\n");

    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
            key("table.exec.hive.infer-source-parallelism.max")
                    .defaultValue(1000)
                    .withDescription("Sets max infer parallelism for source operator.");
}
```

`table.exec.hive.infer-source-parallelism` — true: the source parallelism is inferred from the number of splits; false: it is taken from the config.

`table.exec.resource.default-parallelism` — the default parallelism for all operators (join, agg, filter, etc.).

`table.exec.hive.infer-source-parallelism.max` — the maximum parallelism of HiveTableSource; default 1000.

(1) First, the default parallelism for all operators is read from the config:
```java
int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
```

(2) If parallelism inference is disabled, this default is used as-is.
(3) If inference is enabled, the parallelism is the minimum of the computed split count and the configured maximum:
```java
parallelism = Math.min(splitNum, max);
```

splitNum is the length of the array returned by the method below:

```java
public HiveTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
    int splitNum = 0;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        InputFormat format;
        try {
            format = (InputFormat) Class.forName(
                            sd.getInputFormat(),
                            true,
                            Thread.currentThread().getContextClassLoader())
                    .newInstance();
        } catch (Exception e) {
            throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);
        }
        ReflectionUtils.setConf(format, jobConf);
        jobConf.set(INPUT_DIR, sd.getLocation());
        // TODO: we should consider how to calculate the splits according to minNumSplits in the future.
        org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);
        for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {
            hiveSplits.add(new HiveTableInputSplit(splitNum++, inputSplit, jobConf, partition));
        }
    }
    return hiveSplits.toArray(new HiveTableInputSplit[0]);
}
```

With these options configured sensibly, we can keep the parallelism of HiveTableSource within the cluster's resources, instead of ending up with a job that cannot start.
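The whole decision above can be condensed into a pure function to see how the three knobs interact. This is a sketch of the same logic, independent of Flink, with the parameter names chosen here for illustration:

```java
public class SourceParallelism {
    /**
     * Mirrors the decision in HiveTableSource: start from the configured default,
     * optionally replace it with the split count capped by the configured max,
     * then apply the LIMIT-based cap and a floor of 1.
     */
    static int decide(int defaultParallelism, boolean infer, int splitNum, int inferMax, long limit) {
        int parallelism = defaultParallelism;
        if (infer) {
            parallelism = Math.min(splitNum, inferMax);
        }
        parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
        return Math.max(1, parallelism);
    }

    public static void main(String[] args) {
        // Inference on: 10 splits, max 1000 -> parallelism 10 (the failing case above).
        System.out.println(decide(2, true, 10, 1000, -1));  // 10
        // Inference off: the configured default wins.
        System.out.println(decide(2, false, 10, 1000, -1)); // 2
        // Inference on, but capped at 8 via infer-source-parallelism.max.
        System.out.println(decide(2, true, 10, 8, -1));     // 8
    }
}
```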