日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【3】flink sink

發(fā)布時間:2023/12/3 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【3】flink sink 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

【README】

本文記錄了flink sink操作,輸出目的存儲器(中間件)包括

  • kafka;
  • es;
  • db;
  • 等等有很多;
  • 本文只給出了 sink2kafka的代碼

本文使用的flink為 1.14.4 版本;

本文部分內(nèi)容參考了 flink 官方文檔,如下:

Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/


【1】 flink sink2kafka

1)場景:

  • 消費上游topic hello0415的數(shù)據(jù),并把數(shù)據(jù)流輸出到下游kafka topic hell0416;
  • 如,我們在java框架中把數(shù)據(jù)庫日志發(fā)送到 topic1 ,然后我想統(tǒng)計執(zhí)行時間大于3秒的sql,則需要把篩選后的sql 發(fā)送到 下游 topic2, 就可以使用sink 來完成;

2)代碼

/*** @Description flink流輸出到kafka(下沉)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/ public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 設(shè)置全局并行度// 創(chuàng)建flink連接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// kafka生產(chǎn)者屬性Properties kafkaProducerProps = new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka數(shù)據(jù)流輸出到(sink) topic hello0416KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("hello0416").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print("kafkaStream");// 執(zhí)行env.execute("kafkaSinkJob");} }

效果:


?【補充】

kafka 消費者屬性

public enum KafkaConsumerProps {_INS;/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;} }

總結(jié)

以上是生活随笔為你收集整理的【3】flink sink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。