Big Data: Flink Data Consistency


一、Writing Flink analysis results to Redis

1、Download the flink-hadoop integration package and place it on every node, so the job can write its checkpoints to the HDFS state backend used below.

2、KafkaToRedisWordCount

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Can this program tolerate failures (i.e. guarantee data consistency)?
 * If it can, is the guarantee ExactlyOnce or AtLeastOnce? It is AtLeastOnce:
 * KafkaSource: records offsets and stores them in state (OperatorState)
 * sum after keyBy: sum is stateful (ValueState)
 * RedisSink: the HSET command overwrites existing fields, so replayed writes are idempotent
 */
public class KafkaToRedisWordCount {

    // --topic doit2021 --groupId g02 --redisHost node-3.51doit.cn
    // --redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // periodically persist the in-memory state to the state backend
        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));

        // set the state backend
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));

        // keep the checkpoint data if the job is cancelled manually
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Kafka parameters
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
        // offset reset strategy: if no offset has been recorded, read from the beginning; otherwise resume
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", parameterTool.get("groupId"));
        // checkpointing is enabled, so do not let the Kafka consumer (the source subtasks) auto-commit offsets
        properties.setProperty("enable.auto.commit", "false");

        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), // topic to read from
                new SimpleStringSchema(),           // deserialization schema
                properties                          // Kafka properties
        );
        // optionally, on checkpoint do not commit offsets back to Kafka's special offsets topic
        //kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // add the Kafka source with addSource
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        // key by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        // aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        // write the aggregated result to Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9)
                .build();

        summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));

        env.execute();
    }

    private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET into the hash named WORD_COUNT; overwriting fields makes the sink idempotent
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
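Beyond the checkpoint interval and the RETAIN_ON_CANCELLATION retention set above, a few other checkpoint settings are commonly tuned. The fragment below is only a sketch of such options (the values are illustrative and not part of the original job); it would sit right after env.enableCheckpointing(...) and assumes CheckpointingMode and CheckpointConfig are imported.

    // illustrative extra checkpoint tuning for the same `env` (not in the original program)
    CheckpointConfig cc = env.getCheckpointConfig();
    // internal checkpoint mode; note this is separate from the end-to-end guarantee,
    // which remains AtLeastOnce here because the Redis sink is idempotent, not transactional
    cc.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // leave at least 10 s between the end of one checkpoint and the start of the next
    cc.setMinPauseBetweenCheckpoints(10000);
    // discard a checkpoint that has not finished within 2 minutes
    cc.setCheckpointTimeout(120000);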

Note: if Redis goes down, the sink fails and the job restarts from the last successful checkpoint; once Redis is back, the replayed data is written again, and because HSET simply overwrites the same hash fields, the results stay correct.
If you cancel the Flink job without deleting its checkpoint data, you can restart it with the last checkpoint specified and the computation continues where it left off; the example above relies on this, or you can use a savepoint, taken manually when cancelling.
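The idempotence argument can be checked directly with a plain Redis client: writing the same hash field again simply overwrites it, so replayed results after a restart do not corrupt the hash. A minimal sketch using the Jedis client (host, port and db index are illustrative assumptions, not part of the job):

import redis.clients.jedis.Jedis;

public class HsetIdempotenceDemo {
    public static void main(String[] args) {
        // assumes a reachable Redis instance; host, port and db index are illustrative
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.select(9);
            jedis.hset("WORD_COUNT", "flink", "5");
            // writing the same field again (e.g. after a replay) just overwrites it
            jedis.hset("WORD_COUNT", "flink", "5");
            System.out.println(jedis.hget("WORD_COUNT", "flink")); // prints 5
        }
    }
}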


二、Reading data from Kafka, processing it, and writing it back to Kafka

1、KafkaToKafka

package cn._51doit.flink.day09;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Read data from Kafka, process it, and write it back to Kafka.
 * Goal: guarantee data consistency.
 * ExactlyOnce: the source can record offsets (so data can be replayed; on failure the
 * offsets are not advanced) and the sink must support transactions.
 * With checkpointing enabled, the source offsets are stored in state (OperatorState),
 * and the processed data is covered by the checkpoint as well.
 */
public class KafkaToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(30000);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        // Kafka parameters
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
        // offset reset strategy: if no offset has been recorded, read from the beginning; otherwise resume
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", "g1");
        // do not let the Kafka consumer auto-commit offsets; Flink's checkpoints manage them
        properties.setProperty("enable.auto.commit", "false");

        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "doit2021",               // topic to read from
                new SimpleStringSchema(), // deserialization schema
                properties                // Kafka properties
        );
        // on checkpoint, do not commit the offsets back to Kafka's special offsets topic
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // add the Kafka source with addSource
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));

        // this simpler producer would only give AtLeastOnce
        // FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        //         "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
        // );

        // topic to write to
        String topic = "out2021";
        // producer transaction timeout (must not exceed the broker's transaction.max.timeout.ms)
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                     // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties,                                // Kafka properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE   // write with EXACTLY_ONCE semantics
        );

        filtered.addSink(kafkaProducer);

        env.execute();
    }
}
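Two practical points about the EXACTLY_ONCE producer: the transaction.timeout.ms set above (5 minutes) must not exceed the broker's transaction.max.timeout.ms (15 minutes by default), and a downstream consumer only sees the written records once the Flink checkpoint completes and the Kafka transaction commits, provided it reads with read_committed isolation. A minimal sketch of such a downstream consumer configuration (the group id is an illustrative assumption):

    // hedged sketch: a downstream Flink consumer that only reads committed transactional data
    Properties consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", "node-1.51doit.cn:9092");
    consumerProps.setProperty("group.id", "downstream-g1"); // illustrative group id
    // the default read_uncommitted would also expose records from aborted or pending transactions
    consumerProps.setProperty("isolation.level", "read_committed");
    FlinkKafkaConsumer<String> downstream =
            new FlinkKafkaConsumer<>("out2021", new SimpleStringSchema(), consumerProps);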

2、Defining KafkaStringSerializationSchema

package cn._51doit.flink.day09;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.Charset;

/**
 * Custom Kafka serialization schema for String data.
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;
    private String charset;

    // the constructor takes the target topic and the charset; UTF-8 is the default
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }

    // called to serialize each element
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // convert the string to a byte array
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // wrap it in a ProducerRecord for the target topic
        return new ProducerRecord<>(topic, bytes);
    }
}
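If records should be partitioned by a key instead of the default assignment, serialize can also return a keyed ProducerRecord. A hedged variant of the method above (the choice of key, the first whitespace-separated token, is purely illustrative):

    // hypothetical keyed variant: records with the same key always land in the same partition
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // illustrative key choice: the first whitespace-separated token of the line
        String key = element.split(" ", 2)[0];
        return new ProducerRecord<>(
                topic,
                key.getBytes(Charset.forName(charset)),
                element.getBytes(Charset.forName(charset))
        );
    }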
