

Building a Custom Flink SQL Connector


1. Why write a custom connector

A common situation: after aggregating metrics with Flink SQL, we want to write the results to some specific middleware, for example the OpenTSDB time-series database, only to find that Flink does not ship an OpenTSDB connector. In that case we have to define the connector ourselves.

2. Steps to define a custom connector

  • Create a TableFactory; its sub-interfaces include StreamTableSourceFactory and StreamTableSinkFactory
  • Create a TableSink or TableSource; typical sub-interfaces are AppendStreamTableSink and StreamTableSource
  • Define a validator by extending ConnectorDescriptorValidator
  • Create a subclass of RichSinkFunction
  • Under the resources directory, create META-INF/services and add a file named org.apache.flink.table.factories.TableFactory

3. What each step does

Here we walk through each step from the sink side; the source side is analogous.

3.1 StreamTableSinkFactory

We create our own factory class implementing StreamTableSinkFactory. The methods to focus on are:

createStreamTableSink(Map): creates the StreamTableSink

requiredContext(): uniquely identifies this connector's type, i.e. the value of connector.type

supportedProperties(): declares the configuration keys the connector supports
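As a quick orientation, here is a stripped-down sketch of such a factory (an illustration only: the "customize" type value and the connector.address key are this article's own examples, and the real factory appears in section 4.2.1):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

public class SketchTableSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // matched against the 'connector.type' value in the table DDL
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "customize");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // every property key the connector accepts must be whitelisted here
        List<String> properties = new ArrayList<>();
        properties.add("connector.address");
        properties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        properties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // build the concrete StreamTableSink from the validated properties
        return null; // e.g. new CustomizedTableSink(...), see section 4.2.2
    }
}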

3.2 AppendStreamTableSink

This is an append-only stream sink. There are also UpsertStreamTableSink and RetractStreamTableSink; pick the one that fits your needs (their differences are out of scope here).

consumeDataStream(): the method we care about most. It consumes the rows of the DataStream and, via addSink, attaches the RichSinkFunction that actually writes the data out, as sketched below.
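A minimal sketch of that single method as it will sit inside the sink class (the full class is in section 4.2.2; CustomizedSinkFunction and the sink name here are just this article's examples):

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
    // attach our RichSinkFunction; Flink then calls its invoke() for every row
    return dataStream
            .addSink(new CustomizedSinkFunction(this.address))
            .setParallelism(dataStream.getParallelism())
            .name("customized-sink");
}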

3.3 RichSinkFunction

Our own sink implementation. Three methods matter:

invoke(Row, Context): the key method; every record arrives here and this is where we act on it

open(): initialization, e.g. creating clients such as an HTTP client or a Kafka client

close(): called on shutdown, typically to release resources such as those clients
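A bare-bones skeleton showing the three hooks (a sketch only: the comments mark where a real client would go; the article's actual implementation follows in section 4.2.3):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class SketchSinkFunction extends RichSinkFunction<Row> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // one-time setup per parallel subtask, e.g. create an HTTP or Kafka client here
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        // called once per record; push the row to the target system here
        System.out.println("received row: " + value);
    }

    @Override
    public void close() throws Exception {
        // release whatever open() created
    }
}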

3.4 ConnectorDescriptorValidator

The validator is where the connector.* properties get checked before the sink is created; our subclass (section 4.2.4) simply adds logging around the built-in validation.

4. Hands-on example

4.1 Generating test data

First, push some test data into Kafka.

The JavaBean:

public class KafkaTestBean {
    private Double bandWidth;
    private Long app_time;
    private Double packet;
    private String networkLineId;

    public KafkaTestBean() {
    }

    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getBandWidth() { return bandWidth; }
    public void setBandWidth(Double bandWidth) { this.bandWidth = bandWidth; }

    public Long getApp_time() { return app_time; }
    public void setApp_time(Long app_time) { this.app_time = app_time; }

    public Double getPacket() { return packet; }
    public void setPacket(Double packet) { this.packet = packet; }

    public String getNetworkLineId() { return networkLineId; }
    public void setNetworkLineId(String networkLineId) { this.networkLineId = networkLineId; }
}

The Kafka message generator:

import java.util.Properties;
import java.util.UUID;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaMessageGenerator {

    public static void main(String[] args) throws Exception {
        // producer configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        // serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }
    }

    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) ((Math.random()) * 10));
        String s2 = String.valueOf((int) ((Math.random()) * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}

4.2 Implementing the feature

The goal: consume data from Kafka, run an aggregation over it, and write the result to OpenTSDB.

4.2.1 Create the TableFactory

package com.cxc.flink.extend;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {

    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        // validate the connector properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema, format);
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return null;
    }

    @Override
    public Map<String, String> requiredContext() {
        // the connector type: Flink uses this value to discover exactly one matching connector
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * The configuration keys supported by this custom connector.
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);
        supportProperties.add(CONNECTOR_ADDRESS);
        // schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }
}

4.2.2 Create the StreamTableSink

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .addSink(new CustomizedSinkFunction(this.address))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}

4.2.3 Create the RichSinkFunction

package com.cxc.flink.extend;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    public CustomizedSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        // just print the row for now
        System.out.println("send to " + address + "---" + value);
    }
}

4.2.4 Create the ConnectorDescriptorValidator

package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {

    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void validate(DescriptorProperties properties) {
        // validate the connector properties here
        logger.info("validating connector properties");
        super.validate(properties);
        logger.info("connector properties validated");
    }
}

4.2.5 Create META-INF/services

TableFactory implementations are discovered through Java's SPI mechanism; see the discoverFactories() method in TableFactoryService for the details.

Under the resources directory, create the file META-INF/services/org.apache.flink.table.factories.TableFactory

with the following content:

com.cxc.flink.extend.CustomizedTableSourceSinkFactory

If this file is missing, the ServiceLoader cannot find the factory and the job fails at runtime.
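Roughly, the discovery boils down to the standard java.util.ServiceLoader lookup sketched below (a simplified illustration, not Flink's actual code; TableFactoryService additionally matches requiredContext() and supportedProperties() against the table's properties):

import java.util.ServiceLoader;

import org.apache.flink.table.factories.TableFactory;

public class SpiLookupSketch {
    public static void main(String[] args) {
        // ServiceLoader scans every META-INF/services/org.apache.flink.table.factories.TableFactory
        // file on the classpath and instantiates each class listed inside it
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println("discovered factory: " + factory.getClass().getName());
        }
    }
}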

4.2.6 The SQL job

A Flink SQL test program that runs the aggregation and sends the result to our custom sink:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // DDL for the input table
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(INPUT_SQL.toString());

        // DDL for the output table
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);

        env.execute("window sql job");
    }
}

5. Extending the sink

With the code above we get a small win: results are printed to the console. But the printed rows are not standard JSON, and in many scenarios the data has to be JSON. How do we achieve that?

Browsing the Flink Kafka connector source turns up the SerializationSchema interface, which makes things simple: its implementation JsonRowSerializationSchema serializes rows to JSON internally with Jackson, so we can use it directly.

5.1 Modify the RichSinkFunction

In invoke(), serialize each row before handling it:

package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;
    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType, String address, String formatType) {
        this.address = address;
        if (formatType.equals("json")) {
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        } else {
            throw new RuntimeException("current custom format only support json serializer");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        // serialize the row to JSON and print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize);
        System.out.println("send to " + address + "---" + jsonValue);
    }
}

5.2 Modify the StreamTableSink

Pass the table schema's row type into the SinkFunction when constructing it, so the JSON serializer can build its JsonNode structure from it:

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .addSink(new CustomizedSinkFunction(this.schema.toRowType(), this.address, formatType))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
