FlinkX 如何读取和写入 Clickhouse?
生活随笔
收集整理的這篇文章主要介紹了
FlinkX 如何读取和写入 Clickhouse?
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
簡介:本文將主要介紹 FlinkX 讀取和寫入 Clickhouse 的過程及相關參數,核心內容將圍繞以下3個問題:1. FlinkX讀寫Clickhouse支持哪個版本?、2. ClickHouse讀寫Clickhouse有哪些參數?、3. ClickHouse讀寫Clickhouse參數都有哪些說明?
FlinkX讀寫Clickhouse支持哪個版本? ClickHouse讀寫Clickhouse有哪些參數? ClickHouse讀寫Clickhouse參數都有哪些說明?
name:字段名稱 type:字段類型,可以和數據庫里的字段類型不一樣,程序會做一次類型轉換 format:如果字段是時間字符串,可以指定時間的格式,將字段類型轉為日期格式返回 value:如果數據庫里不存在指定的字段,則會報錯。如果指定的字段存在,當指定字段的值為null時,會以此value值作為默認值返回
本文將主要介紹 FlinkX 讀取和寫入 Clickhouse 的過程及相關參數,核心內容將圍繞以下3個問題,FlinkX 插件下載:
https://github.com/DTStack/flinkx
ClickHouse 讀取
一、插件名稱
名稱:clickhousereader
二、支持的數據源版本
ClickHouse 19.x及以上
三、參數說明
「jdbcUrl」
- 描述:針對關系型數據庫的jdbc連接字符串
- jdbcUrl參考文檔:clickhouse-jdbc官方文檔
- 必選:是
- 默認值:無
「username」
- 描述:數據源的用戶名
- 必選:是
- 默認值:無
「password」
- 描述:數據源指定用戶名的密碼
- 必選:是
- 默認值:無
「where」
- 描述:篩選條件,reader插件根據指定的column、table、where條件拼接SQL,并根據這個SQL進行數據抽取。在實際業務場景中,往往會選擇當天的數據進行同步,可以將where條件指定為gmt_create > time。
- 注意:不可以將where條件指定為limit 10,limit不是SQL的合法where子句。
- 必選:否
- 默認值:無
「splitPk」
- 描述:當speed配置中的channel大于1時指定此參數,Reader插件根據并發數和此參數指定的字段拼接sql,使每個并發讀取不同的數據,提升讀取速率。注意:推薦splitPk使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。目前splitPk僅支持整形數據切分,不支持浮點、字符串、日期等其他類型。如果用戶指定其他非支持類型,FlinkX將報錯!如果channel大于1但是沒有配置此參數,任務將置為失敗。
- 必選:否
- 默認值:無
「fetchSize」
- 描述:讀取時每批次讀取的數據條數。
- 注意:此參數的值不可設置過大,否則會讀取超時,導致任務失敗。
- 必選:否
- 默認值:1000
「queryTimeOut」
- 描述:查詢超時時間,單位秒。
- 注意:當數據量很大,或者從視圖查詢,或者自定義sql查詢時,可通過此參數指定超時時間。
- 必選:否
- 默認值:1000
「customSql」
- 描述:自定義的查詢語句,如果只指定字段不能滿足需求時,可通過此參數指定查詢的sql,可以是任意復雜的查詢語句。注意:只能是查詢語句,否則會導致任務失敗;查詢語句返回的字段需要和column列表里的字段嚴格對應;當指定了此參數時,connection里指定的table無效;當指定此參數時,column必須指定具體字段信息,不能以*號代替;
- 必選:否
- 默認值:無
「column」
- 描述:需要讀取的字段。
- 格式:支持3種格式
1.讀取全部字段,如果字段數量很多,可以使用下面的寫法:
"column":["*"]2.只指定字段名稱:
"column":["id","name"]3.指定具體信息:
"column": [{"name": "col","type": "datetime","format": "yyyy-MM-dd hh:mm:ss","value": "value" }]屬性說明:
- 必選:是
- 默認值:無
「polling」
- 描述:是否開啟間隔輪詢,開啟后會根據pollingInterval輪詢間隔時間周期性的從數據庫拉取數據。開啟間隔輪詢還需配置參數pollingInterval,increColumn,可以選擇配置參數startLocation。若不配置參數startLocation,任務啟動時將會從數據庫中查詢增量字段最大值作為輪詢的開始位置。
- 必選:否
- 默認值:false
「pollingInterval」
描述:輪詢間隔時間,從數據庫中拉取數據的間隔時間,默認為5000毫秒。
必選:否
默認值:5000
「requestAccumulatorInterval」
- 描述:發送查詢累加器請求的間隔時間。
- 必選:否
- 默認值:2
配置示例
1、基礎配置
{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}} }2、多通道
{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 3,"bytes": 0},"errorLimit": {"record": 100}}} }3、指定customSql
{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "select id from tableTest","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}} }4、增量同步指定startLocation
{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","increColumn": "id","startLocation": "20","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}} }5、間隔輪詢
{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2,"polling": true,"pollingInterval": 3000},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}} }ClickHouse 寫入
一、插件名稱
名稱:clickhousewriter
二、支持的數據源版本
ClickHouse 19.x及以上
三、參數說明
「jdbcUrl」
- 描述:針對關系型數據庫的jdbc連接字符串
- 必選:是
- 默認值:無
「username」
- 描述:數據源的用戶名
- 必選:是
- 默認值:無
「password」
- 描述:數據源指定用戶名的密碼
- 必選:是
- 默認值:無
「column」
- 描述:目的表需要寫入數據的字段,字段之間用英文逗號分隔。例如: "column": ["id","name","age"]。
- 必選:是
- 默認值:否
- 默認值:無
「preSql」
- 描述:寫入數據到目的表前,會先執行這里的一組標準語句
- 必選:否
- 默認值:無
「postSql」
- 描述:寫入數據到目的表后,會執行這里的一組標準語句
- 必選:否
- 默認值:無
「table」
- 描述:目的表的表名稱。目前只支持配置單個表,后續會支持多表
- 必選:是
- 默認值:無
「writeMode」
- 描述:控制寫入數據到目標表采用 insert into 語句,只支持insert操作
- 必選:是
- 所有選項:insert
- 默認值:insert
「batchSize」
- 描述:一次性批量提交的記錄數大小,該值可以極大減少FlinkX與數據庫的網絡交互次數,并提升整體吞吐量。但是該值設置過大可能會造成FlinkX運行進程OOM情況
- 必選:否
- 默認值:1024
文章來源如下,感興趣的同學可查看原文:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271
更多 Flink 技術問題可在釘釘群交流
原文鏈接:https://developer.aliyun.com/article/770821?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的FlinkX 如何读取和写入 Clickhouse?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 5G专网为“江南皮革厂”带来了什么?
- 下一篇: 阿里研究员:警惕软件复杂度困局