【3】flink sink
生活随笔
收集整理的這篇文章主要介紹了
【3】flink sink
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
【README】
本文記錄了flink sink操作,輸出目的存儲器(中間件)包括
- kafka;
- es;
- db;
- 等等有很多;
- 本文只給出了 sink2kafka的代碼;
本文使用的flink為 1.14.4 版本;
本文部分內(nèi)容參考了 flink 官方文檔,如下:
Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/
【1】 flink sink2kafka
1)場景:
- 消費上游topic hello0415的數(shù)據(jù),并把數(shù)據(jù)流輸出到下游kafka topic hell0416;
- 如,我們在java框架中把數(shù)據(jù)庫日志發(fā)送到 topic1 ,然后我想統(tǒng)計執(zhí)行時間大于3秒的sql,則需要把篩選后的sql 發(fā)送到 下游 topic2, 就可以使用sink 來完成;
2)代碼
/*** @Description flink流輸出到kafka(下沉)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/ public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 設(shè)置全局并行度// 創(chuàng)建flink連接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// kafka生產(chǎn)者屬性Properties kafkaProducerProps = new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka數(shù)據(jù)流輸出到(sink) topic hello0416KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("hello0416").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print("kafkaStream");// 執(zhí)行env.execute("kafkaSinkJob");} }效果:
?【補充】
kafka 消費者屬性
public enum KafkaConsumerProps {_INS;/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;} }總結(jié)
以上是生活随笔為你收集整理的【3】flink sink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 优化差遭吐槽,《CS2(反恐精英 2)》
- 下一篇: 数据结构排序总结