日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Demo:基于 Flink SQL 构建流式应用

發布時間:2025/3/21 数据库 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Demo:基于 Flink SQL 构建流式应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

摘要:上周四在 Flink 中文社區釘釘群中直播分享了《Demo:基于 Flink SQL 構建流式應用》,直播內容偏向實戰演示。這篇文章是對直播內容的一個總結,并且改善了部分內容,比如除 Flink 外其他組件全部采用 Docker Compose 安裝,簡化準備流程。讀者也可以結合視頻和本文一起學習。

Flink 1.10.0 于近期剛發布,釋放了許多令人激動的新特性。尤其是 Flink SQL 模塊,發展速度非常快,因此本文特意從實踐的角度出發,帶領大家一起探索使用 Flink SQL 如何快速構建流式應用。

本文將基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商用戶行為的實時分析應用。本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。本實戰演練的最終效果圖:

??準備

?一臺裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。

使用 Docker Compose 啟動容器

本實戰演示所依賴的組件全都編排到了容器中,因此可以通過 docker-compose 一鍵啟動。你可以通過 wget 命令自動下載該 docker-compose.yml 文件,也可以手動下載。

mkdir flink-demo; cd flink-demo; wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

該 Docker Compose 中包含的容器有:

  • DataGen:數據生成器。容器啟動后會自動開始生成用戶行為數據,并發送到 Kafka 集群中。默認每秒生成 1000 條數據,持續生成約 3 小時。也可以更改 docker-compose.yml 中 datagen 的 speedup 參數來調整生成速率(重啟 docker compose 才能生效)。

  • MySQL:集成了 MySQL 5.7 ,以及預先創建好了類目表(category),預先填入了子類目與頂級類目的映射關系,后續作為維表使用。

  • Kafka:主要用作數據源。DataGen 組件會自動將數據灌入這個容器中。

  • Zookeeper:Kafka 容器依賴。

  • Elasticsearch:主要存儲 Flink SQL 產出的數據。

  • Kibana:可視化 Elasticsearch 中的數據。

在啟動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。啟動所有的容器,只需要在 docker-compose.yml 所在目錄下運行如下命令。

docker-compose up -d

該命令會以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的五個容器是否正常啟動了。也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運行正常。

另外可以通過如下命令停止所有的容器:

docker-compose down

下載安裝 Flink 本地集群

我們推薦用戶手動下載安裝 Flink,而不是通過 Docker 自動啟動 Flink。因為這樣可以更直觀地理解 Flink 的各個組件、依賴、和腳本。

1.下載 Flink 1.10.0 安裝包并解壓(解壓目錄 flink-1.10.0):https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz

2.進入 flink-1.10.0 目錄:cd flink-1.10.0。

3.通過如下命令下載依賴 jar 包,并拷貝到 lib/ 目錄下,也可手動下載和拷貝。因為我們運行時需要依賴各個 connector 實現。

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

4.將 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots?修改成 10,因為我們會同時運行多個任務。??

5.?執行 ./bin/start-cluster.sh,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。并且可以看到可用 Slots 數為 10 個。

6.執行 bin/sql-client.sh embedded 啟動 SQL CLI。便會看到如下的松鼠歡迎界面。

使用 DDL 創建 Kafka 表

Datagen 容器在啟動后會往 Kafka 的 user_behavior topic 中持續不斷地寫入數據。數據包含了2017年11月27日一天的用戶行為(行為包括點擊、購買、加購、喜歡),每一行表示一條用戶行為,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行為類型和時間組成。該原始數據集來自阿里云天池公開數據集,特此鳴謝。

我們可以在 docker-compose.yml 所在目錄下運行如下命令,查看 Kafka 集群中生成的前10條數據。

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10' {"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...

有了數據源后,我們就可以用 DDL 去創建并連接這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。

CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(), -- 通過計算列產生一個處理時間列WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成為事件時間列 ) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址'format.type' = 'json' -- 數據源格式為 json );

如上我們按照數據的格式聲明了 5 個字段,除此之外,我們還通過計算列語法和 PROCTIME() 內置函數聲明了一個產生處理時間的虛擬列。我們還通過 WATERMARK 語法,在 ts 字段上聲明了 watermark 策略(容忍5秒亂序), ts 字段因此也成了事件時間列。關于時間屬性以及 DDL 語法可以閱讀官方文檔了解更多:

  • 時間屬性:

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html

  • DDL:

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table

在 SQL CLI 中成功創建 Kafka 表后,可以通過 show tables; 和 describe user_behavior; 來查看目前已注冊的表,以及表的詳細信息。我們也可以直接在 SQL CLI 中運行 SELECT * FROM user_behavior; 預覽下數據(按q退出)。

接下來,我們會通過三個實戰場景來更深入地了解 Flink SQL 。

統計每小時的成交量

使用 DDL 創建 Elasticsearch 表

我們先在 SQL CLI 中創建一個 ES 結果表,根據場景需求主要需要保存兩個數據:小時、成交量。

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT,buy_cnt BIGINT ) WITH ('connector.type' = 'elasticsearch', -- 使用 elasticsearch connector'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相當于數據庫的表名'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當于數據庫的庫名'connector.bulk-flush.max-actions' = '1', -- 每條數據都刷新'format.type' = 'json', -- 輸出數據格式 json'update-mode' = 'append' );

我們不需要在 Elasticsearch 中事先創建 buy_cnt_per_hour 索引,Flink Job 會自動創建該索引。

提交 Query

統計每小時的成交量就是每小時共有多少 "buy" 的用戶行為。因此會需要用到 TUMBLE 窗口函數,按照一小時切窗。然后每個窗口分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的數據,然后 COUNT(*) 實現。

INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

這里我們使用 HOUR 內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了 INSERT INTO將 query 的結果持續不斷地插入到上文定義的 es 結果表中(可以將 es 結果表理解成 query 的物化視圖)。另外可以閱讀該文檔了解更多關于窗口聚合的內容:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows

在 Flink SQL CLI 中運行上述查詢后,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直運行。

可以看到凌晨是一天中成交量的低谷。

使用 Kibana 可視化結果

我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過 http://localhost:5601 訪問 Kibana。首先我們需要先配置一個 index pattern。點擊左側工具欄的 "Management",就能找到 "Index Patterns"。點擊 "Create Index Pattern",然后通過輸入完整的索引名 "buy_cnt_per_hour" 創建 index pattern。創建完成后, Kibana 就知道了我們的索引,我們就可以開始探索數據了。

先點擊左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛創建的索引中的內容。

接下來,我們先創建一個 Dashboard 用來展示各個可視化的視圖。點擊頁面左側的"Dashboard",創建一個名為 ”用戶行為日志分析“ 的Dashboard。然后點擊 "Create New" 創建一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,并保存為”每小時成交量“。

統計一天每10分鐘累計獨立用戶數

另一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻為止的總計 uv 數,因此該曲線肯定是單調遞增的。

我們仍然先在 SQL CLI 中創建一個 Elasticsearch 表,用于存儲結果匯總數據。主要有兩個字段:時間和累積 uv 數。

CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT ) WITH ('connector.type' = 'elasticsearch','connector.version' = '6','connector.hosts' = 'http://localhost:9200','connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior','format.type' = 'json','update-mode' = 'upsert' );

為了實現該曲線,我們可以先通過 OVER WINDOW 計算出每條數據的當前分鐘,以及當前累計 uv(從0點開始到當前行為止的獨立用戶數)。uv 的統計我們通過內置的 COUNT(DISTINCT user_id)來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。

CREATE VIEW uv_per_10min AS SELECT MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, COUNT(DISTINCT user_id) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

這里我們使用 SUBSTR 和 ?DATE_FORMAT 還有 || 內置函數,將一個 TIMESTAMP 字段轉換成了 10分鐘單位的時間字符串,如: 12:10, 12:20。關于 OVER WINDOW 的更多內容可以參考文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations

我們還使用了 CREATE VIEW 語法將 query 注冊成了一個邏輯視圖,可以方便地在后續查詢中對該 query 進行引用,這有利于拆解復雜 query。注意,創建邏輯視圖不會觸發作業的執行,視圖的結果也不會落地,因此使用起來非常輕量,沒有額外開銷。由于 uv_per_10min 每條輸入數據都產生一條輸出數據,因此對于存儲壓力較大。我們可以基于 uv_per_10min 再根據分鐘時間進行一次聚合,這樣每10分鐘只有一個點會存儲在 Elasticsearch 中,對于 Elasticsearch 和 Kibana 可視化渲染的壓力會小很多。

INSERT INTO cumulative_uv SELECT time_str, MAX(uv) FROM uv_per_10min GROUP BY time_str;

提交上述查詢后,在 Kibana 中創建 cumulative_uv 的 index pattern,然后在 Dashboard 中創建一個"Line"折線圖,選擇 cumulative_uv 索引,按照如下截圖中的配置(左側)畫出累計獨立用戶數曲線,并保存。

頂級類目排行榜

最后一個有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過由于源數據中的類目分類太細(約5000個類目),對于排行榜意義不大,因此我們希望能將其歸約到頂級類目。所以筆者在 mysql 容器中預先準備了子類目與頂級類目的映射數據,用作維表。

在 SQL CLI 中創建 MySQL 表,后續用作維表查詢。

CREATE TABLE category_dim (sub_category_id BIGINT, -- 子類目parent_category_id BIGINT -- 頂級類目 ) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://localhost:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = '123456','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min' );

同時我們再創建一個 Elasticsearch 表,用于存儲類目統計結果。

CREATE TABLE top_category (category_name STRING, -- 類目名稱buy_cnt BIGINT -- 銷量 ) WITH ('connector.type' = 'elasticsearch','connector.version' = '6','connector.hosts' = 'http://localhost:9200','connector.index' = 'top_category','connector.document-type' = 'user_behavior','format.type' = 'json','update-mode' = 'upsert' );

第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢注冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 語法,可以查看文檔了解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, CASE C.parent_category_idWHEN 1 THEN '服飾鞋包'WHEN 2 THEN '家裝家飾'WHEN 3 THEN '家電'WHEN 4 THEN '美妝'WHEN 5 THEN '母嬰'WHEN 6 THEN '3C數碼'WHEN 7 THEN '運動戶外'WHEN 8 THEN '食品'ELSE '其他'END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id;

最后根據 類目名稱分組,統計出 buy 的事件數,并寫入 Elasticsearch 中。

INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;

提交上述查詢后,在 Kibana 中創建 top_category 的 index pattern,然后在 Dashboard 中創建一個"Horizontal Bar"條形圖,選擇 top_category 索引,按照如下截圖中的配置(左側)畫出類目排行榜,并保存。

可以看到服飾鞋包的成交量遠遠領先其他類目。

Kibana 還提供了非常豐富的圖形和可視化選項,感興趣的用戶可以用 Flink SQL 對數據進行更多維度的分析,并使用 Kibana 展示出可視化圖,并觀測圖形數據的實時變化。

結尾

在本文中,我們展示了如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 代碼,使用 SQL 純文本即可完成。期望通過本文,可以讓讀者了解到 Flink SQL 的易用和強大,包括輕松連接各種外部系統、對事件時間和亂序數據處理的原生支持、維表關聯、豐富的內置函數等等。希望你能喜歡我們的實戰演練,并從中獲得樂趣和知識!

作者介紹:

伍翀(云邪),Apache Flink PMC member & Committer,阿里巴巴技術專家,北京理工大學碩士畢業。2015年加入阿里巴巴,從事 JStorm 的開發與設計。自2016年開始長期活躍于 Flink 社區,Flink/Blink SQL 模塊的核心開發之一。

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Demo:基于 Flink SQL 构建流式应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。