日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SeaTunnel

發布時間:2023/12/10 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SeaTunnel 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

SeaTunnel

版本:V1.0 第 1 章 Seatunnel 概述
1.1 SeaTunnel 是什么
SeaTunnel 是一個簡單易用的數據集成框架,在企業中,由于開發時間或開發部門不通
用,往往有多個異構的、運行在不同的軟硬件平臺上的信息系統同時運行。數據集成是把
不同來源、格式、特點性質的數據在邏輯上或物理上有機地集中,從而為企業提供全面的
數據共享。SeaTunnel 支持海量數據的實時同步。它每天可以穩定高效地同步數百億數據。
并已用于近 100 家公司的生產。
SeaTunnel的前身是 Waterdrop(中文名:水滴)自 2021 年 10 月 12日更名為 SeaTunnel。 2021 年 12 月 9 日,SeaTunnel 正式通過 Apache 軟件基金會的投票決議,以全票通過的優秀
表現正式成為 Apache 孵化器項目。2022 年 3 月 18 日社區正式發布了首個 Apache 版本
v2.1.0。 1.2 SeaTunnel 在做什么
本質上,SeaTunnel 不是對 Saprk 和 Flink 的內部修改,而是在 Spark 和 Flink 的基礎上
做了一層包裝。它主要運用了控制反轉的設計模式,這也是 SeaTunnel 實現的基本思想。
SeaTunnel 的日常使用,就是編輯配置文件。編輯好的配置文件由 SeaTunnel 轉換為具
體的 Spark 或 Flink 任務。如圖所示。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
1.3 SeaTunnel 的應用場景
SeaTunnel 適用于以下場景 SeaTunnel 的特點
? 海量數據的同步
? 海量數據的集成
? 海量數據的 ETL
? 海量數據聚合
? 多源數據處理
? 基于配置的低代碼開發,易用性高,方便維護。
? 支持實時流式傳輸
? 離線多源數據分析
? 高性能、海量數據處理能力
? 模塊化的插件架構,易于擴展
? 支持用 SQL 進行數據操作和數據聚合
? 支持 Spark structured streaming
? 支持 Spark 2.x
目前 SeaTunnel 的長板是他有豐富的連接器,又因為它以 Spark 和 Flink 為引擎。所以
可以很好地進行分布式的海量數據同步。通常SeaTunnel會被用來做出倉入倉工具,或者被
用來進行數據集成。比如,唯品會就選擇用 SeaTunnel來解決數據孤島問題,讓 ClcikHouse
集成到了企業中先前的數據系統之中。如圖所示:
下圖是 SeaTunnel 的工作流程:
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
1.5 SeaTunnel 目前的插件支持
1.5.1 Spark 連接器插件(Source)
Spark 連接器插件 數據庫類型 Source Sink
Batch Fake √
ElasticSearch √ √ File √ √ Hive √ √ Hudi √ √ Jdbc √ √ MongoDB √ √ Neo4j √ Phoenix √ √
Redis √ √ Tidb √ √
Clickhouse √
Doris √ Email √ Hbase √ √ Kafka √ Console √ Kudu √ √
Redis √ √
Stream FakeStream √ KafkaStream √
SocketSTream` √ 1.5.2 Flink 連接器插件(Source)
Flink 連接器插件 數據庫類型 Source Sink
Druid √ √ Fake √ File √ √ InfluxDb √ √
Jdbc √ √
Kafka √ √ Socket √ Console √ Doris √ ElasticSearch √ 1.5.3 Spark & Flink 轉換插件
轉換插件 Spark Flink
Add
CheckSum
Convert
Date
Drop
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
Grok
Json √
Kv
Lowercase
Remove
Rename
Repartition
Replace
Sample
Split √ √
Sql √ √
Table
Truncate
Uppercase
Uuid
這部分內容來官網,可以看出社區目前規劃了大量的插件,但截至 V2.1.0 可用的
transform 插件的數量還是很少的。同學們有興趣也可以在業余時間嘗試參與開源貢獻。
官方網址:https://seatunnel.apache.org/zh-CN/
第 2 章 Seatunnel 安裝和使用
注意 v2.1.0中有少量 bug,要想一次性跑通所有示例程序,需使用我們自己編譯的包,
可以在資料包里獲取。具體如何修改源碼,可以參考文檔第 5 章。
2.1 SeaTunnel 的環境依賴
截至 SeaTunnel V2.1.0。
SeaTunnel 支持 Spark 2.x(尚不支持 Spark 3.x)。支持 Flink 1.9.0 及其以上的版本。
Java 版本需要>=1.8
我們演示時使用的是 flink 版本是 1.13.0
2.2 SeaTunnel 的下載和安裝
1)使用 wget 下載 SeaTunnel,使用-O 參數將文件命名為 seatunnel-2.1.0.tar.gz
wget
https://downloads.apache.org/incubator/seatunnel/2.1.0/apacheseatunnel-incubating-2.1.0-bin.tar.gz -O seatunnel-2.1.0.tar.gz2 2)解壓下載好的 tar.gz 包
tar -zxvf seatunnel-2.1.0.tar.gz -C /opt/module/
3)查看解壓的目標路徑,apache-seatunnel-incubating-2.1.0 的目錄就是我們已經安裝好的
seatunnel。Incubating 的意思是孵化中。
2.3 SeaTunnel 的依賴環境配置
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
在 config/目錄中有一個 seatunnel-env.sh 腳本。我們可以看一下里面的內容。
這個腳本中聲明了 SPARK_HOME 和 FLINK_HOME 兩個路徑。默認情況下 seatunnelenv.sh 中的 SPARK_HOME 和 FLINK_HOME 就是系統環境變量中的 SPARK_HOME 和 FLINK_HOME。 在 shell 腳本中:-的意思是如果:-前的內容為空,則替換為后面的。
例如,環境變量中沒有 FLINK_HOME。那么 SeaTunnel 運行時會將 FLINK_HOME 設 為/opt/flink。
如果你機器上的環境變量 SPARK_HOME指向了 3.x的一個版本。但是想用 2.x的 Spark
來試一下 SeaTunnel。這種情況下,如果你不想改環境變量,那就直接在 seatunnel-env.sh 中 將 2.x 的路徑賦值給 SPARK_HOME 即可。
比如: 2.4 示例 1: SeaTunnel 快速開始
我們先跑一個官方的 flink 案例。來了解它的基本使用。 1)選擇任意路徑,創建一個文件。這里我們選擇在 SeaTunnel 的 config 路徑下創建一個
example01.conf
[atguigu@hadoop102 config]$ vim example01.conf
2)在文件中編輯如下內容

配置 Spark 或 Flink 的參數

env {

You can set flink configuration here

execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————

在 source 所屬的塊中配置數據源

source {
SocketStream{
host = hadoop102
result_table_name = “fake”
field_name = “info”
} }

在 transform 的塊中聲明轉換插件

transform {
Split{
separator = “#”
fields = [“name”,“age”]
}
sql {
sql = “select info, split(info) as info_row from fake”
} }

在 sink 塊中聲明要輸出到哪

sink {
ConsoleSink {}
}3)開啟 flink 集群
[atguigu@hadoop102 flink-1.11.6]$ bin/start-cluster.sh
4)開啟一個 netcat 服務來發送數據
[atguigu@hadoop102 ~]$ nc -lk 9999
5)使用 SeaTunnel 來提交任務。
在 bin 目錄下有以下內容
start-seatunnel-flink.sh是用來提交flink任務的。start-seatunnel-spark.sh是用來提交Spark
任務的。這里我們用 flink 演示。所以使用 start-seatunnel-flink.sh。 用–config 參數指定我們的應用配置文件。
bin/start-seatunnel-flink.sh --config config/example01.sh
等待彈出 Job 已經提交的提示
6)在 netcat 上發送數據
7)在 Flink webUI 上查看輸出結果。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
8)小結
至此,我們已經跑完了一個官方案例。它以 Socket 為數據源。經過 SQL 的處理,最終
輸出到控制臺。在這個過程中,我們并沒有編寫具體的 flink代碼,也沒有手動去打 jar包。
我們只是將數據的處理流程聲明在了一個配置文件中。
在背后,是 SeaTunnel 幫我們把配置文件翻譯為具體的 flink 任務。配置化,低代碼,
易維護是 SeaTunnel 最顯著的特點。
第 3 章 SeaTunnel 基本原理
3.1 SeaTunnel 的啟動腳本
3.1.1 啟動腳本的參數
截至目前,SeaTunnel 有兩個啟動腳本。
提交 spark 任務用 start-seatunnel-spark.sh。
提交 flink 任務則用 start-seatunnel-flink.sh。
本文檔主要是結合 flink 來使用 seatunnel 的,所以用 start-seatunnel-flink.sh 來講解。
start-seatunnle-flink.sh 可以指定 3 個參數
分別是:
–config 應用配置的路徑
–variable 應用配置里的變量賦值
–check 檢查 config 語法是否合法
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3.1.2 --check 參數
截至本文檔撰寫時的 SeaTunnel 版本 v2.1.0。check 功能還尚在開發中,因此–check 參
數是一個虛設。目前 start-seatunnel-flink.sh并不能對應用配置文件的語法合法性進行檢查。
而且 start-seatunnel-flink.sh 中目前沒有對–check 參數的處理邏輯。
需要注意!使用過程中,如果沒有使用–check 參數,命令行一閃而過。那就是你的配
置文件語法有問題。
3.1.3 --config 參數和–variable 參數
–config 參數用來指定應用配置文件的路徑。
–variable 參數可以向配置文件傳值。配置文件內是支持聲明變量的。然后我們可以通
過命令行給配置中的變量賦值。
變量聲明語法如下。
sql {
sql = “select * from (select info,split(info) from fake)
where age > '”KaTeX parse error: Expected 'EOF', got '}' at position 11: {age}"'" }? 在配置文件的任何位置都可以聲… cp example01.sh example02.sh
2)修改文件
[atguigu@hadoop102 config]$ vim example02.sh
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3)給 sql 插件聲明一個變量,紅色的是我們修改的地方。最終的配置文件如下。
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = “fake”
field_name = “info”
} }
transform {
Split{
separator = “#”
fields = [“name”,“age”]
}
sql {
sql = “select * from (select info, split(info) from fake)
where age > '”${age}“'”

需要套一層子查詢,因為 where 先于 select,split 出的字段無法用 where 過 濾

} }
sink {
ConsoleSink {}
}4)開啟 netcat 服務
[atguigu@hadoop102 ~]nc -l 9999
5)使用 SeaTunnel 來提交任務。-i age=18 往命令行中
bin/start-seatunnel-flink.sh --config config/example01.sh -i
age=18
6)接著,我們用 nc 發送幾條數據看看效果。
7)在 flink 的 webUI 上我們看一下控制臺的輸出。最終發現未滿 18 歲的李四被過濾掉了。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
8)小結
通過傳遞變量,我們可以實現配置文件的復用。讓同一份配置文件就能滿足不同的業
務需求。
3.1.5 給 flink 傳遞參數
在啟動腳本的尾部,我們可以看到,start-seatunnel-flink.sh 會執行(exec)一條命令,這
個命令會使用 flink 的提交腳本去向集群提交一個任務。而且在調用 bin/flink run 的時候,
還傳遞了 PARAMS 作為 flink run 的參數。
如下圖所示,我們可知,凡是–config 和 --variable 之外的命令行參數都被放到
PARAMS 變量中,最后相當于給 flink run 傳遞了參數。注意!命令行參數解析過程中沒有
涉及–check 參數處理。這也是為什么說它目前不支持–check 操作。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
比如,我們可以在 seatunnel 啟動腳本中,指定 flink job 并行度。
bin/start-seatunnel-flink.sh --config config/ -p 2\ 3.2 SeaTunnel 的配置文件
3.2.1 應用配置的 4 個基本組件
我們從 SeaTunnel 的 app 配置文件開始講起。
一個完整的 SeaTunnel 配置文件應包含四個配置組件。分別是:
env{} source{} --> transform{} --> sink{}
在 Source和 Sink數據同構時,如果業務上也不需要對數據進行轉換,那么 transform中
的內容可以為空。具體需根據業務情況來定。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3.2.2 env 塊
env 塊中可以直接寫 spark 或 flink 支持的配置項。比如并行度,檢查點間隔時間。檢查
點 hdfs 路徑等。在 SeaTunnel 源碼的 ConfigKeyName 類中,聲明了 env 塊中所有可用的 key。
如圖所示:
3.2.3 SeaTunnel 中的核心數據結構 Row
Row 是 SeaTunnel 中數據傳遞的核心數據結構。對 flink 來說,source 插件需要給下游
的轉換插件返回一個 DataStream,轉換插件接到上游的 DataStream進行處理
后需要再給下游返回一個 DataStream。最后 Sink 插件將轉換插件處理好的
DataStream輸出到外部的數據系統。
如圖所示:
因為 DataStream可以很方便地和 Table 進行互轉,所以將 Row 當作核心數據結
構可以讓轉換插件同時具有使用代碼(命令式)和 sql(聲明式)處理數據的能力。
3.2.4 source 塊
source 塊是用來聲明數據源的。source 塊中可以聲明多個連接器。比如: # 偽代碼
env {
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————

}
source {
hdfs { … }
elasticsearch { … }
jdbc {…}
}
transform {
sql {
sql = “”"
select … from hdfs_table
join es_table
on hdfs_table.uid = es_table.uid where …“”"
} }
sink {
elasticsearch { … }
}
需要注意的是,所有的 source 插件中都可以聲明 result_table_name。如果你聲明了
result_table_name。SeaTunnel 會將 source 插件輸出的 DataStream轉換為 Table 并注冊
在Table環境中。當你指定了result_table_name,那么你還可以指定field_name,在注冊時,
給 Table 重設字段名(后面的案例中會講解)。
因為不同 source 所需的配置并不一樣,所以對 source 連接器的配置最好參考官方的文
檔。
3.2.5 transform 塊
目前社區對插件做了很多規劃,但是截至 v2.1.0 版本,可用的插件總共有兩個,一個
是 Split,另一個是 sql。 transform{}塊 中 可 以 聲 明 多 個 轉 換 插 件 。 所 有 的 轉 換 插 件 都 可 以 使 用
source_table_name,和 result_table_name。同樣,如果我們聲明了 result_table_name,那么
我們就能聲明 field_name。
我們需要著重了解一下 Split 插件和 sql 插件的實現。但在此
在 SeaTunnel 中,一個轉換插件的實現類最重要的邏輯在下述四個方法中。
1)處理批數據,DataSet進,DataSet出
DataSet processBatch(FlinkEnvironment env, DataSet
data) 2)處理流數據,DataStram進,DataStream出
DataStream processStream(FlinkEnvironment env, DataStream
dataStream)
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3)函數名叫注冊函數。實際上,這是一個約定,它只不過是每個 transform 插件作用于流
之后調用的一個函數。
void registerFunction(FlinkEnvironment env, DataStream
datastream)
4)處理一些預備工作,通常是用來解析配置。
void prepare(FlinkEnvironment prepareEnv)
Split 插件的實現
現在我們需要著重看一下 Split 插件的實現。
先回顧一下我們之前 example01.conf 中關于 transform 的配置。
接著我們再來看一下 Split 的源碼實現。
我們發現 Split 插件并沒有對數據流進行任何的處理,而是將它直接 return 了。反之,
它向表環境中注冊了一個名為 split 的 UDF(用戶自定義函數)。而且,函數名是寫死的。
這意味著,如果你聲明了多個 Split,后面的 UDF 還會把前面的覆蓋。
這是開發時需要注意的一個點。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
但是,需要注意,tranform 接口其實是留給了我們直接操作數據的能力的。也就是
processStream 方法。那么,一個 transform 插件其實同時履行了 process 和 udf 的職責,這是
違反單一職責原則的。那要判斷一個轉換插件在做什么就只能從源碼和文檔的方面來加以
區分了。
最后需要叮囑的是,指定 soure_table_name 對于 sql 插件的意義不大。因為 sql 插件可
以通過 from 子句來決定從哪個表里抽取數據。
3.2.6 sink 塊
Sink 塊里可以聲明多個 sink 插件,每個 sink 插件都可以指定 source_table_name。不過
因為不同 Sink 插件的配置差異較大,所以在實現時建議參考官方文檔。
3.3 SeaTunnel 的基本原理
SeaTunnel 的工作原理簡單明了。
1)程序會解析你的應用配置,并創建環境
2)配置里source{},transform{},sink{}三個塊中的插件最終在程序中以List集合的方式存
在。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3)由Excution對象來拼接各個插件,這涉及到選擇source_table,注冊result_table等流程,
注冊 udf 等流程。并最終觸發執行
可以參考下圖: 3.4 小結
最后我們用一張圖將 SeaTunnel 中的重要概念串起來。
如果你愿意,依托 sql 插件和 udf。單個配置文件也可以定義出比較復雜的工作流。但
SeaTunnel 的定位是一個數據集成平臺。核心的功能是依托豐富的連接器進行數據同步,數
據處理并不是 SeaTunnel 的長處。所以在 SeaTunnel 中定義復雜的工作流或許是一種不值得
提倡的做法。
需要提醒的是,如果你不指定 source_table_name,插件會使用它在配置文件上最近的
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
上一個插件的輸出作為輸入。
所以,我們可以通過使用依托表名表環境來實現復雜的工作流。
也可以減少表名的使用實現簡單的數據同步通道。
第 4 章 應用案例
注意!下述示例請使用我們修改編譯好的包。 4.1 Kafka 進 Kafka 出的簡單 ETL
4.1.1 需求
對 test_csv 主題中的數據進行過濾,僅保留年齡在 18 歲以上的記錄。
4.1.2 需求實現
1)首先,創建為 kafka 創建 test_csv 主題。
kafka-topics.sh --bootstrap-server hadoop102:9092 --create –
topic test_csv --partitions 1 --replication-factor 1
2)為 kafka 創建 test_sink 主題
kafka-topics.sh --bootstrap-server hadoop102:9092 --create –
topic test_sink --partitions 1 --replication-factor 1
3)編輯應用配置
[atguigu@hadoop102 config]$ vim example03.conf
4)應用配置內容
env {

You can set flink configuration here

execution.parallelism = 1
#execution.checkpoint.interval = 10000
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
————————————————————————————— #execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}

在 source 所屬的塊中配置數據源

source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel-learn”
topics = test_csv
result_table_name = test
format.type = csv
schema =
“[{“field”:“name”,“type”:“string”},{“field”:“age”,
“type”: “int”}]”
format.field-delimiter = “;”
format.allow-comments = “true”
format.ignore-parse-errors = “true”
} }

在 transform 的塊中聲明轉換插件

transform {
sql {
sql = “select name,age from test where age > '”${age}“'”
} }

在 sink 塊中聲明要輸出到哪

sink {
kafkaTable {
topics = “test_sink”
producer.bootstrap.servers = “hadoop102:9092”
} }5)提交任務
bin/start-seatunnel-flink.sh --config config/example03.conf -i
age=18
6)起一個 kafka console producer 發送 csv 數據(分號分隔) 7)起一個 kafka console consumer 消費數據
我們成功地實現了數據從 kafka 輸入經過簡單的 ETL 再向 kafka 輸出。
4.2 Kafka 輸出到 Doris 進行指標統計
4.2.1 需求
使用回話日志統計用戶的總觀看視頻數,用戶最常會話市場,用戶最小會話時長,用
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
戶最后一次會話時間。
4.2.2 需求實現
1)在資料中有一個偽數據的生成腳本,將它拷貝到服務器的任意位置
2)執行以下命令安裝 python 腳本需要的兩個依賴庫
pip3 install Faker
pip3 install kafka-python
3)使用 mysql 客戶端連接 doris
[atguigu@hadoop102 fake_data]$ mysql -h hadoop102 -P 9030 -
uatguigu -p123321
4)手動創建 test_db 數據庫。
create database test_db;
5)使用下述 sql 語句建表
CREATE TABLE example_user_video (
user_id largeint(40) NOT NULL COMMENT “用戶 id”,
city varchar(20) NOT NULL COMMENT “用戶所在城市”,
age smallint(6) NULL COMMENT “用戶年齡”,
video_sum bigint(20) SUM NULL DEFAULT “0” COMMENT "總觀看視頻數
",
max_duration_time int(11) MAX NULL DEFAULT “0” COMMENT “用戶最
長會話時長”,
min_duration_time int(11) MIN NULL DEFAULT “999999999”
COMMENT “用戶最小會話時長”,
last_session_date datetime REPLACE NULL DEFAULT “1970-01-01
00:00:00” COMMENT “用戶最后一次會話時間”
) ENGINE=OLAP
AGGREGATE KEY(user_id, city, age)
COMMENT “OLAP”
DISTRIBUTED BY HASH(user_id) BUCKETS 16
;
6)在 config 目錄下, 編寫如下的配置文件。
env {
execution.parallelism = 1
}
source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel5”
topics = test
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
result_table_name = test
format.type = json
schema =
“{“session_id”:“string”,“video_count”:“int”,“duration_ti
me”:“long”,“user_id”:“string”,“user_age”:“int”,“city
“:“string”,“session_start_time”:“datetime”,“session_end_ti
me”:“datetime”}”
format.ignore-parse-errors = “true”
} }
transform{
sql {
sql = “select user_id,city,user_age as age,video_count as
video_sum,duration_time as max_duration_time,duration_time as
min_duration_time,session_end_time as last_session_date from
test”
result_table_name = test2
} }
sink{
DorisSink {
source_table_name = test2
fenodes = “hadoop102:8030”
database = test_db
table = example_user_video
user = atguigu
password = 123321
batch_size = 50
doris.column_separator=”\t”

doris.columns=“user_id,city,age,video_sum,max_duration_time,mi
n_duration_time,last_session_date”
} }7)使用 python 腳本向 kafka 中生成偽數據
[atguigu@hadoop102 fake_data]$ python3 fake_video.py --bootstrapserver hadoop102:9092 --topic test_video
8)查看 doris 中的結果。
Select * from example_user_video;
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
第 5 章 如何參與開源貢獻
5.1 基本概念
5.1.1 參與開源貢獻的常見方法
1)參與解答
在社區中,幫助使用過程中遇到困難的人,幫他們解釋框架的用法也算是一種貢獻。
2)文檔貢獻
幫助框架來完善文檔,比如說將英文文檔翻譯為中文,糾正文檔里面的錯誤單詞,這
是很多人參與開源貢獻的第一步。
3)代碼貢獻
經過閱讀源碼,發現源碼中有 Bug,修改后將代碼提交給社區。或者,框架有一個新
的特性亟待開發,你為新功能的實現提供了解決方案,這屬于代碼貢獻,也是一種重要的
參與開源貢獻的方式。
5.1.2 開源社區中常見的三個身份標簽
1)contributor(貢獻者)
只要參與過一次貢獻就算是貢獻者,
2)committer(提交者)
成為 contributor 后,如果你能保證持續貢獻,而且有扎實的技術功底,經 PMC(管理
委員會)投票或討論決定后,可以決定讓你成為一名 committer。Committer和 contributor的
區別在于,commiter 對于項目的倉庫是具有寫的權限的。他可以審核并合并 contributor 的
代碼。而且如果成為 commiter,你還會獲得一個后綴為@apache.org 的郵箱。
3)PMC(管理委員會)
Committer 中表現優秀的話,是可以成為 PMC 的。PMC 要負責整個項目的走向,做出
一些重要的決策,要具備前瞻性的技術眼光。
5.2 如何修改 bug
5.2.1 背景
在我們準備這項課程的時候,實際上 kafka 輸入插件,kafka 輸出插件和 doris 輸出插件
是各有一個 bug 的,當時 kafka 輸入插件的 bug 在社區中已經有了一個解決方案。Kafka 輸
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
出插件的 bug,和 doris 輸出插件的 bug 是我們來做的修改,而且修改后的結果提交給了
SeaTunnel社區,并且成功實現了代碼合并。下面我們復現一個 doris輸出插件bug的場景,
并且在這個基礎上向大家講解如何一步步去參與開源貢獻,成為一名源碼貢獻者。
5.2.2 問題復現
1)場景
當時,向 doris插入數據時會拋出一個 ClassCastException,也就是類型的強轉錯誤。這
里會報 Java.Util.ArratList 不能強轉為 java.lang.CharSequence。在反復確認我們的配置文件
寫的沒問題后。我們仔細閱讀了一下控制臺打印的棧追蹤信息。
2)問題定位
通過最后打印的棧追蹤信息,我們可以知道出錯的位置在 DorisOutPutFormat.java 文件
的第 210 行,于是我們需要去 idea 里面打開源碼看一下這里的代碼是怎么寫的。
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
5.2.3 分析問題
定位到 210 行后,我們看到下面的問題。
它要將一個 batch(它是一個 ArrayList集合)強轉為 CharSequence(字符序列)。這顯
然是錯誤的。
要想解決這個問題,我們要了解這段代碼的意圖。
這需要一定的背景知識,SeaTunnel 的 dorisSink 其實是依托于 doris 的 stream load 這種
導入方式來實現的。而 stream load 其實是通過 http 請求的形式,向 doris 導入數據。而且
doris 提倡提交數據的時候一定要成批地向 doris 導入數據。如此一來,我們知道 bacth 就是
用來積攢數據的一個集合,而向遠端通過 http 發送數據必然要經過一個序列化的過程。結
合上下文來看,我們可以判斷這段代碼的目的,就是要將 batch里的所有數據,按照某個規
則轉為字符串,為 http 請求做準備。分析過程如圖所示。
5.2.4 確定問題的解決方案
我們需要看一下 String 提供的這個 join 靜態方法,對參數的要求。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
我們發現,join 方法的第二個參數是一個 CharSequence 類型的可變長參數,這意味著
我們可以向里面傳遞一個 CharSequence 類型的數組。那么代碼可以修改成下面這個樣子。
5.2.5 方案驗證
1)重新打包
接著,我們可以重新編譯這個包,把重新編譯的包放到我們的集群上,再跑一次任務
看看能不能通過。在這個過程中,因為跨平臺性的問題(windows和 linux的路徑不通用,其
實也是個 bug),有一些單元測試我們無法通過,因此我們取個巧,用下面的方式進行編譯
打包,跳過單元測試和代碼的格式審查。
mvn clean package -D maven.test.skip=true -D checkstyle.skip=true
2)使用新的包
接著,我們使用重新編譯過的 SeaTunnel 執行我們之前向 Doris 導入數據的命令。
bin/start-seatunnel-flink.sh --config config/example04.conf
3)到我們的 Doris 上查看數據是否成功導入
這次我們的數據成功導進了 doris。而且我們的程序并沒有因為類型轉換錯誤而崩潰。
4)小結
經過上面的這些步驟,我們確信問題是出在源碼的問題上。接下來我們要開始向社區
匯報這個 bug,并向社區提供我們的解決方案。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
5.3 創建 issue
5.3.1 什么是 issue
每個 github 的倉庫下都會有一個項目獨立的 issue 板塊。在這個板塊里面,大家可以提
出自己的問題,也可以去和大家討論SeaTunnel是否要添加一些特性。而且,這是一個可以
匯報 bug 的地方。
開源社區通常會要求你在提交代碼合并的請求前,先去創建一個 issue。這是一個好的
習慣,就像是我們抓賊要先立案,逮捕要先有逮捕令。創建 pull request 之前先創建 issue,
然后把 pr 關聯到我們創建的 issue 上,讓每一次改動,都有據可查。
5.3.2 如何創建 issue
1)點擊 new issue 按鈕進入下一個頁面
2)選擇你要創建的 issue 類型,我們選擇 bug report(bug 匯報),進入下一個頁面
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3)按照表單的提示,一步步填寫完整。注意,表單提醒你,創建 issue 之前應該先去搜索
社區中是否已經有討論同一問題的 issue。同樣的問題,無需重復。
4)按照要求填寫表單后,點擊下方的 Submit new issue。創建這個 issue。 5)查看我們已經創建好的 issue
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
5.4 創建 pull request
pull request 的意思是拉取請求,也就是我這有代碼寫好了,請你把我的代碼拉過去吧。
所以,發起拉取請求之前應該要先有自己的代碼。這樣一來,創建 pull request 并不是一上
來就創建,而是要先搞好自己的代碼倉庫。
pull request 的簡稱是 pr。 5.4.1 fork 項目到自己的倉庫中
對于第一次對 SeaTunnel 貢獻的同學來說,應該先 fork(叉子)官方的倉庫。
點擊 fork 按鈕后,你自己的 github 賬號上會出現一個一模一樣的倉庫。如下圖所示。
5.4.2 git clone 自己 fork 的倉庫
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
拿到這個 url,在自己電腦上的任意目錄上使用下面的 git 命令去 clone 這個倉庫。
git clone xxxx{你自己的倉庫的 url}xxx
5.4.3 修改代碼
1)在項目的跟目錄右鍵,用 idea 打開我們 clone 的項目
2)在我們之前確定的位置,改代碼
3)commit 提交
(這個地方應該先建一個分支,從 dev 上分出來,在新建分支的基礎上 commit。這里成反
面教材了_) 4)push 到我們 fork 的倉庫里去,這個時候在遠端的目標分支上,我們寫一個新的分支名
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
5.4.4 創建 PR
1)去我們的 github上,看一下自己的倉庫,發現它會提示我們可以創建一個 pr了。點擊這
個按鈕,進入下一個頁面
2)在新的頁面中,按照對話框里給出的模板,說明我們這個 pr 的目的。最終,不要忘了
和你之前的 issue 關聯起來,關聯的方式就是直接粘貼你創建的 issue 的鏈接。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
3)全部搞定之后,點擊 create pull request 按鈕,創建一個 pr 4)我們還可以看到 github 會判斷我們做了哪些修改。紅色的地方表示我們刪除的代碼,綠
色的地方表示我們新增的代碼。因為 github 的差異是按行進行標記的。所以如果你就改了
一個字母。也是一個刪除行和新增行的效果。
5)我們的PR已經提交完畢,我們可以看到github會啟動一個自動的檢查。這個叫做CI/CD。
持續交付/持續部署的意思。簡單來說,你上傳的代碼,云端會自動拉取,然后自動地跑一
邊編譯,然后進行單元測試,代碼格式等一系列檢查。這些測試都通過后,你的代碼才有
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網
被合并的可能。
6)接下來你可以去干點別的,自動測試的時間會比較久,而且你需要等待社區人員注意到
你的 pull request。
尚硅谷大數據技術之 SeaTunnel
—————————————————————————————
5.5 成功成為源碼貢獻者
過一段時間就可以回來看一下你的 pr 了。我們看到有一個 apache member 審核了我們
的代碼,并將我們的代碼合并到了項目中。以后,大家使用 seatunnel 將數據從 flink 寫入
doris,就有你的一份功勞了。
你的發言記錄上,會出現 contributor 的標記。
弄完這些,就算是 SeaTunnel 的源碼貢獻者啦。
5.6 尋找貢獻機會
Apache 的開源項目中,社區成員們通常會維護一個待辦列表,里面是一些好做的任務。
適合新手上路。
更多 Java –大數據 –前端 –python 人工智能資料下載,可百度訪問:尚硅谷官網

總結

以上是生活随笔為你收集整理的SeaTunnel的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 特大黑人娇小亚洲女 | 国产真实的和子乱拍在线观看 | 男女啪啪免费看 | av日韩不卡 | 亚洲一区二区三区不卡视频 | 精品精品视频 | av一区二区免费 | 日韩成人午夜电影 | 精品国产午夜福利在线观看 | 激情欧美一区二区免费视频 | 爱爱视频免费看 | 黄色在线观看视频网站 | 亚洲一区在线观 | 中文字幕精品一区二区精品 | 国产色视频一区二区三区qq号 | 欧美一级做a爰片久久高潮 久热国产精品视频 | caoprom在线| 天天插天天搞 | 国产精品嫩草影院av蜜臀 | 视频二区欧美 | 国产喷水福利在线视频 | 日韩av一卡二卡 | 欧美少妇xxxxx | 在线视频欧美一区 | 女警白嫩翘臀呻吟迎合 | 999国产精品视频免费 | 偷偷色噜狠狠狠狠的777米奇 | 精品视频一区二区 | 国产伦精品一区二区三区免.费 | 天天透天天操 | 亚洲精品三 | 亚洲人在线观看视频 | 毛片免费一区二区三区 | 97se亚洲综合 | 精品国产免费无码久久久 | 日本五十肥熟交尾 | 国产一区影院 | 成人亚洲天堂 | 手机福利视频 | 日本免费电影一区二区三区 | 99精品视频在线看 | aaa天堂 | 精品人妻一区二区三区免费 | 性人久久久久 | 色爱亚洲| 最近中文字幕免费视频 | 日韩字幕 | 美女xx00| 最新日本中文字幕 | 欧美三级韩国三级日本三斤 | 在线天堂1 | 久久久99精品国产一区二区三区 | 日本黄色视 | 日本黄色不卡视频 | 一本大道久久精品 | 国产三级三级三级三级三级 | 成人免费一级视频 | 黄色大片免费观看 | 久久这里都是精品 | 四月婷婷| 成人av网页| 97久久精品 | 午夜影院一区二区 | 亚洲成人网在线观看 | 日本xx视频免费观看 | 大乳村妇的性需求 | 在线免费国产 | 91高清视频免费观看 | 亚洲天堂成人在线 | 麻豆视频软件 | 最新国产网站 | 欧美色精品 | 亚洲视频五区 | 成人欧美一区二区三区黑人一 | 高清av免费观看 | 免费视频91蜜桃 | 99久久一区| 亚洲日本香蕉 | 欧美性猛交富婆 | 欧美三级午夜理伦三级老人 | 国产精品白嫩极品美女 | 久久在线免费 | 一级黄大片 | 女人天堂av | 日韩精品高清在线观看 | 国产大学生av | 国产精品一区二区三区四区五区 | 成人wwxx免费观看 | 国产国拍精品亚洲 | 日本www视频在线观看 | 在线看的免费网站 | 国产九一精品 | 男女视频在线免费观看 | 免费三片在线观看网站v888 | 成人黄色动漫在线观看 | 色播五月激情五月 | 朋友人妻少妇精品系列 | 欧美在线免费视频 | 狠狠躁夜夜躁人爽 |