2021年大数据Flink(二十六):State代码示例
目錄
State代碼示例
Keyed State
官網代碼示例
需求:
編碼步驟
代碼示例
Operator State
官網代碼示例
需求:
編碼步驟:
代碼示例
State代碼示例
Keyed State
下圖就 word count 的 sum 所使用的StreamGroupedReduce類為例講解了如何在代碼中使用 keyed state:
官網代碼示例
//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/
需求:
使用KeyState中的ValueState獲取數據中的最大值(實際中直接使用maxBy即可)
編碼步驟
//-1.定義一個狀態用來存放最大值
private transient ValueState<Long> maxValueState;
//-2.創建一個狀態描述符對象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根據狀態描述符獲取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
?//-4.使用State
Long historyValue = maxValueState.value();
//判斷當前值和歷史值誰大
if (historyValue == null || currentValue > historyValue)
//-5.更新狀態
maxValueState.update(currentValue);?????
???
代碼示例
package cn.it.state;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc* 使用KeyState中的ValueState獲取流數據中的最大值(實際中直接使用maxBy即可)*/
public class StateDemo01_KeyedState {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//方便觀察//2.SourceDataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L));//3.Transformation//使用KeyState中的ValueState獲取流數據中的最大值(實際中直接使用maxBy即可)//實現方式1:直接使用maxBy--開發中使用該方式即可//min只會求出最小的那個字段,其他的字段不管//minBy會求出最小的那個字段和對應的其他的字段//max只會求出最大的那個字段,其他的字段不管//maxBy會求出最大的那個字段和對應的其他的字段SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0).maxBy(1);//實現方式2:使用KeyState中的ValueState---學習測試時使用,或者后續項目中/實際開發中遇到復雜的Flink沒有實現的邏輯,才用該方式!SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {//-1.定義狀態用來存儲最大值private ValueState<Long> maxValueState = null;@Overridepublic void open(Configuration parameters) throws Exception {//-2.定義狀態描述符:描述狀態的名稱和里面的數據類型ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);//-3.根據狀態描述符初始化狀態maxValueState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {//-4.使用State,取出State中的最大值/歷史最大值Long historyMaxValue = maxValueState.value();Long currentValue = value.f1;if (historyMaxValue == null || currentValue > historyMaxValue) {//5-更新狀態,把當前的作為新的最大值存到狀態中maxValueState.update(currentValue);return Tuple3.of(value.f0, currentValue, currentValue);} else {return Tuple3.of(value.f0, currentValue, historyMaxValue);}}});//4.Sink//result.print();result2.print();//5.executeenv.execute();}
}
Operator State
下圖對 word count 示例中的FromElementsFunction類進行詳解并分享如何在代碼中使用 operator state:
官網代碼示例
//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/
需求:
使用ListState存儲offset模擬Kafka的offset維護
編碼步驟:
//-1.聲明一個OperatorState來記錄offset
private ListState<Long> offsetState = null;
private Long offset = 0L;
//-2.創建狀態描述器
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);
//-3.根據狀態描述器獲取State
offsetState = context.getOperatorStateStore().getListState(descriptor);
//-4.獲取State中的值
Iterator<Long> iterator = offsetState.get().iterator();
if (iterator.hasNext()) {//迭代器中有值
????offset = iterator.next();//取出的值就是offset
}
offset += 1L;
ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",當前的offset為:" + offset);
if (offset % 5 == 0) {//每隔5條消息,模擬一個異常
//-5.保存State到Checkpoint中
offsetState.clear();//清理內存中存儲的offset到Checkpoint中
//-6.將offset存入State中
offsetState.add(offset);
代碼示例
package cn.it.state;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 需求:* 使用OperatorState支持的數據結構ListState存儲offset信息, 模擬Kafka的offset維護,* 其實就是FlinkKafkaConsumer底層對應offset的維護!*/
public class StateDemo02_OperatorState {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//先直接使用下面的代碼設置Checkpoint時間間隔和磁盤路徑以及代碼遇到異常后的重啟策略,下午會學env.enableCheckpointing(1000);//每隔1s執行一次Checkpointenv.setStateBackend(new FsStateBackend("file:///D:/ckp"));env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//固定延遲重啟策略: 程序出現異常的時候,重啟2次,每次延遲3秒鐘重啟,超過2次,程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));//2.SourceDataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());//3.Transformation//4.SinksourceData.print();//5.executeenv.execute();}/*** MyKafkaSource就是模擬的FlinkKafkaConsumer并維護offset*/public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {//-1.聲明一個OperatorState來記錄offsetprivate ListState<Long> offsetState = null;private Long offset = 0L;private boolean flag = true;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {//-2.創建狀態描述器ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);//-3.根據狀態描述器初始化狀態offsetState = context.getOperatorStateStore().getListState(descriptor);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {//-4.獲取并使用State中的值Iterator<Long> iterator = offsetState.get().iterator();if (iterator.hasNext()){offset = iterator.next();}while (flag){offset += 1;int id = getRuntimeContext().getIndexOfThisSubtask();ctx.collect("分區:"+id+"消費到的offset位置為:" + offset);//1 2 3 4 5 6//Thread.sleep(1000);TimeUnit.SECONDS.sleep(2);if(offset % 5 == 0){System.out.println("程序遇到異常了.....");throw new Exception("程序遇到異常了.....");}}}@Overridepublic void cancel() {flag = false;}/*** 下面的snapshotState方法會按照固定的時間間隔將State信息存儲到Checkpoint/磁盤中,也就是在磁盤做快照!*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {//-5.保存State到Checkpoint中offsetState.clear();//清理內存中存儲的offset到Checkpoint中//-6.將offset存入State中offsetState.add(offset);}}
}
總結
以上是生活随笔為你收集整理的2021年大数据Flink(二十六):State代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(二十五):F
- 下一篇: 2021年大数据Flink(二十七):F