日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

整合flink-cdc实现实时读postgrasql

發布時間:2023/12/20 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 整合flink-cdc实现实时读postgrasql 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

整合flink-cdc實現實時讀postgrasql

什么是wal日志

wal日志即write ahead log預寫式日志,簡稱wal日志。wal日志可以說是PostgreSQL中十分重要的部分,相當于oracle中的redo日志。
當數據庫中數據發生變更時:
change發生時:先要將變更后內容計入wal buffer中,再將變更后的數據寫入data buffer;
commit發生時:wal buffer中數據刷新到磁盤;
checkpoint發生時:將所有data buffer刷新的磁盤。
可以想象,如果沒有wal日志,那么數據庫中將會發生什么?
首先,當我們在數據庫中更新數據時,如果沒有wal日志,那么每次更新都會將數據刷到磁盤上,并且這個動作是隨機i/o,性能可想而知。并且沒有wal日志,關系型數據庫中事務的ACID如何保證呢?

因此wal日志重要性可想而知。其中心思想就是:先寫入日志文件,再寫入數據

什么是復制槽

制槽(replication slot)在postgresql9.4版本中被引入,引入之初是為了防止備庫需要的xlog日志在主庫被刪除,主庫會會根據備庫返回的信息確認哪些xlog已不再需要,,才能進行清理。同時主庫不會移除那些導致恢復沖突的行,關于恢復沖突,前面有一篇文章講到過可以通過設置hot_standby_feedback、max_standby_streaming_delay等參數進行預防,但是這些參數只有在主備關系正常時才能起到作用,而replication slot能夠確保在主備斷連后主庫的xlog仍不被清理。
復制槽分為物理復制槽physical replication slot和邏輯復制槽logic replication slot。物理復制槽一般結合流復制一起使用,能夠很好的保證備庫需要的日志不會在主庫刪除,本文重點討論邏輯復制槽。
Logic replication slots一般被用于邏輯異步復制,一個很好的應用就是用于異構數據庫之間的邏輯復制。大致原理是將源端xlog進行解碼,解析成具體sql,然后到目標端進行回放。支持邏輯解碼需要將wal_level設置為logic,logic會在replica的基礎上增加一些信息以支持邏輯解碼,該模式會增大wal日志的數量,尤其是大量的update,delete操作的庫。
需要關注的問題
對于邏輯復制槽,有下面幾點需要注意:
①一個邏輯復制槽只能解碼一個database,但是一個database可以有多個復制槽進行解碼,同一個復制槽可能同時有多個接收端進行訂閱。
②復制槽的消息只發送一次,同時它不關心接收端的狀態,如果接收端執行失敗,那么復制槽不會向前推進,接收端成功后繼續從上次失敗的位點繼續進行消費。
③不支持DDL、列存、壓縮表的解碼,DDL一般需要需要創建額外的觸發器來進行處理,但可以做到表級訂閱。
④邏輯復制不能保證數據不丟失,不能用作數據容災,但是可以用于數據遷移,在主庫有大事務的情況下延遲較大。
⑤不使用的復制槽一定要及時刪除。

注意

通過復制槽,從庫訂閱主庫,可以保證從庫在沒有收到主庫的日志之前,主庫不會刪除從庫未讀的部分。也因此不用的槽要即時刪除,否則會導致日志積壓

***flink-cdc***的就是通過創建復制槽訂閱PG來實現實時監控數據庫變化的。

flink-cdc配置以及使用

1.maven依賴

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.flink</groupId> <artifactId>database</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink.version>1.13.0</flink.version> <scala.binary.version>2.11</scala.binary.version> <kafka.version>2.2.0</kafka.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--maven properties --> <maven.test.skip>false</maven.test.skip> <maven.javadoc.skip>false</maven.javadoc.skip> <!-- compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> <spotless.version>2.4.2</spotless.version> <jaxb-api.version>2.3.1</jaxb-api.version> </properties><dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-postgres-cdc</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope></dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--log4j日志包 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.2.8.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>PgsqlToMysqlTest</mainClass> </transformer> </transformers> <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.12.4</version> <configuration> <skipTests>${maven.test.skip}</skipTests> </configuration> </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <version>0.12</version> <configuration> <excludes> <exclude>.gitignore</exclude> <exclude>.travis.yml</exclude> <exclude>.asf.yaml</exclude> <exclude>README.md</exclude> </excludes> </configuration> </plugin> <plugin> <groupId>org.jacoco</groupId> <artifactId>jacoco-maven-plugin</artifactId> <version>0.8.7</version> <executions> <execution> <id>prepare-agent</id> <goals> <goal>prepare-agent</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.eluder.coveralls</groupId> <artifactId>coveralls-maven-plugin</artifactId> <version>4.3.0</version> <dependencies> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency> </dependencies> </plugin> <plugin> <artifactId>maven-checkstyle-plugin</artifactId> <version>2.17</version> <executions> <execution> <id>verify</id> <phase>verify</phase> <configuration> <configLocation>style/rmq_checkstyle.xml</configLocation> <encoding>UTF-8</encoding> <consoleOutput>true</consoleOutput> <failsOnError>true</failsOnError> <includeTestSourceDirectory>false</includeTestSourceDirectory> <includeTestResources>false</includeTestResources> </configuration> <goals> <goal>check</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.10.4</version> <configuration> <aggregate>true</aggregate> <reportOutputDirectory>javadocs</reportOutputDirectory> <locale>en</locale> </configuration> </plugin> <!-- Due to the Flink build setup, "mvn spotless:apply" and "mvn spotless:check" don't work. You have to use the fully qualified name, i.e. "mvn com.diffplug.spotless:spotless-maven-plugin:apply" --> </plugins> </build></project>

再看看代碼怎么寫

import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PgsqlToMysqlTest { public static void main(String[] args) throws Exception { // 設置flink表環境變量 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // 獲取flink流環境變量 StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); exeEnv.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation exeEnv.getCheckpointConfig() .enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); FsStateBackend stateBackend = new FsStateBackend("file:\\\\D:\\fsdata"); // // stateBackend.resolveCheckpoint("D:\\fsdata\\dda9ae98c2b864ba8448d2c5eee2e5c3\\chk-6"); // 固定延遲重啟(最多嘗試3次,每次間隔10s) // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10000L)); // 失敗率重啟(在10分鐘內最多嘗試3次,每次至少間隔1分鐘) // exeEnv.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), // Time.minutes(1))); // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); exeEnv.setStateBackend(stateBackend); // exeEnv.setDefaultSavepointDirectory(); exeEnv.setParallelism(2); // 表執行環境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings); // 拼接souceDLL String sourceDDL = "CREATE TABLE pgsql_source (\n" + " id int,\n" + " name STRING,\n" + " py_code STRING,\n" + " seq_no int,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'postgres-cdc',\n" + " 'hostname' = '192.168.159.100',\n" + " 'port' = '5431',\n" + " 'username' = 'postgres',\n" + " 'password' = '123',\n" + " 'database-name' = 'postgres',\n" + " 'schema-name' = 'public',\n" + " 'debezium.snapshot.mode' = 'initial',\n" + " 'decoding.plugin.name' = 'pgoutput',\n" + " 'debezium.slot.name' = 'pgsql_source_li2',\n" + " 'table-name' = 'test_copy2_copy1'\n" + ")"; // 執行source表ddl tableEnv.executeSql(sourceDDL); String transformSQL = "select * from pgsql_source"; Table tableResult = tableEnv.sqlQuery(transformSQL); DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(tableResult, Row.class); dataStream.print(); // StreamGraph graph = new StreamGraph() exeEnv.execute("jobname"); // 執行sink表ddl // 執行邏輯sql語句 // TableResult tableResult = tableEnv.executeSql(transformSQL); // tableEnv.execute(""); // 控制塔輸出 // tableResult.print(); } }

說明兩個配置

debezium.snapshot.mode = 'initial'

initial :默認設置,第一次啟動創建數據庫快照,后面根據記錄偏移量繼續讀

never:從不建立快照,如果本地無偏移量,從最后的log開始讀

always:每次啟動都建立快照

exporter: 功能和inintial相同,不同之處在于其不會對表上鎖,使用SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,可重復讀的隔離級別
實現類io.debezium.connector.postgresql.snapshot.ExportedSnapshotter

public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) { return Optional.empty(); } public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { if (newSlotInfo != null) { String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; } else { return super.snapshotTransactionIsolationLevelStatement(newSlotInfo); } }

custom :用戶自定義 快照,配合debezium.snapshot.custom.class使用

什么是快照?
之前說過wal日志實際上會刪除,因此單純讀wal日志,并不能讀到全數據庫的數據
,因此在第一次啟動flink程序時,需要對數據庫相應表做一個快照,將全表的數據拿到flink處理對應源碼位置io.debezium.connector.postgresql.spi.Snapshotter

default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; } default Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) { String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); tableIds.forEach((tableId) -> { statements.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN ACCESS SHARE MODE;").append(lineSeparator); }); return Optional.of(statements.toString()); }

可以看出快照需要鎖表(exporter除外),IN ACCESS SHARE MODE說明鎖表不影響插入讀寫,但是如果有全表更新操作, 會被阻塞。
開啟串行,只讀的事務。Snapshotter子類有這幾種配置的實現,有興趣的可以看看。

debezium.slot.name = 'pgsql_source_li2'

這就是flink-cdc創建的邏輯復制槽。

使用flink-cdc碰到的一些問題

1 能不能保證EXACTLY_ONCE一致性要求

假設在默認snapshot.mode=initial下在第一次啟動程序時,會對數據路進行快照讀,讀取當前全量數據,后面根據記錄的偏移量繼續讀取數據,這樣既不丟失數據,也不重復讀,是保證了EXACTLY_ONCE一致性的。即使flink程序重啟,在啟動的時候指定savePoint Path也可以繼續之前的偏移量,讀取到未接收的數據。
這里分享一個技巧,flink在本地Idea運行沒有提供設置savepPoint的方法。
***org.apache.flink.client.deployment.executors.LocalExecutor#execute***方法中斷點設置

2 在快照時數據一股腦讀進flink,事件時間語義下,數據會不會亂序

如果我們對數據開窗計算,那么亂序可能導致窗口提前關閉導致數據丟失,而在對表做快照時,會將全表數據全部拿到flink處理,就很可能導致亂序數據產生,那么flink-cdc有沒有解決這個問題呢,我們知道waterMark時周期成的(源碼位置org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#onProcessingTime),

一種解決思路時,在waterMark還沒生成之前,將全部快照數據處理掉,那么就不會丟失數據。

對于單一slot是單線程處理任務的,如果突然來了一批數據,那么生成waterMark的任務必須等到這批數據全部處理完畢才能繼續。因此在批數據未處理完畢時,盡管批數據亂序,但是不存在窗口關閉,丟失數據問題

如果有多并行度多槽生成watermrk呢?
多并行度情況下,數據被分散到多個槽,并且不再是一次處理一批數據,處理數據和waterMark會一起生成,如果一次讀一批,就可能會有丟數據的風險
因此從讀數據源到設置waterMark設置單并行度1,那么就可以避免數據亂序導致的丟失數據問題

另一種思路也很簡單,在數據庫做快照時,對數據庫進行排序。我們來看看flink-cdc有沒有提供類似的接口。

看下io.debezium.connector.postgresql.spi.Snapshotter#buildSnapshotQuery快照查詢的sql

public Optional<String> buildSnapshotQuery(TableId tableId) { StringBuilder q = new StringBuilder(); q.append("SELECT * FROM "); q.append(tableId.toDoubleQuotedString()); return Optional.of(q.toString()); }

很遺憾,并未提供排序的接口。
但是就沒辦法了嗎?
還記得之前的自定義快照custom嗎。
繼承InitialSnapshotter自定義快照做簡單排序

import io.debezium.connector.postgresql.snapshot.InitialSnapshotter; import io.debezium.relational.TableId; import jdk.nashorn.internal.runtime.options.Option; import java.util.Optional; public class OrderSnapShoter extends InitialSnapshotter { @Override public Optional<String> buildSnapshotQuery(TableId tableId) { Optional<String> sql = super.buildSnapshotQuery(tableId); return Optional.of(sql.get() + "order by id"); } }

配置改一下

'debezium.snapshot.mode' = 'custom''debezium.snapshot.custom.class' = 'xxx.OrderSnapShoter'

總結

以上是生活随笔為你收集整理的整合flink-cdc实现实时读postgrasql的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 一区二区国产精品视频 | 香蕉网在线视频 | 国产精品电影院 | 自拍偷拍视频网 | 日韩在线不卡视频 | 精品国产一区二区在线观看 | 99中文字幕 | 久久在线观看 | 美女脱了内裤喂我喝尿视频 | 97日日夜夜 | 国产精品久久久久久久免费 | 9999精品视频 | 女同动漫免费观看高清完整版在线观看 | 91精品91 | 亚洲国产欧美一区二区三区深喉 | 亚洲青涩| 亚洲日本中文字幕在线 | 欧美黄色一区 | 亚洲网站免费 | 蜜桃臀av一区二区三区 | 性xxxx18| 不卡av在线免费观看 | 亚欧精品在线 | 在线能看的av网站 | 国产第一页视频 | 在线视频免费播放 | 久久黄色视| 精品一区二区三 | 麻豆视频免费在线 | 依人综合网 | 你懂的在线播放 | 麻豆影视大全 | 国产在线视频一区二区三区 | 精品午夜一区二区三区在线观看 | 在线cao | 一本一道久久综合狠狠老精东影业 | 国产精品永久在线观看 | 免费av影视 | 国产视频在线观看免费 | 吻胸摸激情床激烈视频 | 国产综合在线观看 | 欧美大片视频在线观看 | 色91精品久久久久久久久 | 国产视频一区二区在线 | 亚洲性视频网站 | 天天摸天天添 | 国产嫩草在线 | 久草午夜| 蜜桃无码一区二区三区 | 粗大的内捧猛烈进出 | 成人一区二区三区在线观看 | 激情都市一区二区 | 精品久久网站 | 黄色一级播放 | 亚洲一区在线免费 | 成人欧美一区二区三区黑人动态图 | 亚洲va久久久噜噜噜久久天堂 | 天天摸天天操 | 97青青草 | 99在线看| 亚洲一线二线在线观看 | 三级黄色短视频 | 人人澡人人射 | 亚洲高清免费观看 | 久久理论 | 刘亦菲毛片 | 成人在线精品 | 成人高潮片免费网站 | 一级片麻豆 | 综合另类| www日韩在线观看 | 91深夜福利 | 欧美午夜性生活 | 亚洲欧美999 | 免费看女生隐私 | av.www| 国产精品国产三级国产专播精品人 | 无遮挡边吃摸边吃奶边做 | 毛片毛片毛片毛片毛片毛片毛片毛片毛片 | 好男人影视www | 国产91色| 欧美一区二区在线观看视频 | 国产老女人精品毛片久久 | 一本一道久久 | 久久精品欧美一区二区三区麻豆 | 亚洲第一成人av | 日本在线免费 | 国产精品综合久久久 | 娇小激情hdxxxx学生 | 色欲久久久天天天综合网精品 | 国产亚洲91| 91蝌蚪视频在线观看 | 亚洲成人激情视频 | brazzers猛女系列 | 在线播放亚洲 | 欧美一区二区在线观看 | 欧美激情视频在线播放 | 特黄网站 | 午夜影院福利社 |