日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(二十六):​​​​​​​State代码示例

發布時間:2023/11/28 生活经验 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(二十六):​​​​​​​State代码示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

State代碼示例

Keyed State

官網代碼示例

需求:

編碼步驟

代碼示例

Operator State

官網代碼示例

需求:

編碼步驟:

代碼示例


State代碼示例



Keyed State

下圖就 word count 的 sum 所使用的StreamGroupedReduce類為例講解了如何在代碼中使用 keyed state:

官網代碼示例

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

需求:

使用KeyState中的ValueState獲取數據中的最大值(實際中直接使用maxBy即可)

編碼步驟

//-1.定義一個狀態用來存放最大值

private transient ValueState<Long> maxValueState;

//-2.創建一個狀態描述符對象

ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);

//-3.根據狀態描述符獲取State

maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);

?//-4.使用State

Long historyValue = maxValueState.value();

//判斷當前值和歷史值誰大

if (historyValue == null || currentValue > historyValue)

//-5.更新狀態

maxValueState.update(currentValue);?????

???

代碼示例

package cn.it.state;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc* 使用KeyState中的ValueState獲取流數據中的最大值(實際中直接使用maxBy即可)*/
public class StateDemo01_KeyedState {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//方便觀察//2.SourceDataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L));//3.Transformation//使用KeyState中的ValueState獲取流數據中的最大值(實際中直接使用maxBy即可)//實現方式1:直接使用maxBy--開發中使用該方式即可//min只會求出最小的那個字段,其他的字段不管//minBy會求出最小的那個字段和對應的其他的字段//max只會求出最大的那個字段,其他的字段不管//maxBy會求出最大的那個字段和對應的其他的字段SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0).maxBy(1);//實現方式2:使用KeyState中的ValueState---學習測試時使用,或者后續項目中/實際開發中遇到復雜的Flink沒有實現的邏輯,才用該方式!SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {//-1.定義狀態用來存儲最大值private ValueState<Long> maxValueState = null;@Overridepublic void open(Configuration parameters) throws Exception {//-2.定義狀態描述符:描述狀態的名稱和里面的數據類型ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);//-3.根據狀態描述符初始化狀態maxValueState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {//-4.使用State,取出State中的最大值/歷史最大值Long historyMaxValue = maxValueState.value();Long currentValue = value.f1;if (historyMaxValue == null || currentValue > historyMaxValue) {//5-更新狀態,把當前的作為新的最大值存到狀態中maxValueState.update(currentValue);return Tuple3.of(value.f0, currentValue, currentValue);} else {return Tuple3.of(value.f0, currentValue, historyMaxValue);}}});//4.Sink//result.print();result2.print();//5.executeenv.execute();}
}

Operator State

下圖對 word count 示例中的FromElementsFunction類進行詳解并分享如何在代碼中使用 operator state:

官網代碼示例

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

需求:

使用ListState存儲offset模擬Kafka的offset維護

編碼步驟:

//-1.聲明一個OperatorState來記錄offset

private ListState<Long> offsetState = null;

private Long offset = 0L;

//-2.創建狀態描述器

ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);

//-3.根據狀態描述器獲取State

offsetState = context.getOperatorStateStore().getListState(descriptor);

//-4.獲取State中的值

Iterator<Long> iterator = offsetState.get().iterator();

if (iterator.hasNext()) {//迭代器中有值

????offset = iterator.next();//取出的值就是offset

}

offset += 1L;

ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",當前的offset為:" + offset);

if (offset % 5 == 0) {//每隔5條消息,模擬一個異常

//-5.保存State到Checkpoint中

offsetState.clear();//清理內存中存儲的offset到Checkpoint中

//-6.將offset存入State中

offsetState.add(offset);

代碼示例

package cn.it.state;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 需求:* 使用OperatorState支持的數據結構ListState存儲offset信息, 模擬Kafka的offset維護,* 其實就是FlinkKafkaConsumer底層對應offset的維護!*/
public class StateDemo02_OperatorState {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//先直接使用下面的代碼設置Checkpoint時間間隔和磁盤路徑以及代碼遇到異常后的重啟策略,下午會學env.enableCheckpointing(1000);//每隔1s執行一次Checkpointenv.setStateBackend(new FsStateBackend("file:///D:/ckp"));env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//固定延遲重啟策略: 程序出現異常的時候,重啟2次,每次延遲3秒鐘重啟,超過2次,程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));//2.SourceDataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());//3.Transformation//4.SinksourceData.print();//5.executeenv.execute();}/*** MyKafkaSource就是模擬的FlinkKafkaConsumer并維護offset*/public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {//-1.聲明一個OperatorState來記錄offsetprivate ListState<Long> offsetState = null;private Long offset = 0L;private boolean flag = true;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {//-2.創建狀態描述器ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);//-3.根據狀態描述器初始化狀態offsetState = context.getOperatorStateStore().getListState(descriptor);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {//-4.獲取并使用State中的值Iterator<Long> iterator = offsetState.get().iterator();if (iterator.hasNext()){offset = iterator.next();}while (flag){offset += 1;int id = getRuntimeContext().getIndexOfThisSubtask();ctx.collect("分區:"+id+"消費到的offset位置為:" + offset);//1 2 3 4 5 6//Thread.sleep(1000);TimeUnit.SECONDS.sleep(2);if(offset % 5 == 0){System.out.println("程序遇到異常了.....");throw new Exception("程序遇到異常了.....");}}}@Overridepublic void cancel() {flag = false;}/*** 下面的snapshotState方法會按照固定的時間間隔將State信息存儲到Checkpoint/磁盤中,也就是在磁盤做快照!*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {//-5.保存State到Checkpoint中offsetState.clear();//清理內存中存儲的offset到Checkpoint中//-6.將offset存入State中offsetState.add(offset);}}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(二十六):​​​​​​​State代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 超碰8| 网红av在线 | 黄色一级片在线播放 | 熟女少妇一区二区三区 | 中文字幕av片 | 麻豆视频官网 | 欧美精品videos另类日本 | 漂亮人妻洗澡被公强 日日躁 | 黄色网页网站 | 激情av | 91射| 免费视频一区二区 | gav在线| 91片黄在线观看 | 韩国主播青草200vip视频 | avav国产 | 免费不卡的av | 夜间福利在线 | 中文字幕在线观看日韩 | 免费国产黄 | av在线亚洲天堂 | 在线播放精品视频 | 欧美日韩黄色 | 91麻豆精品国产91久久久久久 | 精品久久BBBBB精品人妻 | 99精品欧美一区二区蜜桃免费 | 五月婷婷激情小说 | 国产精品网站入口 | 涩涩一区 | 一区二区不卡视频 | 看一级黄色片 | 青青草中文字幕 | 亚洲你我色 | 欧美国产日韩精品 | 波多野结衣1区2区3区 | 在线看国产 | 一区二区三区黄色 | 成人免费短视频 | 亚洲成人一区二区在线观看 | 日韩欧美xxx| 亚洲伦理在线视频 | 狠狠激情| 亚洲一区二区三区四区五区六区 | 中文字幕第八页 | 亚洲射情| 久久午夜免费视频 | 性生活一级大片 | 欧美综合视频在线观看 | 五月婷婷综合激情 | 国产精品一区二区三 | 欧美国产综合 | 欧美69影院| 懂色a v| 99视频这里有精品 | 99夜色| 国产欧美激情在线观看 | 我要色综合网 | 成人亚洲网站 | 精品国产乱码久久 | 日韩少妇毛片 | 色婷婷激情五月 | 最新在线视频 | 超碰女人| 伊人最新网址 | 91精品视频免费在线观看 | 欧美日韩四区 | 欧美在线观看视频一区 | 成人自拍av| 成人欧美视频在线观看 | 亚洲人交配 | 国产99久久久国产精品成人免费 | 精品国产精品国产偷麻豆 | 亚洲人成亚洲人成在线观看 | 国内精品91| 中文字幕在线观看日韩 | 筱田优av| 色很久| 日韩伦理一区 | 国产精品久久久久久婷婷天堂 | 日本少妇网站 | 97xxxx| 肉体粗喘娇吟国产91 | 欧美激情喷水 | 免费看黄在线观看 | 黄色肉肉视频 | 永久免费在线观看视频 | 国产激情精品 | 中文字幕在线观看网址 | 国产三级在线观看完整版 | 黄色不卡 | 无罩大乳的熟妇正在播放 | 国产最新在线视频 | 久久午夜国产精品 | 国产女主播福利 | 日日干综合| 性感美女av在线 | 一道本久久 | 成人精品| 亚洲国产日韩a在线播放性色 |