八、Doris外部表及数据导入
DorisDB提供了多種導入方式,用戶可以根據數據量大小、導入頻率等要求選擇最適合自己業務需求的導入方式。
數據導入:
1、離線數據導入:如果數據源是Hive/HDFS,推薦采用Broker Load導入, 如果數據表很多導入比較麻煩可以考慮使用Hive外表直連查詢,性能會比Broker load導入效果差,但是可以避免數據搬遷,如果單表的數據量特別大,或者需要做全局數據字典來精確去重可以考慮Spark Load導入。
2、實時數據導入:日志數據和業務數據庫的binlog同步到Kafka以后,優先推薦通過Routine load 導入DorisDB,如果導入過程中有復雜的多表關聯和ETL預處理可以使用Flink處理以后用stream load寫入DorisDB
3、程序寫入DorisDB:推薦使用Stream Load
4、Mysql數據導入:推薦使用Mysql外表,insert into new_table select * from external_table 的方式導入
5、DorisDB內部導入:可以在DorisDB內部使用 insert into tablename select 的方式導入,可以跟外部調度器配合實現簡單的ETL處理
6、其他數據源導入:推薦使用 DataX 導入
外部表:
DorisDB支持以外部表的形式,接入其他數據源。
外部表指的是保存在其他數據源中的數據表。
目前DorisDB已支持的第三方數據源包括 MySQL、HDFS、ElasticSearch,Hive。
重點目錄:
1.1Broker Load
1.2Spark Load
1.3Stream Load
1.4Routine Load
2.1MySQL外部表
2.2ElasticSearch外部表
1.1 Broker Load
在Broker Load模式下,通過部署的 Broker 程序,DorisDB可讀取對應數據源(如HDFS, S3、阿里云 OSS、騰訊 COS)上的數據,利用自身的計算資源對數據進行預處理和導入。這是一種異步的導入方式,用戶需要通過MySQL協議創建導入,并通過查看導入命令檢查導入結果。
1、名詞解釋
Broker:Broker 為一個獨立的無狀態進程,封裝了文件系統接口,為 DorisDB 提供讀取遠端存儲系統中文件的能力。
Plan:導入執行計劃,BE會執行導入執行計劃將數據導入到DorisDB系統中。
2、語法:
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY column_separator ]
[FORMAT AS file_type]
[(col1, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
broker_properties:
(key2=value2, ...)
說明:
1)、Label:導入任務的標識。每個導入任務,都有一個數據庫內部唯一的Label。Label是用戶在導入命令中自定義的名稱。
通過這個Label,用戶可以查看對應導入任務的執行情況,并且Label可以用來防止用戶導入相同的數據。
當導入任務狀態為FINISHED時,對應的Label就不能再次使用了。
當 Label 對應的導入任務狀態為CANCELLED時,可以再次使用該Label提交導入作業。
2)、data_desc:每組 data_desc表述了本次導入涉及到的數據源地址,ETL 函數,目標表及分區等信息。
示例 :參見:https://www.kancloud.cn/dorisdb/dorisdb/2146000
1.2Spark Load
Spark Load 通過外部的 Spark 資源實現對導入數據的預處理,提高 DorisDB 大數據量的導入性能并且節省 Doris 集群的計算資源。主要用于初次遷移、大數據量導入 DorisDB 的場景(數據量可到TB級別)
1、基本原理
Spark Load 任務的執行主要分為以下幾個階段:
1、用戶向 FE 提交 Spark Load 任務;
2、FE 調度提交 ETL 任務到 Spark 集群執行。
3、Spark 集群執行 ETL 完成對導入數據的預處理。包括全局字典構建(BITMAP類型)、分區、排序、聚合等。
4、ETL 任務完成后,FE 獲取預處理過的每個分片的數據路徑,并調度相關的 BE 執行 Push 任務。
5、BE 通過 Broker 讀取數據,轉化為 DorisDB 存儲格式。
6、FE 調度生效版本,完成導入任務。
2、預處理流程:
1、從數據源讀取數據,上游數據源可以是HDFS文件,也可以是Hive表。
2、對讀取到的數據完成字段映射、表達式計算,并根據分區信息生成分桶字段bucket_id。
3、根據DorisDB表的Rollup元數據生成RollupTree。
4、遍歷RollupTree,進行分層的聚合操作,下一個層級的Rollup可以由上一個層的Rollup計算得來。
5、每次完成聚合計算后,會對數據根據bucket_id進行分桶然后寫入HDFS中。
6、后續Broker會拉取HDFS中的文件然后導入DorisDB BE節點中。
3、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146001
1.3Stream Load
Stream Load 是一種同步的導入方式,用戶通過發送 HTTP 請求將本地文件或數據流導入到 DorisDB 中。Stream Load 同步執行導入并返回導入結果。用戶可直接通過請求的返回值判斷導入是否成功。
1、主要流程:
說明:
Stream Load 中,用戶通過HTTP協議提交導入命令。
如果提交到FE節點,則FE節點會通過HTTP redirect指令將請求轉發給某一個BE節點,用戶也可以直接提交導入命令給某一指定BE節點。
該BE節點作為Coordinator節點,將數據按表schema劃分并分發數據到相關的BE節點。
導入的最終結果由 Coordinator節點返回給用戶。
2、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146002
1.4Routine Load
Routine Load 是一種例行導入方式,DorisDB通過這種方式支持從Kafka持續不斷的導入數據,并且支持通過SQL控制導入任務的暫停、重啟、停止
1、基本原理
導入流程說明:
1、用戶通過支持MySQL協議的客戶端向 FE 提交一個Kafka導入任務。
2、FE將一個導入任務拆分成若干個Task,每個Task負責導入指定的一部分數據。
3、每個Task被分配到指定的 BE 上執行。在 BE 上,一個 Task 被視為一個普通的導入任務,通過 Stream Load 的導入機制進行導入。
4、BE導入完成后,向 FE 匯報。
5、FE 根據匯報結果,繼續生成后續新的 Task,或者對失敗的 Task 進行重試。
6、FE 會不斷的產生新的 Task,來完成數據不間斷的導入。
2、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146003
2.1MySQL外部表
星型模型中,數據一般劃分為維度表和事實表。維度表數據量少,但會涉及UPDATE操作。目前DorisDB中還不直接支持UPDATE操作(可以通過Unique數據模型實現),在一些場景下,可以把維度表存儲在MySQL中,查詢時直接讀取維度表。
在使用MySQL的數據之前,需在DorisDB創建外部表,與之相映射。DorisDB中創建MySQL外部表時需要指定MySQL的相關連接信息,如下圖。
2.2ElasticSearch外部表
DorisDB與ElasticSearch都是目前流行的分析系統,DorisDB強于大規模分布式計算,ElasticSearch擅長全文檢索。DorisDB支持ElasticSearch訪問的目的,就在于將這兩種能力結合,提供更完善的一個OLAP解決方案。
1、建表示例
2、謂詞下推
DorisDB支持對ElasticSearch表進行謂詞下推,把過濾條件推給ElasticSearch進行執行,讓執行盡量靠近存儲,提高查詢性能。目前支持哪個下推的算子如下表。
3、詳細說明參見:https://www.kancloud.cn/dorisdb/dorisdb/2146013
參考資料:
https://www.kancloud.cn/dorisdb/dorisdb/2145999
總結
以上是生活随笔為你收集整理的八、Doris外部表及数据导入的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 招隐-古琴曲-山中鸣琴,万籁声沉沉,何泠
- 下一篇: 机器学习