日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

阿里云 EMR Delta Lake 在流利说数据接入中的架构和实践

發(fā)布時(shí)間:2024/8/23 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阿里云 EMR Delta Lake 在流利说数据接入中的架构和实践 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

簡介: 為了消滅數(shù)據(jù)孤島,企業(yè)往往會(huì)把各個(gè)組織的數(shù)據(jù)都接入到數(shù)據(jù)湖以提供統(tǒng)一的查詢或分析。本文將介紹流利說當(dāng)前數(shù)據(jù)接入的整個(gè)過程,期間遇到的挑戰(zhàn),以及delta在數(shù)據(jù)接入中產(chǎn)生的價(jià)值。

?

背景

流利說目前的離線計(jì)算任務(wù)中,大部分?jǐn)?shù)據(jù)源都是來自于業(yè)務(wù) DB,業(yè)務(wù)DB數(shù)據(jù)接入的準(zhǔn)確性、穩(wěn)定性和及時(shí)性,決定著下游整個(gè)離線計(jì)算 pipeline 的準(zhǔn)確性和及時(shí)性。同時(shí),我們還有部分業(yè)務(wù)需求,需要對(duì) DB 中的數(shù)據(jù)和 hive 中的數(shù)據(jù)做近實(shí)時(shí)的聯(lián)合查詢。
在引入阿里云 EMR Delta Lake 之前,我們通過封裝 DataX 來完成業(yè)務(wù) DB 數(shù)據(jù)的接入,采用 Master-Slave 架構(gòu),Master 維護(hù)著每日要執(zhí)行的 DataX 任務(wù)的元數(shù)據(jù)信息,Worker 節(jié)點(diǎn)通過不斷的以搶占的方式獲取狀態(tài)為 init 和 restryable 的 DataX 任務(wù)來執(zhí)行,直到當(dāng)天的所有的 DataX 任務(wù)全都執(zhí)行完畢為止。

架構(gòu)圖大致如下:

Worker 處理的過程如下:

對(duì)于近實(shí)時(shí)需求,我們是直接開一個(gè)從庫,配置 presto connector 去連接從庫,來實(shí)現(xiàn)業(yè)務(wù) BD 中的數(shù)據(jù)和 hive 中的數(shù)據(jù)做近實(shí)時(shí)的聯(lián)合查詢需求。

這種架構(gòu)方案的優(yōu)點(diǎn)是簡單,易于實(shí)現(xiàn)。但是隨著數(shù)據(jù)量也來越多,缺點(diǎn)也就逐漸暴露出來了:
性能瓶頸: 隨著業(yè)務(wù)的增長,這種通過 SELECT 的方式接入數(shù)據(jù)的性能會(huì)越來越差,受 DB 性能瓶頸影響,無法通過增加 Worker 節(jié)點(diǎn)的方式來緩解。
規(guī)模大的表只能通過從庫來拉取,造成數(shù)據(jù)接入的成本越來越高。
無法業(yè)務(wù)滿足近實(shí)時(shí)的查詢需求,近實(shí)時(shí)查詢只能通過從庫的方式查詢,進(jìn)一步加大了接入的成本。
為了解決這些問題,我們將目光聚焦到了 CDC實(shí)時(shí)接入的方案上。

技術(shù)方案選型

對(duì)于 CDC實(shí)時(shí)接入的方案,目前業(yè)內(nèi)主要有以下幾種: CDC + Merge 方案、CDC + Hudi、CDC + Delta Lake 及 CDC + Iceberg 等幾種方案。其中,CDC + Merge 方案是在是在數(shù)據(jù)湖方案出現(xiàn)之前的做法,這種方案能節(jié)省DB從庫的成本,但是無法滿足業(yè)務(wù)近實(shí)時(shí)查詢的需求等功能,所以最開始就 pass 掉了,而 Iceberg 在我們選型之初,還不夠成熟,業(yè)界也沒有可參考的案列,所以也被 pass 掉了,最后我們是在 CDC + Hudi 和 CDC + Delta Lake 之間選擇。
在選型時(shí),Hudi 和 Delta Lake 兩者的功能上都是大同小異的,所以我們主要是從這幾方案來考慮的: 穩(wěn)定性、小文件合并、是否支持SQL、云廠商支持程度、語言支持程度等幾個(gè)方面來考慮。

基于以上指標(biāo),加上我們整個(gè)數(shù)據(jù)平臺(tái)都是基于阿里云 EMR 搭建的,選擇 Delta Lake 的話,會(huì)省掉大量的適配開發(fā)工作,所以我們最終選擇了 CDC + Delta Lake 的方案。
整體架構(gòu)

?

總體架構(gòu)圖

整體的架構(gòu)如上圖所示。我們接入的數(shù)據(jù)會(huì)分為兩部分,存量歷史數(shù)據(jù)和新數(shù)據(jù),存量歷史數(shù)據(jù)使用 DataX 從 MySQL 中導(dǎo)出,存入 OSS 中,新數(shù)據(jù)使用 Binlog 采集存入 Delta Lake 表中。每日凌晨跑 ETL 任務(wù)前,先對(duì)歷史數(shù)據(jù)和新數(shù)據(jù)做 Merge 操作,ETL 任務(wù)使用 Merge 之后的數(shù)據(jù)。

Delta Lake 數(shù)據(jù)接入
在 Binlog 實(shí)時(shí)采集方面,我們采用了開源的 Debezium ,負(fù)責(zé)從 MySQL 實(shí)時(shí)拉取 Binlog 并完成適當(dāng)解析,每張表對(duì)應(yīng)一個(gè) Topic ,分庫分表合并為一個(gè) Topic 分發(fā)到 Kafka 上供下游消費(fèi)。Binlog 數(shù)據(jù)接入到 Kafka 之后,我們需要?jiǎng)?chuàng)建 Kafka Source 表指向?qū)?yīng)的 Kafka Topic 中, 表的格式為:

CREATE TABLE kafka_{db_name}_{table_name} (key BINARY, value BINARY, topic STRING, partition INT, offset BIGINT, timestamp TIMESTAMP, timestampType INT)
USING kafka
OPTIONS (
kafka.sasl.mechanism 'PLAIN',
subscribe 'cdc-{db_name}-{table_name}',
serialization.format '1',
kafka.sasl.jaas.config '*****(redacted)',
kafka.bootstrap.servers '{bootstrap-servers}',
kafka.security.protocol 'SASL_PLAINTEXT'
)
我們主要用到的字段是 value 和 offset ,其中 value 的格式如下:

{
"payload": {
"before": {
db記錄變更前的schema及內(nèi)容,op=c時(shí),為null
},
"after": {
db記錄變更后的schema及內(nèi)容,op=d時(shí),為null
},
"source": {
ebezium配置信息
},
"op": "c",
"ts_ms":
}
}
同時(shí)創(chuàng)建 Delta Lake 表,Location 指向 HDFS 或者 OSS ,表結(jié)構(gòu)為:

CREATE TABLE IF NOT EXISTS delta.delta_{dbname}{table_name}(
{row_key_info},
ts_ms bigint,
json_record string,
operation_type string,
offset bigint
)
USING delta
LOCATION '------/delta/{db_name}.db/{table_name}'
其中 row_key_info 為 Delta Lake 表的唯一索引字段,對(duì)于單庫單表而言,row_key_info 為 mysql 表的 primary key 字段 eg: id long,對(duì)于分庫分表及分實(shí)例分庫分表而言,row_key_info 為分庫分表的字段和單表里primary key 字段組成,eg: 以 user_id 為分表字段,每張表里以 id 為 primary key , 那么對(duì)應(yīng)的 row_key_info 為 id long, user_id long。
StreamingSQL 處理 Kafka 中的數(shù)據(jù),我們主要是提取 Kafka Source 表中的 offset、value 字段及 value 字段中的 CDC 信息如: op、ts_ms 及 payload 的 after 和 before 字段。StreamingSQL 中,我們采用 5min 一個(gè) mini batch,主要是考慮到 mini batch 太小會(huì)產(chǎn)生很多小文件,處理速度會(huì)越來越慢,也會(huì)影響讀的性能,太大了又沒法滿足近實(shí)時(shí)查詢的要求。而 Delta Lake 表,我們不將 after 或者 before 字段解析出來,主要是考慮到我們業(yè)務(wù)表 的 schema 經(jīng)常變更,業(yè)務(wù)表 schema 一變更就要去修復(fù)一遍數(shù)據(jù),成本比較大。在 StreamingSQL 處理過程中,對(duì)于 op=’c’ 的數(shù)據(jù)我們會(huì)直接 insert 操作,json_record 取 after 字段。對(duì)于 op=’u’ 或者 op=’d’ 的數(shù)據(jù),如果 Delta Lake 表中不存在,那么執(zhí)行 insert 操作, 如果存在,那么執(zhí)行 update 操作;json_record 的賦值值,op=’d’,json_record 取 before 字段,op=’u’,jsonrecord 取 after 字段。保留 op=’d’ 的字段,主要是考慮到刪除的數(shù)據(jù)可能在存量歷史表中,如果直接刪除的話,凌晨 merge 的數(shù)據(jù)中,存在存量歷史表中的數(shù)據(jù)就不會(huì)被刪除。
整個(gè) StreamingSQL 的處理大致如下:
CREATE SCAN incremental{dbname}{tablename} on kafka{dbname}{table_name} USING STREAM
OPTIONS(
startingOffsets='earliest',
maxOffsetsPerTrigger='1000000',
failOnDataLoss=false
);
CREATE STREAM job
OPTIONS(
checkpointLocation='------/delta/{db_name}.db/{table_name}checkpoint',
triggerIntervalMs='300000'
)
MERGE INTO delta.delta{dbname}{table_name} as target
USING (
SELECT * FROM (
SELECT ts_ms, offset, operation_type, {key_column_sql}, coalesce(after_record, before_record) as after_record, row_number() OVER (PARTITION BY {key_column_partition_sql} ORDER BY ts_ms DESC, offset DESC) as rank
FROM (
SELECT ts_ms, offset, operation_type, before_record, after_record, {key_column_include_sql}
FROM ( SELECT get_json_object(string(value), '$.payload.op') as operation_type, get_json_object(string(value), '$.payload.before') as before_record,
get_json_object(string(value), '$.payload.after') as after_record, get_json_object(string(value), '$.payload.ts_ms') as tsms,
offset
FROM incremental{dbname}{table_name}
) binlog
) binlog_wo_init ) binlog_rank where rank = 1) as source
ON {key_column_condition_sql}
WHEN MATCHED AND (source.operation_type = 'u' or source.operation_type='d') THEN
UPDATE SET {set_key_column_sql}, ts_ms=source.ts_ms, json_record=source.after_record, operation_type=source.operation_type, offset=source.offset
WHEN NOT MATCHED AND (source.operation_type='c' or source.operation_type='u' or source.operation_type='d') THEN
INSERT ({inser_key_column_sql}, ts_ms, json_record, operation_type, offset) values ({insert_key_column_value_sql}, source.ts_ms, source.after_record, source.operation_type, source.offset);
執(zhí)行完 StreamingSQL 之后,就會(huì)生成如下格式的數(shù)據(jù):

其中 part-xxxx.snappy.parquet 保存的是 DeltaLake 表的數(shù)據(jù)文件,而 _deltalog 目錄下保存的是 DeltaLake 表的元數(shù)據(jù),包括如下:
其中 xxxxxxxx 表示的是版本信息,xxxxxxxx.json 文件里保存的是有效的 parquet 文件信息,其中 add 類型的為有效的 parquet 文件, remove 為無效的 parquet 文件。
Delta Lake 是支持 Time travel 的,但是我們 CDC 數(shù)據(jù)接入的話,用不到數(shù)據(jù)回滾策略,如果多版本的數(shù)據(jù)一直保留會(huì)給我們的存儲(chǔ)帶來一定的影響,所以我們要定期刪除過期版本的數(shù)據(jù),目前是僅保留2個(gè)小時(shí)內(nèi)的版本數(shù)據(jù)。同時(shí),Delta Lake 不支持自動(dòng)合并小文件的功能,所以我們還需要定期合并小文件。目前我們的做法是,每小時(shí)通過 OPTIMIZE 和 VACCUM 來做一次合并小文件操作及清理過期數(shù)據(jù)文件操作:
optimize delta{dbname}{tablename};
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta{dbname}{table_name} RETAIN 1 HOURS;
由于目前 Hive 和 Presto 無法直接讀取 Spark SQL 創(chuàng)建的 Delta Lake 表,但是監(jiān)控及近實(shí)時(shí)查詢需求,需要查詢 Delta Lake 表,所以我們還創(chuàng)建了用于 Hive 和 Presto 表查詢的。
Delta Lake 數(shù)據(jù)與存量數(shù)據(jù) Merge
由于 Delta Lake 的數(shù)據(jù)我們僅接入新數(shù)據(jù),對(duì)于存量歷史數(shù)據(jù)我們是通過DataX 一次性導(dǎo)入的,加上 Delta Lake 表 Hive 無法直接查詢,所以每日凌晨我們需要對(duì)這兩部分?jǐn)?shù)據(jù)做一次 merge 操作,寫入到新的表中便于 Spark SQL 和 Hive 統(tǒng)一使用。這一模塊的架構(gòu)大致如下:
圖片

每日凌晨0點(diǎn)前,調(diào)用 DeltaService API ,根據(jù) Delta Lake 任務(wù)的配置自動(dòng)生成 merge任務(wù) 的 task 信息、spark-sql 腳本及 對(duì)應(yīng)的 Airflow DAG 文件。
merge 任務(wù)的 task 信息主要包括如下信息:

自動(dòng)生成 Merge 腳本,主要是從 Delta Lake 任務(wù)的配置中獲取 mysql 表的schema 信息,刪掉歷史的 Hive 表,再根據(jù) schema 信息重新創(chuàng)建 Hive 外部表,再根據(jù)新的 schema 從Delta Lake表的 json_record 字段和歷史存量數(shù)據(jù)表中獲取對(duì)應(yīng)的字段值做 union all 操作,缺失值采用mysql 的默認(rèn)值, union 之后,再根據(jù) row_key 進(jìn)行分組,按 ts_ms 排序取第一條,同時(shí)取出operation_type=’d’ 的數(shù)據(jù)。整體如下:
CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '------/delta/{db_name}.db';
DROP TABLE IF EXISTS {db_name}.{table_name};
CREATE TABLE IF NOT EXISTS {db_name}.{table_name}(
{table_column_infos}
)
STORED AS PARQUET
LOCATION '------/delta/{db_name}.db/{table_name}/data_date=${{data_date}}';
INSERT OVERWRITE TABLE {db_name}.{table_name}
SELECT {table_columns}
FROM ( SELECT {table_columns}, _operation_type, row_number() OVER (PARTITION BY {row_keys} ORDER BY ts_ms DESC) as ranknum
FROM (
SELECT {delta_columns}, operation_type as _operation_type, tsms
FROM delta{dbname}{table_name}
UNION ALL
SELECT {hive_columns}, 'c' as _operation_type, 0 as ts_ms
FROM {db_name}.{table_name}_delta_history
) union_rank
) ranked_data
WHERE ranknum=1
AND _operation_type <> 'd'
凌晨0點(diǎn)之后,Airflow 會(huì)根據(jù) Airflow DAG 文件自動(dòng)調(diào)度執(zhí)行 merge 的Spark SQL 腳本,腳本執(zhí)行成功后,更新 merge task 的狀態(tài)為 succeed ,Airflow 的 ETL DAG 會(huì)根據(jù)merge task 的狀態(tài)自動(dòng)調(diào)度下游的 ETL 任務(wù)。
Delta Lake 數(shù)據(jù)監(jiān)控
對(duì)于 Delta Lake 數(shù)據(jù)的監(jiān)控,我們主要是為了兩個(gè)目的:監(jiān)控?cái)?shù)據(jù)是否延遲及監(jiān)控?cái)?shù)據(jù)是否丟失,主要是在 MySQL 與 Delta Lake 表之間及 CDC 接入過來的 Kafka Topic 與 Delta Lake 表之間。
CDC 接入過來的 Kafka Topic 和 Delta Lake 表之間的延遲監(jiān)控:我們是每15分鐘從 Kafka 的 Topic 中獲取每個(gè) Partition 的最大 offset 對(duì)應(yīng)的 mysql 的 row_key 字段內(nèi)容,放入監(jiān)控的 MySQL 表 delta_kafka_monitor_info 中,再從 delta_kafka_monitor_info 中獲取上一周期的 row_key 字段內(nèi)容,到 Delta Lake 表中查詢,如果查詢不到,說明數(shù)據(jù)有延遲或者丟失,發(fā)出告警。
MySQL 與 Delta Lake 之間的監(jiān)控:我們有兩種,一種是探針方案,每15分鐘,從 MySQL 中獲取最大的 id,對(duì)于分庫分表,只監(jiān)控一張表的,存入 delta_mysql_monitor_info 中,再從 delta_mysql_monitor_info 中獲取上一周期的最大 id,到 Delta Lake 表中查詢,如果查詢不到,說明數(shù)據(jù)有延遲或者丟失,發(fā)出告警。另一種是直接 count(id),這種方案又分為單庫單表和分庫分表兩種,元數(shù)據(jù)保存在 mysql 表 id_based_mysql_delta_monitor_info 中,主要包含 min_id、max_id、mysql_count 三個(gè)字段,對(duì)于單庫單表,也是每隔5分鐘,從 Delta Lake 表中獲取 min_id 和 max_id 之間的 count 值,跟 mysql_count 對(duì)比,如果小于 mysql_count 值說明有數(shù)據(jù)丟失或者延遲,發(fā)出告警。再從 mysql 中獲取 max(id) 和 max_id 與 max(id) 之間的 count 值,更新到 id_based_mysql_delta_monitor_info 表中。對(duì)于分庫分表的情況,根據(jù)分庫分表規(guī)則,生成每一張表對(duì)應(yīng)的 id_based_mysql_delta_monitor_info 信息,每半小時(shí)執(zhí)行一遍監(jiān)控,規(guī)則同單庫單表。

遇到的挑戰(zhàn)

業(yè)務(wù)表 schema 變更頻繁,Delta Lake 表如果直接解析 CDC 的字段信息的話,如果不能及時(shí)發(fā)現(xiàn)并修復(fù)數(shù)據(jù)的話,后期修復(fù)數(shù)據(jù)的成本會(huì)較大,目前我們是不解析字段,等到凌晨 merge 的時(shí)候再解析。
隨著數(shù)據(jù)量越來越大,StreamingSQL 任務(wù)的性能會(huì)越來越差。我們目前是 StreamingSQL 處理延遲,出現(xiàn)大量延遲告警后,將 Delta Lake 存量數(shù)據(jù)替換成昨日 merge 后的數(shù)據(jù),再刪掉 Delta Lake 表,刪除 checkpoint 數(shù)據(jù),從頭開始消費(fèi) KafkaSource 表的數(shù)據(jù)。降低 Delta Lake 表數(shù)據(jù),從而緩解StreamingSQL 的壓力。
Hive 和 Presto 不能直接查詢 Spark SQL 創(chuàng)建的 Delta Lake 表,目前我們是創(chuàng)建支持 Hive 和 Presto 查詢的外部表來供 Hive 和 Presto 使用,但是這些表又無法通過 Spark SQL 查詢。所以上層 ETL 應(yīng)用無法在不更改代碼的情況下,在 Hive 和 Spark SQL 及Presto 引擎之間自由切換。

帶來的收益

節(jié)省了 DB 從庫的成本,采用 CDC + Delta Lake 之后,我們的成本節(jié)省了近80%。
凌晨 DB 數(shù)據(jù)接入的時(shí)間成本大大降低,能夠確保所有非特殊要求的 DB 數(shù)據(jù)接入都能在1個(gè)小時(shí)內(nèi)跑完。

后續(xù)規(guī)劃

StreamingSQL 任務(wù)隨著 Delta Lake 表數(shù)據(jù)量越來越大,性能越來越差問題跟進(jìn)。
推動(dòng)能否解決 Spark SQL 創(chuàng)建的 Delta Lake 表,無法直接使用 Hive 和 Presto 查詢的問題。

后續(xù)規(guī)劃

StreamingSQL 任務(wù)隨著 Delta Lake 表數(shù)據(jù)量越來越大,性能越來越差問題跟進(jìn)。
推動(dòng)能否解決 Spark SQL 創(chuàng)建的 Delta Lake 表,無法直接使用 Hive 和 Presto 查詢的問題。

原文鏈接?

總結(jié)

以上是生活随笔為你收集整理的阿里云 EMR Delta Lake 在流利说数据接入中的架构和实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。