Replicating PostgreSQL database data with Flink

Published: 2024/3/13 | Database | 豆豆
1. Set up permissions for the operating user; see the steps for creating a PG streaming replication account below.
2. Then replicate the database data, either with commands or from code.

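Install Flink; this example uses version 1.13.6:

Download the connector JARs that match this version from https://mvnrepository.com/

For a MySQL source, download the MySQL JARs; for a PostgreSQL source, download the PostgreSQL JARs.

Note: choose the JARs according to the data source type and its version. A version mismatch causes errors at startup and prevents data from being synchronized.

Start Flink by running ./start-cluster.sh

Then open http://localhost:8081 to reach Flink's built-in web UI.

Once the environment is ready, run ./sql-client.sh

The database can then be replicated with Flink SQL.

After it starts, the SQL client prompt is displayed.

Example: on host localhost, running PostgreSQL 11.5, table test1 in schema public of database postgres is replicated to table test1_1.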

Create the database:
CREATE DATABASE data_syn;

Table structure:
CREATE TABLE "public"."test1" (
"id" int4 NOT NULL,
"name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL
)
;
ALTER TABLE "public"."test1" ADD CONSTRAINT "test1_pkey" PRIMARY KEY ("id");
----------------------------------divider---------------------------------------------------------
CREATE TABLE "public"."test1_1" (
"id" int4 NOT NULL,
"name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL
)
;
ALTER TABLE "public"."test1_1" ADD CONSTRAINT "test1_copy2_pkey" PRIMARY KEY ("id");

----------------------------------flinksql---------------------------------------------------------
CREATE TABLE pgsql_source (
id int,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '127.0.0.1',
'port' = '5432',
'username' = 'postgres',
'password' = '123456',
'database-name' = 'postgres',
'schema-name' = 'public',
'debezium.snapshot.mode' = 'never',
'decoding.plugin.name' = 'pgoutput',
'debezium.slot.name' = 'test3',
'table-name' = 'test1'
);

CREATE TABLE sink_sql (
id int,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
'table-name' = 'test1_1',
'username' = 'postgres',
'password' = '123456'
);

insert into sink_sql select id,name from pgsql_source;

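Once these statements have run, incremental changes to table test1 are synchronized.

However, modifying or deleting existing rows fails with: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE public.test1 REPLICA IDENTITY FULL'.

This happens because, with the default replica identity (the primary key), PG records only the key columns of the old row for UPDATE and DELETE, so the connector has no complete "before" image. A related point: rebuilding a primary key affects the business and needs a planned idle window, because DELETE and UPDATE cannot run on the primary while the key is being rebuilt. Temporarily switching the replica identity to a unique index instead of the primary key reduces that impact; switch it back once the rebuild is complete.

So run the following on the PG command line:
ALTER TABLE public.test1 REPLICA IDENTITY FULL;

After that, inserts, updates, and deletes on test1 are all replicated to test1_1.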
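Whether a table already has the required setting can be read from PG with select relreplident from pg_class where relname='test1'; which returns a one-character code. The sketch below only decodes that code as documented for the pg_class catalog; the class and method names (ReplicaIdentity, describe, hasFullBeforeImage) are hypothetical helpers for illustration and are not part of Flink or PG.

```java
public class ReplicaIdentity {

    /** Translates a pg_class.relreplident code into a readable description. */
    public static String describe(char code) {
        switch (code) {
            case 'd':
                return "DEFAULT: old values of the primary key columns only";
            case 'n':
                return "NOTHING: no old row values are recorded";
            case 'f':
                return "FULL: old values of all columns";
            case 'i':
                return "USING INDEX: old values of the columns in the chosen index";
            default:
                throw new IllegalArgumentException("Unknown relreplident code: " + code);
        }
    }

    /** Only FULL gives a complete "before" image for UPDATE and DELETE. */
    public static boolean hasFullBeforeImage(char code) {
        return code == 'f';
    }

    public static void main(String[] args) {
        // Print every code with its meaning and whether it is sufficient here.
        for (char code : new char[] {'d', 'n', 'f', 'i'}) {
            System.out.println(code + " -> " + describe(code)
                    + " (full before image: " + hasFullBeforeImage(code) + ")");
        }
    }
}
```

A table created as in this example reports d until the ALTER TABLE statement above is run, after which it reports f.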

package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WordSourceFromPsql {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Assemble the source DDL
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                " id int,\n" +
                " name STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'postgres-cdc',\n" +
                " 'hostname' = '127.0.0.1',\n" +
                " 'port' = '5432',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'postgres',\n" +
                " 'schema-name' = 'public',\n" +
                " 'debezium.snapshot.mode' = 'never',\n" +
                " 'decoding.plugin.name' = 'pgoutput',\n" +
                // Replication slot name
                " 'debezium.slot.name' = 'test3',\n" +
                " 'table-name' = 'test7'\n" +
                ")";

        // Execute the source table DDL
        tableEnvironment.executeSql(sourceDDL);

        // Assemble and execute the sink table DDL
        String sink_sql =
                "CREATE TABLE sink_sql (\n" +
                " id int,\n" +
                " name STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',\n" +
                " 'table-name' = 'test7_copy',\n" +
                " 'username'='postgres',\n" +
                " 'password'='123456'\n" +
                ")";
        tableEnvironment.executeSql(sink_sql);

        // Submit the continuous INSERT job that copies source rows into the sink
        String result = "insert into sink_sql select id,name from pgsql_source";
        tableEnvironment.executeSql(result).print();
    }

}
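Hand-concatenated DDL strings like the ones above are easy to break with a missing comma or a curly quote. As a minimal sketch, assuming you would rather generate the WITH clause from a map, the following standard-library-only helper renders the same kind of statement. FlinkDdlBuilder, quote, and createTable are hypothetical names introduced for illustration; they are not part of the Flink API, and the resulting string would still be passed to tableEnvironment.executeSql.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class FlinkDdlBuilder {

    /** Wraps a value in straight single quotes, doubling any embedded quote. */
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Renders CREATE TABLE name (columns) WITH ('key' = 'value', ...). */
    public static String createTable(String name, String columns, Map<String, String> options) {
        // One option per line, comma-separated, each indented by two spaces.
        StringJoiner with = new StringJoiner(",\n  ", "  ", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            with.add(quote(option.getKey()) + " = " + quote(option.getValue()));
        }
        return "CREATE TABLE " + name + " (\n  " + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> source = new LinkedHashMap<>();
        source.put("connector", "postgres-cdc");
        source.put("hostname", "127.0.0.1");
        source.put("port", "5432");
        source.put("username", "postgres");
        source.put("password", "123456");
        source.put("database-name", "postgres");
        source.put("schema-name", "public");
        source.put("debezium.snapshot.mode", "never");
        source.put("decoding.plugin.name", "pgoutput");
        source.put("debezium.slot.name", "test3");
        source.put("table-name", "test7");

        System.out.println(createTable("pgsql_source", "id int,\n  name STRING", source));
    }
}
```

Keeping the options in a map also makes it straightforward to reuse one set of connection settings for several source tables and change only table-name and the slot name.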

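Adding the following Maven dependency lets you run the program directly from the IDE:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>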

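It can also be packaged as a JAR and run on the Flink cluster.

The example project currently implements the following replication paths:
pg -> elasticsearch

pg -> mysql

pg -> pg, single table to single table and multiple tables to single table

The remote repository is:

https://gitlab.xpaas.lenovo.com/prc_customer_mdm/prc-customer-mdm-flink.git, on the master branch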


------------------------------------------------------------------------Creating a new PG user for the replication slot-------------------------------------------------------------------------------------------------

First log in to the PG database, using either a GUI tool or the command line.

-- Create the database used for data synchronization
CREATE DATABASE database_syn;

-- Create a new PG user
CREATE USER user_name WITH PASSWORD 'user_password';

-- Grant the user the replication privilege
ALTER ROLE user_name replication;

-- Allow the user to connect to the database
grant CONNECT ON DATABASE database_syn to user_name;

-- Grant the user SELECT on all tables in schema public of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user_name;

-- Publish the table to be synchronized
CREATE PUBLICATION data_syn FOR TABLE table_name;

-- List the tables that have been published
select * from pg_publication_tables;

-- Grant the user read and write privileges
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to user_name;

After these steps you have a user that can read changes through a replication slot and perform CRUD on the tables.

Below are some commonly used PG settings:

-- Create a new PG user
CREATE USER ODPS_ETL WITH PASSWORD 'odpsETL@2021';
-- Grant the user the replication privilege
ALTER ROLE ODPS_ETL replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE test to ODPS_ETL;
-- Turn on the publish-all-tables flag
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the tables that have been published
select * from pg_publication_tables;
-- Grant SELECT on a single table
grant select on TABLE aa to ODPS_ETL;
-- Grant a user read and write privileges
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to bd_test;
-- Grant the user SELECT on all existing tables in the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- Grant the user SELECT on tables created in the future
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- Change the replica identity so that old values are included for updates and deletes
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity
select relreplident from pg_class where relname='test0425';
-- Check replication slot usage
SELECT * FROM pg_replication_slots;
-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- Set the maximum number of connections for a user
alter role odps_etl connection limit 200;

When this is done, the user's privileges can be checked with a GUI tool.
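The slot commands in the list above, like the 'debezium.slot.name' option in the Flink SQL, all depend on a valid slot name. PG accepts only lowercase letters, digits, and underscores in replication slot names. The sketch below checks a candidate name before it goes into a job definition. It assumes a default PG build, where names are limited to 63 characters; SlotNames and isValid are hypothetical names for illustration.

```java
import java.util.regex.Pattern;

public class SlotNames {

    // Lowercase letters, digits, and underscores; 1 to 63 characters
    // (the 63 limit is an assumption that holds for a default PG build).
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]{1,63}");

    /** Returns true if the name is acceptable as a PG replication slot name. */
    public static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }

    public static void main(String[] args) {
        // Slot names from this article plus one deliberately invalid name.
        String[] candidates = {"test3", "zd_org_goods_solt", "Test-3"};
        for (String candidate : candidates) {
            System.out.println(candidate + " valid: " + isValid(candidate));
        }
    }
}
```

A slot can serve only one consumer at a time, so giving each Flink job its own slot name avoids conflicts between jobs.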
