2021年大数据Flink(三十三):Table与SQL相关概念
目錄
相關概念
Dynamic Tables & Continuous Queries
???????Table to Stream Conversion
???????
???????相關概念
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/dynamic_tables.html
???????Dynamic Tables & Continuous Queries
?
在Flink中,它把針對無界流的表稱之為Dynamic Table(動態表)。它是Flink Table API和SQL的核心概念。顧名思義,它表示了Table是不斷變化的。
我們可以這樣來理解,當我們用Flink的API,建立一個表,其實把它理解為建立一個邏輯結構,這個邏輯結構需要映射到數據上去。Flink source源源不斷的流入數據,就好比每次都往表上新增一條數據。表中有了數據,我們就可以使用SQL去查詢了。要注意一下,流處理中的數據是只有新增的,所以看起來數據會源源不斷地添加到表中。
動態表也是一種表,既然是表,就應該能夠被查詢。我們來回想一下原先我們查詢表的場景。
打開編譯工具,編寫一條SQL語句
- 將SQL語句放入到mysql的終端執行
- 查看結果
- 再編寫一條SQL語句
- 再放入到終端執行
- 再查看結果
…..如此反復
?
而針對動態表,Flink的source端肯定是源源不斷地會有數據流入,然后我們基于這個數據流建立了一張表,再編寫SQL語句查詢數據,進行處理。這個SQL語句一定是不斷地執行的。而不是只執行一次。注意:針對流處理的SQL絕對不會像批式處理一樣,執行一次拿到結果就完了。而是會不停地執行,不斷地查詢獲取結果處理。所以,官方給這種查詢方式取了一個名字,叫Continuous Query,中文翻譯過來叫連續查詢。而且每一次查詢出來的數據也是不斷變化的。
?
這是一個非常簡單的示意圖。該示意圖描述了:我們通過建立動態表和連續查詢來實現在無界流中的SQL操作。大家也可以看到,在Continuous上面有一個State,表示查詢出來的結果會存儲在State中,再下來Flink最終還是使用流來進行處理。
所以,我們可以理解為Flink的Table API和SQL,是一個邏輯模型,通過該邏輯模型可以讓我們的數據處理變得更加簡單。
?
?
???????Table to Stream Conversion
- 表中的Update和Delete
我們前面提到的表示不斷地Append,表的數據是一直累加的,因為表示對接Source的,Source是不會有update的。但如果我們編寫了一個SQL。這個SQL看起來是這樣的:
SELECT user, sum(money) FROM order GROUP BY user;
第一條數據,張三,2000,執行這條SQL語句的結果是,張三,2000當執行一條SQL語句之后,這條語句的結果還是一個表,因為在Flink中執行的SQL是Continuous Query,這個表的數據是不斷變化的。新創建的表存在Update的情況。仔細看下下面的示例,例如:
第二條數據,李四,1500,繼續執行這條SQL語句,結果是,張三,2000 | 李四,1500
第三條數據,張三,300,繼續執行這條SQL語句,結果是,張三,2300 | 李四,1500
….
大家發現了嗎,現在數據結果是有Update的。張三一開始是2000,但后面變成了2300。
那還有刪除的情況嗎?有的。看一下下面這條SQL語句:
SELECT t1.`user`, SUM(t1.`money`) FROM t_order t1WHERENOT EXISTS (SELECT T2.`user`AS TOTAL_MONEY FROM t_order t2 WHERE T2.`user` = T1.`user` GROUP BY t2.`user` HAVING SUM(T2.`money`) > 3000)GROUP BY t1.`user`GROUP BY t1.`user`
?
第一條數據,張三,2000,執行這條SQL語句的結果是,張三,2000
第二條數據,李四,1500,繼續執行這條SQL語句,結果是,張三,2000 | 李四,1500
第三條數據,張三,300,繼續執行這條SQL語句,結果是,張三,2300 | 李四,1500
第四條數據,張三,800,繼續執行這條SQL語句,結果是,李四,1500
驚不驚喜?意不意外?
因為張三的消費的金額已經超過了3000,所以SQL執行完后,張三是被處理掉了。從數據的角度來看,它不就是被刪除了嗎?
?
通過上面的兩個示例,給大家演示了,在Flink SQL中,對接Source的表都是Append-only的,不斷地增加。執行一些SQL生成的表,這個表可能是要UPDATE的、也可能是要INSERT的。
?
- 對表的編碼操作
我們前面說到過,表是一種邏輯結構。而Flink中的核心還是Stream。所以,Table最終還是會以Stream方式來繼續處理。如果是以Stream方式處理,最終Stream中的數據有可能會寫入到其他的外部系統中,例如:將Stream中的數據寫入到MySQL中。
我們前面也看到了,表是有可能會UPDATE和DELETE的。那么如果是輸出到MySQL中,就要執行UPDATE和DELETE語句了。而DataStream我們在學習Flink的時候就學習過了,DataStream是不能更新、刪除事件的。
如果對表的操作是INSERT,這很好辦,直接轉換輸出就好,因為DataStream數據也是不斷遞增的。但如果一個TABLE中的數據被UPDATE了、或者被DELETE了,如果用流來表達呢?因為流不可變的特征,我們肯定要對這種能夠進行UPDATE/DELETE的TABLE做特殊操作。
我們可以針對每一種操作,INSERT/UPDATE/DELETE都用一個或多個經過編碼的事件來表示。
例如:針對UPDATE,我們用兩個操作來表達,[DELETE] 數據+ ?[INSERT]數據。也就是先把之前的數據刪除,然后再插入一條新的數據。針對DELETE,我們也可以對流中的數據進行編碼,[DELETE]數據。
總體來說,我們通過對流數據進行編碼,也可以告訴DataStream的下游,[DELETE]表示發出MySQL的DELETE操作,將數據刪除。用 [INSERT]表示插入新的數據。
?
- 將表轉換為三種不同編碼方式的流
Flink中的Table API或者SQL支持三種不同的編碼方式。分別是:
Append-only流
Retract流
Upsert流
?
分別來解釋下這三種流。
?
- Append-only流
跟INSERT操作對應。這種編碼類型的流針對的是只會不斷新增的Dynamic Table。這種方式好處理,不需要進行特殊處理,源源不斷地往流中發送事件即可。
?
- Retract流
這種流就和Append-only不太一樣。上面的只能處理INSERT,如果表會發生DELETE或者UPDATE,Append-only編碼方式的流就不合適了。Retract流有幾種類型的事件類型:
ADD MESSAGE:這種消息對應的就是INSERT操作。
RETRACT MESSAGE:直譯過來叫取消消息。這種消息對應的就是DELETE操作。
?
我們可以看到通過ADD MESSAGE和RETRACT MESSAGE可以很好的向外部系統表達刪除和插入操作。那如何進行UPDATE呢?好辦!RETRACT MESSAGE + ADD MESSAGE即可。先把之前的數據進行刪除,然后插入一條新的。完美~
?
- Upsert流
前面我們看到的RETRACT編碼方式的流,實現UPDATE是使用DELETE + INSERT模式的。大家想一下:在MySQL中我們更新數據的時候,肯定不會先DELETE掉一條數據,然后再插入一條數據,肯定是直接發出UPDATE語句執行更新。而Upsert編碼方式的流,是能夠支持Update的,這種效率更高。它同樣有兩種類型的消息:
UPSERT MESSAGE:這種消息可以表示要對外部系統進行Update或者INSERT操作
DELETE MESSAGE:這種消息表示DELETE操作。
Upsert流是要求必須指定Primary Key的,因為Upsert操作是要有Key的。Upsert流針對UPDATE操作用一個UPSERT MESSAGE就可以描述,所以效率會更高。
?
總結
以上是生活随笔為你收集整理的2021年大数据Flink(三十三):Table与SQL相关概念的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(三十二):
- 下一篇: 2021年大数据Flink(三十四):