日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

1.18.5.流式概念、动态表(Dynamic Table)、DataStream上的关系查询、动态表 连续查询(Continuous Query)、在流上定义表、处理时间

發布時間:2024/9/27 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 1.18.5.流式概念、动态表(Dynamic Table)、DataStream上的关系查询、动态表 连续查询(Continuous Query)、在流上定义表、处理时间 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.18.5.流式概念
1.18.5.1.動態表(Dynamic Table)
1.18.5.1.1.DataStream上的關系查詢
1.18.5.1.2.動態表 & 連續查詢(Continuous Query)
1.18.5.1.3.在流上定義表
1.18.5.1.4.連續查詢
1.18.5.1.5.處理時間
1.18.5.1.5.1.在創建表的DDL中定義
1.18.5.1.5.2.在DataStream到Table轉換時定義
1.18.5.1.5.3.使用TableSource定義
1.18.5.1.5.4.更新和追加查詢
1.18.5.1.5.5.查詢限制
1.18.5.1.5.6.表到流的轉換

1.18.5.流式概念

1.18.5.1.動態表(Dynamic Table)

SQL 和關系代數在設計時并未考慮流數據。因此,在關系代數(和 SQL)之間幾乎沒有概念上的差異。

本文會討論這種差異,并介紹 Flink 如何在無界數據集上實現與數據庫引擎在有界數據上的處理具有相同的語義。

1.18.5.1.1.DataStream上的關系查詢

下表比較了傳統的關系代數和流處理與輸入數據、執行和輸出結果的關系。

關系代數/SQL流處理
關系(或表)是有界(多)元組集合。流是一個無限元組序列。
對批數據(例如關系數據庫中的表)執行的查詢可以訪問完整的輸入數據。流式查詢在啟動時不能訪問所有數據,必須”等待”數據流入。
批處理查詢在產生固定大小的結果后終止。流查詢不斷地根據接收到的記錄更新其結果,并且始終不會結束。

盡管存在這些差異,但是使用關系查詢和 SQL 處理流并不是不可能的。高級關系數據庫系統提供了一個稱為 物化視圖(Materialized Views) 的特性。物化視圖被定義為一條 SQL 查詢,就像常規的虛擬視圖一樣。與虛擬視圖相反,物化視圖緩存查詢的結果,因此在訪問視圖時不需要對查詢進行計算。緩存的一個常見難題是防止緩存為過期的結果提供服務。當其定義查詢的基表被修改時,物化視圖將過期。 即時視圖維護(Eager View Maintenance) 是一種一旦更新了物化視圖的基表就立即更新視圖的技術。

如果我們考慮以下問題,那么即時視圖維護和流上的SQL查詢之間的聯系就會變得顯而易見:
?數據庫表是 INSERT、UPDATE 和 DELETE DML 語句的 stream 的結果,通常稱為 changelog stream。
?物化視圖被定義為一條 SQL 查詢。為了更新視圖,查詢不斷地處理視圖的基本關系的changelog流。
?物化視圖是流式SQL查詢的結果。
了解了這些要點之后,我們將在下一節中介紹 動態表(Dynamic tables) 的概念。

1.18.5.1.2.動態表 & 連續查詢(Continuous Query)

動態表 是 Flink 的支持流數據的 Table API 和 SQL 的核心概念。與表示批處理數據的靜態表不同,動態表是隨時間變化的??梢韵癫樵冹o態批處理表一樣查詢它們。查詢動態表將生成一個 連續查詢 。一個連續查詢永遠不會終止,結果會生成一個動態表。查詢不斷更新其(動態)結果表,以反映其(動態)輸入表上的更改。本質上,動態表上的連續查詢非常類似于定義物化視圖的查詢。

需要注意的是,連續查詢的結果在語義上總是等價于以批處理模式在輸入表快照上執行的相同查詢的結果。

下圖顯示了流、動態表和連續查詢之間的關系:

1.將流轉換為動態表。
2.在動態表上計算一個連續查詢,生成一個新的動態表。
3.生成的動態表被轉換回流。

注意: 動態表首先是一個邏輯概念。在查詢執行期間不一定(完全)物化動態表。
在下面,我們將解釋動態表和連續查詢的概念,并使用具有以下模式的單擊事件流:

[user: VARCHAR, //用戶名cTime: TIMESTAMP, //訪問URL的時間url: VARCHAR //用戶訪問的URL ]
1.18.5.1.3.在流上定義表

為了使用關系查詢處理流,必須將其轉換成 Table。從概念上講,流的每條記錄都被解釋為對結果表的 INSERT 操作。本質上我們正在從一個 INSERT-only 的 changelog 流構建表。

下圖顯示了單擊事件流(左側)如何轉換為表(右側)。當插入更多的單擊流記錄時,結果表將不斷增長。

**注意:**在流上定義的表內部沒有物化。

1.18.5.1.4.連續查詢

在動態表上計算一個連續查詢,并生成一個新的動態表。與批處理查詢不同,連續查詢從不終止,并根據其輸入表上的更新更新其結果表。在任何時候,連續查詢的結果在語義上與以批處理模式在輸入表快照上執行的相同查詢的結果相同。

在接下來的代碼中,我們將展示 clicks 表上的兩個示例查詢,這個表是在點擊事件流上定義的。

第一個查詢是一個簡單的 GROUP-BY COUNT 聚合查詢。它基于 user 字段對 clicks 表進行分組,并統計訪問的 URL 的數量。下面的圖顯示了當clicks表被附加的行更新時,查詢是如何被評估的。

當查詢開始,clicks 表(左側)是空的。當第一行數據被插入到 clicks 表時,查詢開始計算結果表。第一行數據 [Mary,./home] 插入后,結果表(右側,上部)由一行 [Mary, 1] 組成。當第二行 [Bob, ./cart] 插入到 clicks 表時,查詢會更新結果表并插入了一行新數據 [Bob, 1]。第三行 [Mary, ./prod?id=1] 將產生已計算的結果行的更新,[Mary, 1] 更新成 [Mary, 2]。最后,當第四行數據加入 clicks 表時,查詢將第三行 [Liz, 1] 插入到結果表中。

第二條查詢與第一條類似,但是除了用戶屬性之外,還將 clicks 分組至每小時滾動窗口中,然后計算 url 數量(基于時間的計算,例如基于特定時間屬性的窗口,后面會討論)。同樣,該圖顯示了不同時間點的輸入和輸出,以可視化動態表的變化特性。

與前面一樣,左邊顯示了輸入表 clicks。查詢每小時持續計算結果并更新結果表。clicks表包含四行帶有時間戳(cTime)的數據,時間戳在 12:00:00 和 12:59:59 之間。查詢從這個輸入計算出兩個結果行(每個 user 一個),并將它們附加到結果表中。對于 13:00:00 和 13:59:59 之間的下一個窗口,clicks 表包含三行,這將導致另外兩行被追加到結果表。隨著時間的推移,更多的行被添加到 click 中,結果表將被更新。

1.18.5.1.5.處理時間

處理時間是基于機器的本地時間來處理數據,它是最簡單的一種時間概念,但是它不能提供確定性。它既不需要從數據里獲取時間,也不需要生成 watermark。

共三種方法可以定義處理時間。

1.18.5.1.5.1.在創建表的DDL中定義

處理時間屬性可以在創建表的DDL中用計算列的方式定義,用PROCTIME()就可以定義處理時間。關于計算列,更多信息可以參考:CREATE TABLE DDL (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/create.html#create-table)

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 聲明一個額外的列作為處理時間屬性 ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
1.18.5.1.5.2.在DataStream到Table轉換時定義

處理時間屬性可以在 schema 定義的時候用 .proctime 后綴來定義。時間屬性一定不能定義在一個已有字段上,所以它只能定義在 schema 定義的最后。

Java代碼:

package com.toto.demo.sql;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit;public class Demo {public static void main(String[] args) {StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);DataStream<Tuple2<String, String>> stream = ...;//聲明一個額外的字段作為時間屬性字段Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"),$("user_action_time").proctime());GroupWindowedTable windowedTable = table.window(Tumble.over(lit(10).minute()).on($("user_action_time")).as("userActionWindow"));}}

Scala代碼:

package com.toto.learn.sqlimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{EnvironmentSettings, Tumble} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Demo {def main(args: Array[String]): Unit = {val bsEnv = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)// or val bsTableEnv = TableEnvironment.create(bsSettings)val stream: DataStream[(String, String)] = ...// 聲明一個額外的字段作為時間屬性字段val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")}}
1.18.5.1.5.3.使用TableSource定義

處理時間屬性可以在實現了 DefinedProctimeAttribute 的 TableSource 中定義。邏輯的時間屬性會放在 TableSource 已有物理字段的最后。

// 定義一個由處理時間屬性的 table source public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// create streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 這個名字的列會被追加到最后,作為第三列return "user_action_time";} }// register table source tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

Scala代碼:

// 定義一個由處理時間屬性的 table source class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {override def getReturnType = {val names = Array[String]("user_name" , "data")val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)Types.ROW(names, types)}override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {// create streamval stream = ...stream}override def getProctimeAttribute = {// 這個名字的列會被追加到最后,作為第三列"user_action_time"} }// register table source tEnv.registerTableSource("user_actions", new UserActionSource)val windowedTable = tEnv.from("user_actions").window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
1.18.5.1.5.4.更新和追加查詢

雖然這兩個示例查詢看起來非常相似(都計算分組計數聚合),但它們在一個重要方面不同:
?第一個查詢更新先前輸出的結果,即定義結果表的 changelog 流包含 INSERT 和 UPDATE 操作。
?第二個查詢只附加到結果表,即結果表的 changelog 流只包含 INSERT 操作。

一個查詢是產生一個只追加的表還是一個更新的表有一些含義:
?產生更新更改的查詢通常必須維護更多的狀態(請參閱以下部分)。
?將 append-only 的表轉換為流與將已更新的表轉換為流是不同的(參閱表到流的轉換章節)。

1.18.5.1.5.5.查詢限制

許多(但不是全部)語義上有效的查詢可以作為流上的連續查詢進行評估。有些查詢代價太高而無法計算,這可能是由于它們需要維護的狀態大小,也可能是由于計算更新代價太高。
狀態大小:連續查詢在無界流上計算,通常應該運行數周或數月。因此,連續查詢處理的數據總量可能非常大。必須更新先前輸出的結果的查詢需要維護所有輸出的行,以便能夠更新它們。例如,第一個查詢示例需要存儲每個用戶的 URL 計數,以便能夠增加該計數并在輸入表接收新行時發送新結果。如果只跟蹤注冊用戶,則要維護的計數數量可能不會太高。但是,如果未注冊的用戶分配了一個惟一的用戶名,那么要維護的計數數量將隨著時間增長,并可能最終導致查詢失敗。

SELECT user, COUNT(url) FROM clicks GROUP BY user;

計算更新:有些查詢需要重新計算和更新大量已輸出的結果行,即使只添加或更新一條輸入記錄。顯然,這樣的查詢不適合作為連續查詢執行。下面的查詢就是一個例子,它根據最后一次單擊的時間為每個用戶計算一個 RANK。一旦 click 表接收到一個新行,用戶的 lastAction 就會更新,并必須計算一個新的排名。然而,由于兩行不能具有相同的排名,所以所有較低排名的行也需要更新。

SELECT user, RANK() OVER (ORDER BY lastAction) FROM (SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user );

查詢配置(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/query_configuration.html)章節討論了控制連續查詢執行的參數。一些參數可以用來在維持狀態的大小和獲得結果的準確性之間做取舍。

1.18.5.1.5.6.表到流的轉換

動態表可以像普通數據庫表一樣通過 INSERT、UPDATE 和 DELETE 來不斷修改。它可能是一個只有一行、不斷更新的表,也可能是一個 insert-only 的表,沒有 UPDATE 和 DELETE 修改,或者介于兩者之間的其他表。

在將動態表轉換為流或將其寫入外部系統時,需要對這些更改進行編碼。Flink的 Table API 和 SQL 支持三種方式來編碼一個動態表的變化:
Append-only 流:僅通過INSERT操作修改的動態表可以通過輸出插入的行轉換為流。
Retract流:retract 流包含兩種類型的 message: add messages 和 retract messages 。通過將INSERT 操作編碼為 add message、將 DELETE 操作編碼為 retract message、將 UPDATE 操作編碼為更新(先前)行的 retract message 和更新(新)行的 add message,將動態表轉換為 retract 流。下圖顯示了將動態表轉換為 retract 流的過程。

Upsert流:upsert 流包含兩種類型的 message: upsert messages 和delete messages。轉換為 upsert 流的動態表需要(可能是組合的)唯一鍵。通過將 INSERT 和 UPDATE 操作編碼為 upsert message,將 DELETE 操作編碼為 delete message ,將具有唯一鍵的動態表轉換為流。消費流的算子需要知道唯一鍵的屬性,以便正確地應用 message。與 retract 流的主要區別在于 UPDATE 操作是用單個 message 編碼的,因此效率更高。下圖顯示了將動態表轉換為 upsert 流的過程。

在通用概念(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#convert-a-table-into-a-datastream)中討論了將動態表轉換為 DataStream 的 API。請注意,在將動態表轉換為 DataStream 時,只支持 append 流和 retract 流。在 TableSources 和 TableSinks (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html#define-a-tablesink)章節討論向外部系統輸出動態表的 TableSink 接口。

總結

以上是生活随笔為你收集整理的1.18.5.流式概念、动态表(Dynamic Table)、DataStream上的关系查询、动态表 连续查询(Continuous Query)、在流上定义表、处理时间的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。