日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SeaTunnel

發(fā)布時間:2023/12/10 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SeaTunnel 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

SeaTunnel

版本:V1.0 第 1 章 Seatunnel 概述
1.1 SeaTunnel 是什么
SeaTunnel 是一個簡單易用的數(shù)據(jù)集成框架,在企業(yè)中,由于開發(fā)時間或開發(fā)部門不通
用,往往有多個異構(gòu)的、運行在不同的軟硬件平臺上的信息系統(tǒng)同時運行。數(shù)據(jù)集成是把
不同來源、格式、特點性質(zhì)的數(shù)據(jù)在邏輯上或物理上有機地集中,從而為企業(yè)提供全面的
數(shù)據(jù)共享。SeaTunnel 支持海量數(shù)據(jù)的實時同步。它每天可以穩(wěn)定高效地同步數(shù)百億數(shù)據(jù)。
并已用于近 100 家公司的生產(chǎn)。
SeaTunnel的前身是 Waterdrop(中文名:水滴)自 2021 年 10 月 12日更名為 SeaTunnel。 2021 年 12 月 9 日,SeaTunnel 正式通過 Apache 軟件基金會的投票決議,以全票通過的優(yōu)秀
表現(xiàn)正式成為 Apache 孵化器項目。2022 年 3 月 18 日社區(qū)正式發(fā)布了首個 Apache 版本
v2.1.0。 1.2 SeaTunnel 在做什么
本質(zhì)上,SeaTunnel 不是對 Saprk 和 Flink 的內(nèi)部修改,而是在 Spark 和 Flink 的基礎(chǔ)上
做了一層包裝。它主要運用了控制反轉(zhuǎn)的設(shè)計模式,這也是 SeaTunnel 實現(xiàn)的基本思想。
SeaTunnel 的日常使用,就是編輯配置文件。編輯好的配置文件由 SeaTunnel 轉(zhuǎn)換為具
體的 Spark 或 Flink 任務(wù)。如圖所示。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
1.3 SeaTunnel 的應(yīng)用場景
SeaTunnel 適用于以下場景 SeaTunnel 的特點
? 海量數(shù)據(jù)的同步
? 海量數(shù)據(jù)的集成
? 海量數(shù)據(jù)的 ETL
? 海量數(shù)據(jù)聚合
? 多源數(shù)據(jù)處理
? 基于配置的低代碼開發(fā),易用性高,方便維護。
? 支持實時流式傳輸
? 離線多源數(shù)據(jù)分析
? 高性能、海量數(shù)據(jù)處理能力
? 模塊化的插件架構(gòu),易于擴展
? 支持用 SQL 進行數(shù)據(jù)操作和數(shù)據(jù)聚合
? 支持 Spark structured streaming
? 支持 Spark 2.x
目前 SeaTunnel 的長板是他有豐富的連接器,又因為它以 Spark 和 Flink 為引擎。所以
可以很好地進行分布式的海量數(shù)據(jù)同步。通常SeaTunnel會被用來做出倉入倉工具,或者被
用來進行數(shù)據(jù)集成。比如,唯品會就選擇用 SeaTunnel來解決數(shù)據(jù)孤島問題,讓 ClcikHouse
集成到了企業(yè)中先前的數(shù)據(jù)系統(tǒng)之中。如圖所示:
下圖是 SeaTunnel 的工作流程:
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
1.5 SeaTunnel 目前的插件支持
1.5.1 Spark 連接器插件(Source)
Spark 連接器插件 數(shù)據(jù)庫類型 Source Sink
Batch Fake √
ElasticSearch √ √ File √ √ Hive √ √ Hudi √ √ Jdbc √ √ MongoDB √ √ Neo4j √ Phoenix √ √
Redis √ √ Tidb √ √
Clickhouse √
Doris √ Email √ Hbase √ √ Kafka √ Console √ Kudu √ √
Redis √ √
Stream FakeStream √ KafkaStream √
SocketSTream` √ 1.5.2 Flink 連接器插件(Source)
Flink 連接器插件 數(shù)據(jù)庫類型 Source Sink
Druid √ √ Fake √ File √ √ InfluxDb √ √
Jdbc √ √
Kafka √ √ Socket √ Console √ Doris √ ElasticSearch √ 1.5.3 Spark & Flink 轉(zhuǎn)換插件
轉(zhuǎn)換插件 Spark Flink
Add
CheckSum
Convert
Date
Drop
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
Grok
Json √
Kv
Lowercase
Remove
Rename
Repartition
Replace
Sample
Split √ √
Sql √ √
Table
Truncate
Uppercase
Uuid
這部分內(nèi)容來官網(wǎng),可以看出社區(qū)目前規(guī)劃了大量的插件,但截至 V2.1.0 可用的
transform 插件的數(shù)量還是很少的。同學(xué)們有興趣也可以在業(yè)余時間嘗試參與開源貢獻。
官方網(wǎng)址:https://seatunnel.apache.org/zh-CN/
第 2 章 Seatunnel 安裝和使用
注意 v2.1.0中有少量 bug,要想一次性跑通所有示例程序,需使用我們自己編譯的包,
可以在資料包里獲取。具體如何修改源碼,可以參考文檔第 5 章。
2.1 SeaTunnel 的環(huán)境依賴
截至 SeaTunnel V2.1.0。
SeaTunnel 支持 Spark 2.x(尚不支持 Spark 3.x)。支持 Flink 1.9.0 及其以上的版本。
Java 版本需要>=1.8
我們演示時使用的是 flink 版本是 1.13.0
2.2 SeaTunnel 的下載和安裝
1)使用 wget 下載 SeaTunnel,使用-O 參數(shù)將文件命名為 seatunnel-2.1.0.tar.gz
wget
https://downloads.apache.org/incubator/seatunnel/2.1.0/apacheseatunnel-incubating-2.1.0-bin.tar.gz -O seatunnel-2.1.0.tar.gz2 2)解壓下載好的 tar.gz 包
tar -zxvf seatunnel-2.1.0.tar.gz -C /opt/module/
3)查看解壓的目標(biāo)路徑,apache-seatunnel-incubating-2.1.0 的目錄就是我們已經(jīng)安裝好的
seatunnel。Incubating 的意思是孵化中。
2.3 SeaTunnel 的依賴環(huán)境配置
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
在 config/目錄中有一個 seatunnel-env.sh 腳本。我們可以看一下里面的內(nèi)容。
這個腳本中聲明了 SPARK_HOME 和 FLINK_HOME 兩個路徑。默認情況下 seatunnelenv.sh 中的 SPARK_HOME 和 FLINK_HOME 就是系統(tǒng)環(huán)境變量中的 SPARK_HOME 和 FLINK_HOME。 在 shell 腳本中:-的意思是如果:-前的內(nèi)容為空,則替換為后面的。
例如,環(huán)境變量中沒有 FLINK_HOME。那么 SeaTunnel 運行時會將 FLINK_HOME 設(shè) 為/opt/flink。
如果你機器上的環(huán)境變量 SPARK_HOME指向了 3.x的一個版本。但是想用 2.x的 Spark
來試一下 SeaTunnel。這種情況下,如果你不想改環(huán)境變量,那就直接在 seatunnel-env.sh 中 將 2.x 的路徑賦值給 SPARK_HOME 即可。
比如: 2.4 示例 1: SeaTunnel 快速開始
我們先跑一個官方的 flink 案例。來了解它的基本使用。 1)選擇任意路徑,創(chuàng)建一個文件。這里我們選擇在 SeaTunnel 的 config 路徑下創(chuàng)建一個
example01.conf
[atguigu@hadoop102 config]$ vim example01.conf
2)在文件中編輯如下內(nèi)容

配置 Spark 或 Flink 的參數(shù)

env {

You can set flink configuration here

execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————

在 source 所屬的塊中配置數(shù)據(jù)源

source {
SocketStream{
host = hadoop102
result_table_name = “fake”
field_name = “info”
} }

在 transform 的塊中聲明轉(zhuǎn)換插件

transform {
Split{
separator = “#”
fields = [“name”,“age”]
}
sql {
sql = “select info, split(info) as info_row from fake”
} }

在 sink 塊中聲明要輸出到哪

sink {
ConsoleSink {}
}3)開啟 flink 集群
[atguigu@hadoop102 flink-1.11.6]$ bin/start-cluster.sh
4)開啟一個 netcat 服務(wù)來發(fā)送數(shù)據(jù)
[atguigu@hadoop102 ~]$ nc -lk 9999
5)使用 SeaTunnel 來提交任務(wù)。
在 bin 目錄下有以下內(nèi)容
start-seatunnel-flink.sh是用來提交flink任務(wù)的。start-seatunnel-spark.sh是用來提交Spark
任務(wù)的。這里我們用 flink 演示。所以使用 start-seatunnel-flink.sh。 用–config 參數(shù)指定我們的應(yīng)用配置文件。
bin/start-seatunnel-flink.sh --config config/example01.sh
等待彈出 Job 已經(jīng)提交的提示
6)在 netcat 上發(fā)送數(shù)據(jù)
7)在 Flink webUI 上查看輸出結(jié)果。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
8)小結(jié)
至此,我們已經(jīng)跑完了一個官方案例。它以 Socket 為數(shù)據(jù)源。經(jīng)過 SQL 的處理,最終
輸出到控制臺。在這個過程中,我們并沒有編寫具體的 flink代碼,也沒有手動去打 jar包。
我們只是將數(shù)據(jù)的處理流程聲明在了一個配置文件中。
在背后,是 SeaTunnel 幫我們把配置文件翻譯為具體的 flink 任務(wù)。配置化,低代碼,
易維護是 SeaTunnel 最顯著的特點。
第 3 章 SeaTunnel 基本原理
3.1 SeaTunnel 的啟動腳本
3.1.1 啟動腳本的參數(shù)
截至目前,SeaTunnel 有兩個啟動腳本。
提交 spark 任務(wù)用 start-seatunnel-spark.sh。
提交 flink 任務(wù)則用 start-seatunnel-flink.sh。
本文檔主要是結(jié)合 flink 來使用 seatunnel 的,所以用 start-seatunnel-flink.sh 來講解。
start-seatunnle-flink.sh 可以指定 3 個參數(shù)
分別是:
–config 應(yīng)用配置的路徑
–variable 應(yīng)用配置里的變量賦值
–check 檢查 config 語法是否合法
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3.1.2 --check 參數(shù)
截至本文檔撰寫時的 SeaTunnel 版本 v2.1.0。check 功能還尚在開發(fā)中,因此–check 參
數(shù)是一個虛設(shè)。目前 start-seatunnel-flink.sh并不能對應(yīng)用配置文件的語法合法性進行檢查。
而且 start-seatunnel-flink.sh 中目前沒有對–check 參數(shù)的處理邏輯。
需要注意!使用過程中,如果沒有使用–check 參數(shù),命令行一閃而過。那就是你的配
置文件語法有問題。
3.1.3 --config 參數(shù)和–variable 參數(shù)
–config 參數(shù)用來指定應(yīng)用配置文件的路徑。
–variable 參數(shù)可以向配置文件傳值。配置文件內(nèi)是支持聲明變量的。然后我們可以通
過命令行給配置中的變量賦值。
變量聲明語法如下。
sql {
sql = “select * from (select info,split(info) from fake)
where age > '”KaTeX parse error: Expected 'EOF', got '}' at position 11: {age}"'" }? 在配置文件的任何位置都可以聲… cp example01.sh example02.sh
2)修改文件
[atguigu@hadoop102 config]$ vim example02.sh
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3)給 sql 插件聲明一個變量,紅色的是我們修改的地方。最終的配置文件如下。
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = “fake”
field_name = “info”
} }
transform {
Split{
separator = “#”
fields = [“name”,“age”]
}
sql {
sql = “select * from (select info, split(info) from fake)
where age > '”${age}“'”

需要套一層子查詢,因為 where 先于 select,split 出的字段無法用 where 過 濾

} }
sink {
ConsoleSink {}
}4)開啟 netcat 服務(wù)
[atguigu@hadoop102 ~]nc -l 9999
5)使用 SeaTunnel 來提交任務(wù)。-i age=18 往命令行中
bin/start-seatunnel-flink.sh --config config/example01.sh -i
age=18
6)接著,我們用 nc 發(fā)送幾條數(shù)據(jù)看看效果。
7)在 flink 的 webUI 上我們看一下控制臺的輸出。最終發(fā)現(xiàn)未滿 18 歲的李四被過濾掉了。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
8)小結(jié)
通過傳遞變量,我們可以實現(xiàn)配置文件的復(fù)用。讓同一份配置文件就能滿足不同的業(yè)
務(wù)需求。
3.1.5 給 flink 傳遞參數(shù)
在啟動腳本的尾部,我們可以看到,start-seatunnel-flink.sh 會執(zhí)行(exec)一條命令,這
個命令會使用 flink 的提交腳本去向集群提交一個任務(wù)。而且在調(diào)用 bin/flink run 的時候,
還傳遞了 PARAMS 作為 flink run 的參數(shù)。
如下圖所示,我們可知,凡是–config 和 --variable 之外的命令行參數(shù)都被放到
PARAMS 變量中,最后相當(dāng)于給 flink run 傳遞了參數(shù)。注意!命令行參數(shù)解析過程中沒有
涉及–check 參數(shù)處理。這也是為什么說它目前不支持–check 操作。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
比如,我們可以在 seatunnel 啟動腳本中,指定 flink job 并行度。
bin/start-seatunnel-flink.sh --config config/ -p 2\ 3.2 SeaTunnel 的配置文件
3.2.1 應(yīng)用配置的 4 個基本組件
我們從 SeaTunnel 的 app 配置文件開始講起。
一個完整的 SeaTunnel 配置文件應(yīng)包含四個配置組件。分別是:
env{} source{} --> transform{} --> sink{}
在 Source和 Sink數(shù)據(jù)同構(gòu)時,如果業(yè)務(wù)上也不需要對數(shù)據(jù)進行轉(zhuǎn)換,那么 transform中
的內(nèi)容可以為空。具體需根據(jù)業(yè)務(wù)情況來定。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3.2.2 env 塊
env 塊中可以直接寫 spark 或 flink 支持的配置項。比如并行度,檢查點間隔時間。檢查
點 hdfs 路徑等。在 SeaTunnel 源碼的 ConfigKeyName 類中,聲明了 env 塊中所有可用的 key。
如圖所示:
3.2.3 SeaTunnel 中的核心數(shù)據(jù)結(jié)構(gòu) Row
Row 是 SeaTunnel 中數(shù)據(jù)傳遞的核心數(shù)據(jù)結(jié)構(gòu)。對 flink 來說,source 插件需要給下游
的轉(zhuǎn)換插件返回一個 DataStream,轉(zhuǎn)換插件接到上游的 DataStream進行處理
后需要再給下游返回一個 DataStream。最后 Sink 插件將轉(zhuǎn)換插件處理好的
DataStream輸出到外部的數(shù)據(jù)系統(tǒng)。
如圖所示:
因為 DataStream可以很方便地和 Table 進行互轉(zhuǎn),所以將 Row 當(dāng)作核心數(shù)據(jù)結(jié)
構(gòu)可以讓轉(zhuǎn)換插件同時具有使用代碼(命令式)和 sql(聲明式)處理數(shù)據(jù)的能力。
3.2.4 source 塊
source 塊是用來聲明數(shù)據(jù)源的。source 塊中可以聲明多個連接器。比如: # 偽代碼
env {
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————

}
source {
hdfs { … }
elasticsearch { … }
jdbc {…}
}
transform {
sql {
sql = “”"
select … from hdfs_table
join es_table
on hdfs_table.uid = es_table.uid where …“”"
} }
sink {
elasticsearch { … }
}
需要注意的是,所有的 source 插件中都可以聲明 result_table_name。如果你聲明了
result_table_name。SeaTunnel 會將 source 插件輸出的 DataStream轉(zhuǎn)換為 Table 并注冊
在Table環(huán)境中。當(dāng)你指定了result_table_name,那么你還可以指定field_name,在注冊時,
給 Table 重設(shè)字段名(后面的案例中會講解)。
因為不同 source 所需的配置并不一樣,所以對 source 連接器的配置最好參考官方的文
檔。
3.2.5 transform 塊
目前社區(qū)對插件做了很多規(guī)劃,但是截至 v2.1.0 版本,可用的插件總共有兩個,一個
是 Split,另一個是 sql。 transform{}塊 中 可 以 聲 明 多 個 轉(zhuǎn) 換 插 件 。 所 有 的 轉(zhuǎn) 換 插 件 都 可 以 使 用
source_table_name,和 result_table_name。同樣,如果我們聲明了 result_table_name,那么
我們就能聲明 field_name。
我們需要著重了解一下 Split 插件和 sql 插件的實現(xiàn)。但在此
在 SeaTunnel 中,一個轉(zhuǎn)換插件的實現(xiàn)類最重要的邏輯在下述四個方法中。
1)處理批數(shù)據(jù),DataSet進,DataSet出
DataSet processBatch(FlinkEnvironment env, DataSet
data) 2)處理流數(shù)據(jù),DataStram進,DataStream出
DataStream processStream(FlinkEnvironment env, DataStream
dataStream)
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3)函數(shù)名叫注冊函數(shù)。實際上,這是一個約定,它只不過是每個 transform 插件作用于流
之后調(diào)用的一個函數(shù)。
void registerFunction(FlinkEnvironment env, DataStream
datastream)
4)處理一些預(yù)備工作,通常是用來解析配置。
void prepare(FlinkEnvironment prepareEnv)
Split 插件的實現(xiàn)
現(xiàn)在我們需要著重看一下 Split 插件的實現(xiàn)。
先回顧一下我們之前 example01.conf 中關(guān)于 transform 的配置。
接著我們再來看一下 Split 的源碼實現(xiàn)。
我們發(fā)現(xiàn) Split 插件并沒有對數(shù)據(jù)流進行任何的處理,而是將它直接 return 了。反之,
它向表環(huán)境中注冊了一個名為 split 的 UDF(用戶自定義函數(shù))。而且,函數(shù)名是寫死的。
這意味著,如果你聲明了多個 Split,后面的 UDF 還會把前面的覆蓋。
這是開發(fā)時需要注意的一個點。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
但是,需要注意,tranform 接口其實是留給了我們直接操作數(shù)據(jù)的能力的。也就是
processStream 方法。那么,一個 transform 插件其實同時履行了 process 和 udf 的職責(zé),這是
違反單一職責(zé)原則的。那要判斷一個轉(zhuǎn)換插件在做什么就只能從源碼和文檔的方面來加以
區(qū)分了。
最后需要叮囑的是,指定 soure_table_name 對于 sql 插件的意義不大。因為 sql 插件可
以通過 from 子句來決定從哪個表里抽取數(shù)據(jù)。
3.2.6 sink 塊
Sink 塊里可以聲明多個 sink 插件,每個 sink 插件都可以指定 source_table_name。不過
因為不同 Sink 插件的配置差異較大,所以在實現(xiàn)時建議參考官方文檔。
3.3 SeaTunnel 的基本原理
SeaTunnel 的工作原理簡單明了。
1)程序會解析你的應(yīng)用配置,并創(chuàng)建環(huán)境
2)配置里source{},transform{},sink{}三個塊中的插件最終在程序中以List集合的方式存
在。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3)由Excution對象來拼接各個插件,這涉及到選擇source_table,注冊result_table等流程,
注冊 udf 等流程。并最終觸發(fā)執(zhí)行
可以參考下圖: 3.4 小結(jié)
最后我們用一張圖將 SeaTunnel 中的重要概念串起來。
如果你愿意,依托 sql 插件和 udf。單個配置文件也可以定義出比較復(fù)雜的工作流。但
SeaTunnel 的定位是一個數(shù)據(jù)集成平臺。核心的功能是依托豐富的連接器進行數(shù)據(jù)同步,數(shù)
據(jù)處理并不是 SeaTunnel 的長處。所以在 SeaTunnel 中定義復(fù)雜的工作流或許是一種不值得
提倡的做法。
需要提醒的是,如果你不指定 source_table_name,插件會使用它在配置文件上最近的
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
上一個插件的輸出作為輸入。
所以,我們可以通過使用依托表名表環(huán)境來實現(xiàn)復(fù)雜的工作流。
也可以減少表名的使用實現(xiàn)簡單的數(shù)據(jù)同步通道。
第 4 章 應(yīng)用案例
注意!下述示例請使用我們修改編譯好的包。 4.1 Kafka 進 Kafka 出的簡單 ETL
4.1.1 需求
對 test_csv 主題中的數(shù)據(jù)進行過濾,僅保留年齡在 18 歲以上的記錄。
4.1.2 需求實現(xiàn)
1)首先,創(chuàng)建為 kafka 創(chuàng)建 test_csv 主題。
kafka-topics.sh --bootstrap-server hadoop102:9092 --create –
topic test_csv --partitions 1 --replication-factor 1
2)為 kafka 創(chuàng)建 test_sink 主題
kafka-topics.sh --bootstrap-server hadoop102:9092 --create –
topic test_sink --partitions 1 --replication-factor 1
3)編輯應(yīng)用配置
[atguigu@hadoop102 config]$ vim example03.conf
4)應(yīng)用配置內(nèi)容
env {

You can set flink configuration here

execution.parallelism = 1
#execution.checkpoint.interval = 10000
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
————————————————————————————— #execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}

在 source 所屬的塊中配置數(shù)據(jù)源

source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel-learn”
topics = test_csv
result_table_name = test
format.type = csv
schema =
“[{“field”:“name”,“type”:“string”},{“field”:“age”,
“type”: “int”}]”
format.field-delimiter = “;”
format.allow-comments = “true”
format.ignore-parse-errors = “true”
} }

在 transform 的塊中聲明轉(zhuǎn)換插件

transform {
sql {
sql = “select name,age from test where age > '”${age}“'”
} }

在 sink 塊中聲明要輸出到哪

sink {
kafkaTable {
topics = “test_sink”
producer.bootstrap.servers = “hadoop102:9092”
} }5)提交任務(wù)
bin/start-seatunnel-flink.sh --config config/example03.conf -i
age=18
6)起一個 kafka console producer 發(fā)送 csv 數(shù)據(jù)(分號分隔) 7)起一個 kafka console consumer 消費數(shù)據(jù)
我們成功地實現(xiàn)了數(shù)據(jù)從 kafka 輸入經(jīng)過簡單的 ETL 再向 kafka 輸出。
4.2 Kafka 輸出到 Doris 進行指標(biāo)統(tǒng)計
4.2.1 需求
使用回話日志統(tǒng)計用戶的總觀看視頻數(shù),用戶最常會話市場,用戶最小會話時長,用
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
戶最后一次會話時間。
4.2.2 需求實現(xiàn)
1)在資料中有一個偽數(shù)據(jù)的生成腳本,將它拷貝到服務(wù)器的任意位置
2)執(zhí)行以下命令安裝 python 腳本需要的兩個依賴庫
pip3 install Faker
pip3 install kafka-python
3)使用 mysql 客戶端連接 doris
[atguigu@hadoop102 fake_data]$ mysql -h hadoop102 -P 9030 -
uatguigu -p123321
4)手動創(chuàng)建 test_db 數(shù)據(jù)庫。
create database test_db;
5)使用下述 sql 語句建表
CREATE TABLE example_user_video (
user_id largeint(40) NOT NULL COMMENT “用戶 id”,
city varchar(20) NOT NULL COMMENT “用戶所在城市”,
age smallint(6) NULL COMMENT “用戶年齡”,
video_sum bigint(20) SUM NULL DEFAULT “0” COMMENT "總觀看視頻數(shù)
",
max_duration_time int(11) MAX NULL DEFAULT “0” COMMENT “用戶最
長會話時長”,
min_duration_time int(11) MIN NULL DEFAULT “999999999”
COMMENT “用戶最小會話時長”,
last_session_date datetime REPLACE NULL DEFAULT “1970-01-01
00:00:00” COMMENT “用戶最后一次會話時間”
) ENGINE=OLAP
AGGREGATE KEY(user_id, city, age)
COMMENT “OLAP”
DISTRIBUTED BY HASH(user_id) BUCKETS 16
;
6)在 config 目錄下, 編寫如下的配置文件。
env {
execution.parallelism = 1
}
source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel5”
topics = test
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
result_table_name = test
format.type = json
schema =
“{“session_id”:“string”,“video_count”:“int”,“duration_ti
me”:“l(fā)ong”,“user_id”:“string”,“user_age”:“int”,“city
“:“string”,“session_start_time”:“datetime”,“session_end_ti
me”:“datetime”}”
format.ignore-parse-errors = “true”
} }
transform{
sql {
sql = “select user_id,city,user_age as age,video_count as
video_sum,duration_time as max_duration_time,duration_time as
min_duration_time,session_end_time as last_session_date from
test”
result_table_name = test2
} }
sink{
DorisSink {
source_table_name = test2
fenodes = “hadoop102:8030”
database = test_db
table = example_user_video
user = atguigu
password = 123321
batch_size = 50
doris.column_separator=”\t”

doris.columns=“user_id,city,age,video_sum,max_duration_time,mi
n_duration_time,last_session_date”
} }7)使用 python 腳本向 kafka 中生成偽數(shù)據(jù)
[atguigu@hadoop102 fake_data]$ python3 fake_video.py --bootstrapserver hadoop102:9092 --topic test_video
8)查看 doris 中的結(jié)果。
Select * from example_user_video;
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
第 5 章 如何參與開源貢獻
5.1 基本概念
5.1.1 參與開源貢獻的常見方法
1)參與解答
在社區(qū)中,幫助使用過程中遇到困難的人,幫他們解釋框架的用法也算是一種貢獻。
2)文檔貢獻
幫助框架來完善文檔,比如說將英文文檔翻譯為中文,糾正文檔里面的錯誤單詞,這
是很多人參與開源貢獻的第一步。
3)代碼貢獻
經(jīng)過閱讀源碼,發(fā)現(xiàn)源碼中有 Bug,修改后將代碼提交給社區(qū)。或者,框架有一個新
的特性亟待開發(fā),你為新功能的實現(xiàn)提供了解決方案,這屬于代碼貢獻,也是一種重要的
參與開源貢獻的方式。
5.1.2 開源社區(qū)中常見的三個身份標(biāo)簽
1)contributor(貢獻者)
只要參與過一次貢獻就算是貢獻者,
2)committer(提交者)
成為 contributor 后,如果你能保證持續(xù)貢獻,而且有扎實的技術(shù)功底,經(jīng) PMC(管理
委員會)投票或討論決定后,可以決定讓你成為一名 committer。Committer和 contributor的
區(qū)別在于,commiter 對于項目的倉庫是具有寫的權(quán)限的。他可以審核并合并 contributor 的
代碼。而且如果成為 commiter,你還會獲得一個后綴為@apache.org 的郵箱。
3)PMC(管理委員會)
Committer 中表現(xiàn)優(yōu)秀的話,是可以成為 PMC 的。PMC 要負責(zé)整個項目的走向,做出
一些重要的決策,要具備前瞻性的技術(shù)眼光。
5.2 如何修改 bug
5.2.1 背景
在我們準(zhǔn)備這項課程的時候,實際上 kafka 輸入插件,kafka 輸出插件和 doris 輸出插件
是各有一個 bug 的,當(dāng)時 kafka 輸入插件的 bug 在社區(qū)中已經(jīng)有了一個解決方案。Kafka 輸
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
出插件的 bug,和 doris 輸出插件的 bug 是我們來做的修改,而且修改后的結(jié)果提交給了
SeaTunnel社區(qū),并且成功實現(xiàn)了代碼合并。下面我們復(fù)現(xiàn)一個 doris輸出插件bug的場景,
并且在這個基礎(chǔ)上向大家講解如何一步步去參與開源貢獻,成為一名源碼貢獻者。
5.2.2 問題復(fù)現(xiàn)
1)場景
當(dāng)時,向 doris插入數(shù)據(jù)時會拋出一個 ClassCastException,也就是類型的強轉(zhuǎn)錯誤。這
里會報 Java.Util.ArratList 不能強轉(zhuǎn)為 java.lang.CharSequence。在反復(fù)確認我們的配置文件
寫的沒問題后。我們仔細閱讀了一下控制臺打印的棧追蹤信息。
2)問題定位
通過最后打印的棧追蹤信息,我們可以知道出錯的位置在 DorisOutPutFormat.java 文件
的第 210 行,于是我們需要去 idea 里面打開源碼看一下這里的代碼是怎么寫的。
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
5.2.3 分析問題
定位到 210 行后,我們看到下面的問題。
它要將一個 batch(它是一個 ArrayList集合)強轉(zhuǎn)為 CharSequence(字符序列)。這顯
然是錯誤的。
要想解決這個問題,我們要了解這段代碼的意圖。
這需要一定的背景知識,SeaTunnel 的 dorisSink 其實是依托于 doris 的 stream load 這種
導(dǎo)入方式來實現(xiàn)的。而 stream load 其實是通過 http 請求的形式,向 doris 導(dǎo)入數(shù)據(jù)。而且
doris 提倡提交數(shù)據(jù)的時候一定要成批地向 doris 導(dǎo)入數(shù)據(jù)。如此一來,我們知道 bacth 就是
用來積攢數(shù)據(jù)的一個集合,而向遠端通過 http 發(fā)送數(shù)據(jù)必然要經(jīng)過一個序列化的過程。結(jié)
合上下文來看,我們可以判斷這段代碼的目的,就是要將 batch里的所有數(shù)據(jù),按照某個規(guī)
則轉(zhuǎn)為字符串,為 http 請求做準(zhǔn)備。分析過程如圖所示。
5.2.4 確定問題的解決方案
我們需要看一下 String 提供的這個 join 靜態(tài)方法,對參數(shù)的要求。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
我們發(fā)現(xiàn),join 方法的第二個參數(shù)是一個 CharSequence 類型的可變長參數(shù),這意味著
我們可以向里面?zhèn)鬟f一個 CharSequence 類型的數(shù)組。那么代碼可以修改成下面這個樣子。
5.2.5 方案驗證
1)重新打包
接著,我們可以重新編譯這個包,把重新編譯的包放到我們的集群上,再跑一次任務(wù)
看看能不能通過。在這個過程中,因為跨平臺性的問題(windows和 linux的路徑不通用,其
實也是個 bug),有一些單元測試我們無法通過,因此我們?nèi)€巧,用下面的方式進行編譯
打包,跳過單元測試和代碼的格式審查。
mvn clean package -D maven.test.skip=true -D checkstyle.skip=true
2)使用新的包
接著,我們使用重新編譯過的 SeaTunnel 執(zhí)行我們之前向 Doris 導(dǎo)入數(shù)據(jù)的命令。
bin/start-seatunnel-flink.sh --config config/example04.conf
3)到我們的 Doris 上查看數(shù)據(jù)是否成功導(dǎo)入
這次我們的數(shù)據(jù)成功導(dǎo)進了 doris。而且我們的程序并沒有因為類型轉(zhuǎn)換錯誤而崩潰。
4)小結(jié)
經(jīng)過上面的這些步驟,我們確信問題是出在源碼的問題上。接下來我們要開始向社區(qū)
匯報這個 bug,并向社區(qū)提供我們的解決方案。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
5.3 創(chuàng)建 issue
5.3.1 什么是 issue
每個 github 的倉庫下都會有一個項目獨立的 issue 板塊。在這個板塊里面,大家可以提
出自己的問題,也可以去和大家討論SeaTunnel是否要添加一些特性。而且,這是一個可以
匯報 bug 的地方。
開源社區(qū)通常會要求你在提交代碼合并的請求前,先去創(chuàng)建一個 issue。這是一個好的
習(xí)慣,就像是我們抓賊要先立案,逮捕要先有逮捕令。創(chuàng)建 pull request 之前先創(chuàng)建 issue,
然后把 pr 關(guān)聯(lián)到我們創(chuàng)建的 issue 上,讓每一次改動,都有據(jù)可查。
5.3.2 如何創(chuàng)建 issue
1)點擊 new issue 按鈕進入下一個頁面
2)選擇你要創(chuàng)建的 issue 類型,我們選擇 bug report(bug 匯報),進入下一個頁面
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3)按照表單的提示,一步步填寫完整。注意,表單提醒你,創(chuàng)建 issue 之前應(yīng)該先去搜索
社區(qū)中是否已經(jīng)有討論同一問題的 issue。同樣的問題,無需重復(fù)。
4)按照要求填寫表單后,點擊下方的 Submit new issue。創(chuàng)建這個 issue。 5)查看我們已經(jīng)創(chuàng)建好的 issue
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
5.4 創(chuàng)建 pull request
pull request 的意思是拉取請求,也就是我這有代碼寫好了,請你把我的代碼拉過去吧。
所以,發(fā)起拉取請求之前應(yīng)該要先有自己的代碼。這樣一來,創(chuàng)建 pull request 并不是一上
來就創(chuàng)建,而是要先搞好自己的代碼倉庫。
pull request 的簡稱是 pr。 5.4.1 fork 項目到自己的倉庫中
對于第一次對 SeaTunnel 貢獻的同學(xué)來說,應(yīng)該先 fork(叉子)官方的倉庫。
點擊 fork 按鈕后,你自己的 github 賬號上會出現(xiàn)一個一模一樣的倉庫。如下圖所示。
5.4.2 git clone 自己 fork 的倉庫
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
拿到這個 url,在自己電腦上的任意目錄上使用下面的 git 命令去 clone 這個倉庫。
git clone xxxx{你自己的倉庫的 url}xxx
5.4.3 修改代碼
1)在項目的跟目錄右鍵,用 idea 打開我們 clone 的項目
2)在我們之前確定的位置,改代碼
3)commit 提交
(這個地方應(yīng)該先建一個分支,從 dev 上分出來,在新建分支的基礎(chǔ)上 commit。這里成反
面教材了_) 4)push 到我們 fork 的倉庫里去,這個時候在遠端的目標(biāo)分支上,我們寫一個新的分支名
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
5.4.4 創(chuàng)建 PR
1)去我們的 github上,看一下自己的倉庫,發(fā)現(xiàn)它會提示我們可以創(chuàng)建一個 pr了。點擊這
個按鈕,進入下一個頁面
2)在新的頁面中,按照對話框里給出的模板,說明我們這個 pr 的目的。最終,不要忘了
和你之前的 issue 關(guān)聯(lián)起來,關(guān)聯(lián)的方式就是直接粘貼你創(chuàng)建的 issue 的鏈接。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
3)全部搞定之后,點擊 create pull request 按鈕,創(chuàng)建一個 pr 4)我們還可以看到 github 會判斷我們做了哪些修改。紅色的地方表示我們刪除的代碼,綠
色的地方表示我們新增的代碼。因為 github 的差異是按行進行標(biāo)記的。所以如果你就改了
一個字母。也是一個刪除行和新增行的效果。
5)我們的PR已經(jīng)提交完畢,我們可以看到github會啟動一個自動的檢查。這個叫做CI/CD。
持續(xù)交付/持續(xù)部署的意思。簡單來說,你上傳的代碼,云端會自動拉取,然后自動地跑一
邊編譯,然后進行單元測試,代碼格式等一系列檢查。這些測試都通過后,你的代碼才有
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)
被合并的可能。
6)接下來你可以去干點別的,自動測試的時間會比較久,而且你需要等待社區(qū)人員注意到
你的 pull request。
尚硅谷大數(shù)據(jù)技術(shù)之 SeaTunnel
—————————————————————————————
5.5 成功成為源碼貢獻者
過一段時間就可以回來看一下你的 pr 了。我們看到有一個 apache member 審核了我們
的代碼,并將我們的代碼合并到了項目中。以后,大家使用 seatunnel 將數(shù)據(jù)從 flink 寫入
doris,就有你的一份功勞了。
你的發(fā)言記錄上,會出現(xiàn) contributor 的標(biāo)記。
弄完這些,就算是 SeaTunnel 的源碼貢獻者啦。
5.6 尋找貢獻機會
Apache 的開源項目中,社區(qū)成員們通常會維護一個待辦列表,里面是一些好做的任務(wù)。
適合新手上路。
更多 Java –大數(shù)據(jù) –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網(wǎng)

總結(jié)

以上是生活随笔為你收集整理的SeaTunnel的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。