日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三

發(fā)布時(shí)間:2023/11/28 生活经验 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

案例三

需求

編碼步驟

代碼實(shí)現(xiàn)-方式1

代碼實(shí)現(xiàn)-方式2


案例三

需求

使用Flink SQL來統(tǒng)計(jì)5秒內(nèi) 每個(gè)用戶的 訂單總數(shù)、訂單的最大金額、訂單的最小金額

也就是每隔5秒統(tǒng)計(jì)最近5秒的每個(gè)用戶的訂單總數(shù)、訂單的最大金額、訂單的最小金額

上面的需求使用流處理的Window的基于時(shí)間的滾動(dòng)窗口就可以搞定!

那么接下來使用FlinkTable&SQL-API來實(shí)現(xiàn)

???????編碼步驟

1.創(chuàng)建環(huán)境

2.使用自定義函數(shù)模擬實(shí)時(shí)流數(shù)據(jù)

3.設(shè)置事件時(shí)間和Watermaker

4.注冊(cè)表

5.執(zhí)行sql-可以使用sql風(fēng)格或table風(fēng)格(了解)

6.輸出結(jié)果

7.觸發(fā)執(zhí)行

???????代碼實(shí)現(xiàn)-方式1

package cn.it.sql;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;import static org.apache.flink.table.api.Expressions.$;/*** Author lanson* Desc*/
public class FlinkSQL_Table_Demo04 {public static void main(String[] args) throws Exception {//1.準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2.SourceDataStreamSource<Order> orderDS ?= env.addSource(new RichSourceFunction<Order>() {private Boolean isRunning = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (isRunning) {Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());TimeUnit.SECONDS.sleep(1);ctx.collect(order);}}@Overridepublic void cancel() {isRunning = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> event.getCreateTime()));//4.注冊(cè)表tEnv.createTemporaryView("t_order", watermakerDS,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());//5.執(zhí)行SQLString sql = "select " +"userId," +"count(*) as totalCount," +"max(money) as maxMoney," +"min(money) as minMoney " +"from t_order " +"group by userId," +"tumble(createTime, interval '5' second)";Table ResultTable = tEnv.sqlQuery(sql);//6.Sink//將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來//toAppendStream → 將計(jì)算后的數(shù)據(jù)append到結(jié)果DataStream中去//toRetractStream ?→ 將計(jì)算后的新的數(shù)據(jù)在DataStream原數(shù)據(jù)的基礎(chǔ)上更新true或是刪除falseDataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);resultDS.print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long createTime;}
}

toAppendStream → 將計(jì)算后的數(shù)據(jù)append到結(jié)果DataStream中去

toRetractStream ?→ 將計(jì)算后的新的數(shù)據(jù)在DataStream原數(shù)據(jù)的基礎(chǔ)上更新true或是刪除false

???????代碼實(shí)現(xiàn)-方式2

package cn.it.sql;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;/*** Author lanson* Desc*/
public class FlinkSQL_Table_Demo05 {public static void main(String[] args) throws Exception {//1.準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2.SourceDataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {private Boolean isRunning = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (isRunning) {Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());TimeUnit.SECONDS.sleep(1);ctx.collect(order);}}@Overridepublic void cancel() {isRunning = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> event.getCreateTime()));//4.注冊(cè)表tEnv.createTemporaryView("t_order", watermakerDS,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());//查看表約束tEnv.from("t_order").printSchema();//5.TableAPI查詢Table ResultTable = tEnv.from("t_order")//.window(Tumble.over("5.second").on("createTime").as("tumbleWindow")).window(Tumble.over(lit(5).second()).on($("createTime")).as("tumbleWindow")).groupBy($("tumbleWindow"), $("userId")).select($("userId"),$("userId").count().as("totalCount"),$("money").max().as("maxMoney"),$("money").min().as("minMoney"));//6.將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);resultDS.print();//7.excuteenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long createTime;}
}

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。