2021年大数据Flink(三十六):Table与SQL 案例三
目錄
案例三
需求
編碼步驟
代碼實現-方式1
代碼實現-方式2
案例三
需求
使用Flink SQL來統計5秒內每個用戶的訂單總數、訂單的最大金額、訂單的最小金額
也就是每隔5秒統計最近5秒的每個用戶的訂單總數、訂單的最大金額、訂單的最小金額
上面的需求使用流處理的Window的基於時間的滾動窗口就可以搞定!
那麼接下來使用Flink Table&SQL-API來實現
編碼步驟
1.創建環境
2.使用自定義函數模擬實時流數據
3.設置事件時間和Watermark
4.註冊表
5.執行sql-可以使用sql風格或table風格(了解)
6.輸出結果
7.觸發執行
代碼實現-方式1
package cn.it.sql;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;import static org.apache.flink.table.api.Expressions.$;/*** Author lanson* Desc*/
public class FlinkSQL_Table_Demo04 {public static void main(String[] args) throws Exception {//1.準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2.SourceDataStreamSource<Order> orderDS ?= env.addSource(new RichSourceFunction<Order>() {private Boolean isRunning = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (isRunning) {Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());TimeUnit.SECONDS.sleep(1);ctx.collect(order);}}@Overridepublic void cancel() {isRunning = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> event.getCreateTime()));//4.注冊(cè)表tEnv.createTemporaryView("t_order", watermakerDS,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());//5.執(zhí)行SQLString sql = "select " +"userId," +"count(*) as totalCount," +"max(money) as maxMoney," +"min(money) as minMoney " +"from t_order " +"group by userId," +"tumble(createTime, interval '5' second)";Table ResultTable = tEnv.sqlQuery(sql);//6.Sink//將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來//toAppendStream → 將計(jì)算后的數(shù)據(jù)append到結(jié)果DataStream中去//toRetractStream ?→ 將計(jì)算后的新的數(shù)據(jù)在DataStream原數(shù)據(jù)的基礎(chǔ)上更新true或是刪除falseDataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);resultDS.print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long createTime;}
}
toAppendStream → 將計算後的數據append到結果DataStream中去
toRetractStream → 將計算後的新的數據在DataStream原數據的基礎上更新true或是刪除false
代碼實現-方式2
package cn.it.sql;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;/*** Author lanson* Desc*/
public class FlinkSQL_Table_Demo05 {public static void main(String[] args) throws Exception {//1.準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2.SourceDataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {private Boolean isRunning = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (isRunning) {Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());TimeUnit.SECONDS.sleep(1);ctx.collect(order);}}@Overridepublic void cancel() {isRunning = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> event.getCreateTime()));//4.注冊(cè)表tEnv.createTemporaryView("t_order", watermakerDS,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());//查看表約束tEnv.from("t_order").printSchema();//5.TableAPI查詢Table ResultTable = tEnv.from("t_order")//.window(Tumble.over("5.second").on("createTime").as("tumbleWindow")).window(Tumble.over(lit(5).second()).on($("createTime")).as("tumbleWindow")).groupBy($("tumbleWindow"), $("userId")).select($("userId"),$("userId").count().as("totalCount"),$("money").max().as("maxMoney"),$("money").min().as("minMoney"));//6.將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);resultDS.print();//7.excuteenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long createTime;}
}
總結(jié)
以上是生活隨筆為你收集整理的2021年大數據Flink(三十六):Table與SQL 案例三的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(三十五):
- 下一篇: 2021年大数据Flink(三十七):