日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

基于Apache Hudi和Debezium构建CDC入湖管道

發(fā)布時(shí)間:2023/10/11 综合教程 102 老码农
生活随笔 收集整理的這篇文章主要介紹了 基于Apache Hudi和Debezium构建CDC入湖管道 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

從 Hudi v0.10.0 開始,我們很高興地宣布推出適用于 DeltastreamerDebezium 源,它提供從 Postgres 和 MySQL 數(shù)據(jù)庫到數(shù)據(jù)湖的變更捕獲數(shù)據(jù) (CDC) 的攝取。有關(guān)詳細(xì)信息請參閱原始 RFC

1. 背景

當(dāng)想要對來自事務(wù)數(shù)據(jù)庫(如 Postgres 或 MySQL)的數(shù)據(jù)執(zhí)行分析時(shí),通常需要通過稱為更改數(shù)據(jù)捕獲 CDC的過程將此數(shù)據(jù)引入數(shù)據(jù)倉庫或數(shù)據(jù)湖等 OLAP 系統(tǒng)。 Debezium 是一種流行的工具,它使 CDC 變得簡單,其提供了一種通過讀取更改日志來捕獲數(shù)據(jù)庫中行級更改的方法,通過這種方式 Debezium 可以避免增加數(shù)據(jù)庫上的 CPU 負(fù)載,并確保捕獲包括刪除在內(nèi)的所有變更。

現(xiàn)在 Apache Hudi 提供了 Debezium 源連接器,CDC 引入數(shù)據(jù)湖比以往任何時(shí)候都更容易,因?yàn)樗哂幸恍?a rel="external nofollow noreferrer">獨(dú)特的差異化功能。 Hudi 可在數(shù)據(jù)湖上實(shí)現(xiàn)高效的更新、合并和刪除事務(wù)。 Hudi 獨(dú)特地提供了 Merge-On-Read 寫入器,與使用 Spark 或 Flink 的典型數(shù)據(jù)湖寫入器相比,該寫入器可以顯著降低攝取延遲。 最后,Apache Hudi 提供增量查詢,因此在從數(shù)據(jù)庫中捕獲更改后可以在所有后續(xù) ETL 管道中以增量方式處理這些更改下游。

2. 總體設(shè)計(jì)

上面顯示了使用 Apache Hudi 的端到端 CDC 攝取流的架構(gòu),第一個(gè)組件是 Debezium 部署,它由 Kafka 集群、schema registry(Confluent 或 Apicurio)和 Debezium 連接器組成,Debezium 連接器不斷輪詢數(shù)據(jù)庫中的更改日志,并將每個(gè)數(shù)據(jù)庫行的更改寫入 AVRO 消息到每個(gè)表的專用 Kafka 主題。

第二個(gè)組件是 Hudi Deltastreamer,它為每個(gè)表從 Kafka 讀取和處理傳入的 Debezium 記錄,并在云存儲(chǔ)上的 Hudi 表中寫入(更新)相應(yīng)的行。

為了近乎實(shí)時(shí)地將數(shù)據(jù)庫表中的數(shù)據(jù)提取到 Hudi 表中,我們實(shí)現(xiàn)了兩個(gè)可插拔的 Deltastreamer 類。首先我們實(shí)現(xiàn)了一個(gè) Debezium 源。 Deltastreamer 在連續(xù)模式下運(yùn)行,源源不斷地從給定表的 Kafka 主題中讀取和處理 Avro 格式的 Debezium 更改記錄,并將更新的記錄寫入目標(biāo) Hudi 表。 除了數(shù)據(jù)庫表中的列之外,我們還攝取了一些由 Debezium 添加到目標(biāo) Hudi 表中的元字段,元字段幫助我們正確地合并更新和刪除記錄,使用Schema Registry表中的最新模式讀取記錄。

其次我們實(shí)現(xiàn)了一個(gè)自定義的 Debezium Payload,它控制了在更新或刪除同一行時(shí)如何合并 Hudi 記錄,當(dāng)接收到現(xiàn)有行的新 Hudi 記錄時(shí),有效負(fù)載使用相應(yīng)列的較高值(MySQL 中的 FILEID 和 POS 字段以及 Postgres 中的 LSN 字段)選擇最新記錄,在后一個(gè)事件是刪除記錄的情況下,有效負(fù)載實(shí)現(xiàn)確保從存儲(chǔ)中硬刪除記錄。 刪除記錄使用 op 字段標(biāo)識(shí),該字段的值 d 表示刪除。

3. Apache Hudi配置

在使用 Debezium 源連接器進(jìn)行 CDC 攝取時(shí),請務(wù)必考慮以下 Hudi 部署配置。

  • 記錄鍵 - 表的 Hudi 記錄鍵應(yīng)設(shè)置為上游數(shù)據(jù)庫中表的主鍵。這可確保正確應(yīng)用更新,因?yàn)橛涗涙I唯一地標(biāo)識(shí) Hudi 表中的一行。
  • 源排序字段 - 對于更改日志記錄的重復(fù)數(shù)據(jù)刪除,源排序字段應(yīng)設(shè)置為數(shù)據(jù)庫上發(fā)生的更改事件的實(shí)際位置。 例如我們分別使用 MySQL 中的 FILEID 和 POS 字段以及 Postgres 數(shù)據(jù)庫中的 LSN 字段來確保記錄在原始數(shù)據(jù)庫中以正確的出現(xiàn)順序進(jìn)行處理。
  • 分區(qū)字段 - 不要將 Hudi 表的分區(qū)與與上游數(shù)據(jù)庫相同的分區(qū)字段相匹配。當(dāng)然也可以根據(jù)需要為 Hudi 表單獨(dú)設(shè)置分區(qū)字段。

3.1 引導(dǎo)現(xiàn)有表

一個(gè)重要的用例可能是必須對現(xiàn)有數(shù)據(jù)庫表進(jìn)行 CDC 攝取。在流式傳輸更改之前我們可以通過兩種方式獲取現(xiàn)有數(shù)據(jù)庫數(shù)據(jù):

  • 默認(rèn)情況下,Debezium 在初始化時(shí)執(zhí)行數(shù)據(jù)庫的初始一致快照(由 config snapshot.mode 控制)。在初始快照之后它會(huì)繼續(xù)從正確的位置流式傳輸更新以避免數(shù)據(jù)丟失。
  • 雖然第一種方法很簡單,但對于大型表,Debezium 引導(dǎo)初始快照可能需要很長時(shí)間。或者我們可以運(yùn)行 Deltastreamer 作業(yè),使用 JDBC 源直接從數(shù)據(jù)庫引導(dǎo)表,這為用戶定義和執(zhí)行引導(dǎo)數(shù)據(jù)庫表所需的更優(yōu)化的 SQL 查詢提供了更大的靈活性。引導(dǎo)作業(yè)成功完成后,將執(zhí)行另一個(gè) Deltastreamer 作業(yè),處理來自 Debezium 的數(shù)據(jù)庫更改日志,用戶必須在 Deltastreamer 中使用檢查點(diǎn)來確保第二個(gè)作業(yè)從正確的位置開始處理變更日志,以避免數(shù)據(jù)丟失。

3.2 例子

以下描述了使用 AWS RDS 實(shí)例 Postgres、基于 Kubernetes 的 Debezium 部署和在 Spark 集群上運(yùn)行的 Hudi Deltastreamer 實(shí)施端到端 CDC 管道的步驟。

3.3 數(shù)據(jù)庫

RDS 實(shí)例需要進(jìn)行一些配置更改才能啟用邏輯復(fù)制。

SET rds.logical_replication to 1 (instead of 0)
psql --host=<aws_rds_instance> --port=5432 --username=postgres --password -d <database_name>;
CREATE PUBLICATION <publication_name> FOR TABLE schema1.table1, schema1.table2;
ALTER TABLE schema1.table1 REPLICA IDENTITY FULL;

3.4 Debezium 連接器

Strimzi 是在 Kubernetes 集群上部署和管理 Kafka 連接器的推薦選項(xiàng),或者可以選擇使用 Confluent 托管的 Debezium 連接器

kubectl create namespace kafka
kubectl create -f https://strimzi.io/install/latest?namespace=kafka -n kafka
kubectl -n kafka apply -f kafka-connector.yaml

kafka-connector.yaml 的示例如下所示:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-kafka-connect
annotations:
strimzi.io/use-connector-resources: "false"
spec:
image: debezium-kafka-connect:latest
replicas: 1
bootstrapServers: localhost:9092
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1

可以使用以下包含 Postgres Debezium 連接器的 Dockerfile 構(gòu)建 docker 映像 debezium-kafka-connect

FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
RUN yum -y update
RUN yum -y install git
RUN yum -y install wget RUN wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.1.Final/debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN tar xzf debezium-connector-postgres-1.6.1.Final-plugin.tar.gz RUN mkdir -p /opt/kafka/plugins/debezium && mkdir -p /opt/kafka/plugins/avro/
RUN mv debezium-connector-postgres /opt/kafka/plugins/debezium/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001

一旦部署了 Strimzi 運(yùn)算符和 Kafka 連接器,我們就可以啟動(dòng) Debezium 連接器。

curl -X POST -H "Content-Type:application/json" -d @connect-source.json http://localhost:8083/connectors/

以下是設(shè)置 Debezium 連接器以生成兩個(gè)表 table1 和 table2 的更改日志的配置示例。

connect-source.json 的內(nèi)容如下

{
"name": "postgres-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "database",
"plugin.name": "pgoutput",
"database.server.name": "postgres",
"table.include.list": "schema1.table1,schema1.table2",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete":"false",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<schema_registry_host>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<schema_registry_host>",
"slot.name": "pgslot"
}
}

3.5 Hudi Deltastreamer

接下來我們使用 Spark 運(yùn)行 Hudi Deltastreamer,它將從 kafka 攝取 Debezium 變更日志并將它們寫入 Hudi 表。 下面顯示了一個(gè)這樣的命令實(shí)例,它適用于 Postgres 數(shù)據(jù)庫。 幾個(gè)關(guān)鍵配置如下:

  • 將源類設(shè)置為 PostgresDebeziumSource。
  • 將有效負(fù)載類設(shè)置為 PostgresDebeziumAvroPayload。
  • 為 Debezium Source 和 Kafka Source 配置模式注冊表 URL。
  • 將記錄鍵設(shè)置為數(shù)據(jù)庫表的主鍵。
  • 將源排序字段 (dedup) 設(shè)置為 _event_lsn
spark-submit \\
--jars "/home/hadoop/hudi-utilities-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar" \\
--master yarn --deploy-mode client \\
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.0-SNAPSHOT.jar \\
--table-type COPY_ON_WRITE --op UPSERT \\
--target-base-path s3://bucket_name/path/for/hudi_table1 \\
--target-table hudi_table1 --continuous \\
--min-sync-interval-seconds 60 \\
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \\
--source-ordering-field _event_lsn \\
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \\
--hoodie-conf schema.registry.url=https://localhost:8081 \\
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://localhost:8081/subjects/postgres.schema1.table1-value/versions/latest \\
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \\
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=postgres.schema1.table1 \\
--hoodie-conf auto.offset.reset=earliest \\
--hoodie-conf hoodie.datasource.write.recordkey.field=”database_primary_key” \\
--hoodie-conf hoodie.datasource.write.partitionpath.field=partition_key \\
--enable-hive-sync \\
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \\
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \\
--hoodie-conf hoodie.datasource.hive_sync.database=default \\
--hoodie-conf hoodie.datasource.hive_sync.table=hudi_table1 \\
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=partition_key

4. 總結(jié)

這篇文章介紹了用于 Hudi Deltastreamer 的 Debezium 源,以將 Debezium 更改日志提取到 Hudi 表中。 現(xiàn)在可以將數(shù)據(jù)庫數(shù)據(jù)提取到數(shù)據(jù)湖中,以提供一種經(jīng)濟(jì)高效的方式來存儲(chǔ)和分析數(shù)據(jù)庫數(shù)據(jù)。

請關(guān)注此 JIRA 以了解有關(guān)此新功能的更多信息。

總結(jié)

以上是生活随笔為你收集整理的基于Apache Hudi和Debezium构建CDC入湖管道的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。