日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据源管理 | 基于DataX组件,同步数据和源码分析

發布時間:2025/3/16 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据源管理 | 基于DataX组件,同步数据和源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文源碼:GitHub·點這里 || GitEE·點這里

一、DataX工具簡介

1、設計理念

DataX是一個異構數據源離線同步工具,致力于實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。

絮叨一句:異構數據源指,為了處理不同種類的業務,使用不同的數據庫系統存儲數據。

2、組件結構

DataX本身作為離線數據同步框架,采用Framework+plugin架構構建。將數據源讀取和寫入抽象成為Reader和Writer插件,納入到整個同步框架中。

  • Reader

Reader為數據采集模塊,負責讀取采集數據源的數據,將數據發送給Framework。

  • Writer

Writer為數據寫入模塊,負責不斷向Framework取數據,并將數據寫入到目的端。

  • Framework

Framework用于連接reader和writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。

3、架構設計

  • Job

DataX完成單個數據同步的作業,稱為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。

  • Split

DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。

  • Scheduler

切分多個Task之后,Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。

  • TaskGroup

每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。DataX作業運行起來之后,Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0。

二、環境安裝

推薦Python2.6+,Jdk1.8+(腦補安裝流程)。

1、Python包下載

# yum -y install wget # wget https://www.python.org/ftp/python/2.7.15/Python-2.7.15.tgz # tar -zxvf Python-2.7.15.tgz

2、安裝Python

# yum install gcc openssl-devel bzip2-devel [root@ctvm01 Python-2.7.15]# ./configure --enable-optimizations # make altinstall # python -V

3、DataX安裝

# pwd /opt/module # ll datax # cd /opt/module/datax/bin -- 測試環境是否正確 # python datax.py /opt/module/datax/job/job.json

三、同步任務

1、同步表創建

-- PostgreSQL CREATE TABLE sync_user (id INT NOT NULL,user_name VARCHAR (32) NOT NULL,user_age int4 NOT NULL,CONSTRAINT "sync_user_pkey" PRIMARY KEY ("id") ); CREATE TABLE data_user (id INT NOT NULL,user_name VARCHAR (32) NOT NULL,user_age int4 NOT NULL,CONSTRAINT "sync_user_pkey" PRIMARY KEY ("id") );

2、編寫任務腳本

[root@ctvm01 job]# pwd /opt/module/datax/job [root@ctvm01 job]# vim postgresql_job.json

3、腳本內容

{"job": {"setting": {"speed": {"channel": "3"}},"content": [{"reader": {"name": "postgresqlreader","parameter": {"username": "root01","password": "123456","column": ["id","user_name","user_age"], "connection": [{"jdbcUrl": ["jdbc:postgresql://192.168.72.131:5432/db_01"], "table": ["data_user"]}]}}, "writer": {"name": "postgresqlwriter", "parameter": {"username": "root01","password": "123456","column": ["id","user_name","user_age"], "connection": [{"jdbcUrl": "jdbc:postgresql://192.168.72.131:5432/db_01", "table": ["sync_user"]}], "postSql": [], "preSql": []}}}]} }

4、執行腳本

# /opt/module/datax/bin/datax.py /opt/module/datax/job/postgresql_job.json

5、執行日志

2020-04-23 18:25:33.404 [job-0] INFO JobContainer - 任務啟動時刻 : 2020-04-23 18:25:22 任務結束時刻 : 2020-04-23 18:25:33 任務總計耗時 : 10s 任務平均流量 : 1B/s 記錄寫入速度 : 0rec/s 讀出記錄總數 : 2 讀寫失敗總數 : 0

四、源碼流程分析

注意:這里源碼只貼出核心流程,如果要看完整源碼,可以自行從Git上下載。

1、讀取數據

核心入口:PostgresqlReader

啟動讀任務

public static class Task extends Reader.Task {@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender,super.getTaskPluginCollector(), fetchSize);} }

讀取任務啟動之后,執行讀取數據操作。

核心類:CommonRdbmsReader

public void startRead(Configuration readerSliceConfig,RecordSender recordSender,TaskPluginCollector taskPluginCollector, int fetchSize) {ResultSet rs = null;try {// 數據讀取rs = DBUtil.query(conn, querySql, fetchSize);queryPerfRecord.end();ResultSetMetaData metaData = rs.getMetaData();columnNumber = metaData.getColumnCount();PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);allResultPerfRecord.start();long rsNextUsedTime = 0;long lastTime = System.nanoTime();// 數據傳輸至交換區while (rs.next()) {rsNextUsedTime += (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime = System.nanoTime();}allResultPerfRecord.end(rsNextUsedTime);}catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);} finally {DBUtil.closeDBResources(null, conn);} }

2、數據傳輸

核心接口:RecordSender(發送)

public interface RecordSender {public Record createRecord();public void sendToWriter(Record record);public void flush();public void terminate();public void shutdown(); }

核心接口:RecordReceiver(接收)

public interface RecordReceiver {public Record getFromReader();public void shutdown(); }

核心類:BufferedRecordExchanger

class BufferedRecordExchanger implements RecordSender, RecordReceiver

3、寫入數據

核心入口:PostgresqlWriter

啟動寫任務

public static class Task extends Writer.Task {public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());} }

寫數據任務啟動之后,執行數據寫入操作。

核心類:CommonRdbmsWriter

public void startWriteWithConnection(RecordReceiver recordReceiver,Connection connection) {// 寫數據庫的SQL語句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);} }

五、源代碼地址

GitHub·地址 https://github.com/cicadasmile/data-manage-parent GitEE·地址 https://gitee.com/cicadasmile/data-manage-parent

推薦系列閱讀

序號標題
A01數據源管理:主從庫動態路由,AOP模式讀寫分離
A02數據源管理:基于JDBC模式,適配和管理動態數據源
A03數據源管理:動態權限校驗,表結構和數據遷移流程
A04數據源管理:關系型分庫分表,列式庫分布式計算
A05數據源管理:PostGreSQL環境整合,JSON類型應用
C01架構基礎:單服務.集群.分布式,基本區別和聯系
C02架構設計:分布式業務系統中,全局ID生成策略

總結

以上是生活随笔為你收集整理的数据源管理 | 基于DataX组件,同步数据和源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。