日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

FlinkX脏值处理

發布時間:2023/12/20 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 FlinkX脏值处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

FlinkX臟值處理

在大量數據的傳輸過程中,必定會由于各種原因導致很多數據傳輸報錯(比如類型轉換錯誤),這種數據DataX認為就是臟數據。

? – by DataX

配置實例

"dirty": {"path": "/tmp","hadoopConfig": {"fs.default.name": "hdfs://flinkhadoop:8020","dfs.nameservices": "ns1","dfs.ha.namenodes.ns1": "flinkhadoop","dfs.namenode.rpc-address.ns1.nn1": "hdfs://flinkhadoop:8020","dfs.ha.automatic-failover.enabled": "true","dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","fs.hdfs.impl.disable.cache": "true"}}

我這里的hdfs是一個單機。

實現邏輯

FlinkX的臟值處理邏輯是放在寫入數據過程中的。按照臟值的定義,在讀取過程中,能讀取進來的都是正常的值,但是在寫入過程中,以目標源的標準,可能讀取的值是存在瑕疵的,所以是臟值。

在臟值處理的過程中,臟值處理器的作用肯定是首當其沖,我們先看一下DirtyManager的定義和初始化過程:

DirtyDataManager

全局視角:

創建

public DirtyDataManager(String path, Map<String, Object> configMap, String[] fieldNames, String jobId) {this.fieldNames = fieldNames;location = path + "/" + UUID.randomUUID() + ".txt";this.config = configMap;this.jobId = jobId;}
  • 可以看出來 location 是根據我們配置的路徑+一個uuid生成的txt,其實這樣在查看起來的時候不是很方便。
  • 初始化

    public void open() {try {FileSystem fs = FileSystemUtil.getFileSystem(config, null);Path path = new Path(location);stream = fs.create(path, true);} catch (Exception e) {throw new RuntimeException("Open dirty manager error", e);}}// -------------------- FileSystem ---------------------------public static FileSystem getFileSystem(Map<String, Object> hadoopConfigMap, String defaultFs) throws Exception {if(isOpenKerberos(hadoopConfigMap)){return getFsWithKerberos(hadoopConfigMap, defaultFs);}Configuration conf = getConfiguration(hadoopConfigMap, defaultFs);setHadoopUserName(conf);return FileSystem.get(getConfiguration(hadoopConfigMap, defaultFs));}

    flinkX的臟值是存放在hadoop上面的。

    臟值寫入

    public String writeData(Row row, WriteRecordException ex) {String content = RowUtil.rowToJson(row, fieldNames);String errorType = retrieveCategory(ex);String line = StringUtils.join(new String[]{content,errorType, gson.toJson(ex.toString()), DateUtil.timestampToString(new Date()) }, FIELD_DELIMITER);try {// stream.write(line.getBytes(StandardCharsets.UTF_8));stream.write(LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));DFSOutputStream dfsOutputStream = (DFSOutputStream) stream.getWrappedStream();dfsOutputStream.hsync(syncFlags);return errorType;} catch (IOException e) {throw new RuntimeException(e);}}private String retrieveCategory(WriteRecordException ex) {Throwable cause = ex.getCause();if(cause instanceof NullPointerException) {return ERR_NULL_POINTER;}for(String keyword : PRIMARY_CONFLICT_KEYWORDS) {if(cause.toString().toLowerCase().contains(keyword)) {return ERR_PRIMARY_CONFLICT;}}return ERR_FORMAT_TRANSFORM;}
  • 獲取臟值數據內容
  • 獲取臟值類型: NPE、主鍵重復、其它錯誤(統稱為轉換錯誤)
  • 將數據內容和錯誤原因進行拼接,分割符為 \u0001
  • 將拼接后的數據以utf-8編碼以及換行符\n寫入到hdfs中
  • 通過hsync刷入,根據UPDATE_LENGTH策略刷入
  • hsync的語義是:client端所有的數據都發送到副本的每個datanode上,并且datanode上的每個副本都完成了posix中fsync的調用,也就是說操作系統已經把數據刷到磁盤上(當然磁盤也可能緩沖數據);需要注意的是當調用fsync時只有當前的block會刷到磁盤中,要想每個block都刷到磁盤,必須在創建流時傳入Sync標示。

    UPDATE_LENGTH: 同步到DataNodes時,還更新NameNode中的元數據(塊長度)。

    臟值寫入時機

    在寫入每一行數據writeSingleRecord的時候,進行臟值的捕獲

    protected void writeSingleRecord(Row row) {if(errorLimiter != null) {errorLimiter.acquire();}try {writeSingleRecordInternal(row);if(!restoreConfig.isRestore() || isStreamButNoWriteCheckpoint()){numWriteCounter.add(1);snapshotWriteCounter.add(1);}} catch(WriteRecordException e) {// 寫入錯誤限流器saveErrorData(row, e);// 更新指標以及持久化存儲臟值updateStatisticsOfDirtyData(row, e);// 總記錄數加1numWriteCounter.add(1);snapshotWriteCounter.add(1);if(dirtyDataManager == null && errCounter.getLocalValue() % LOG_PRINT_INTERNAL == 0){LOG.error(e.getMessage());}if(DtLogger.isEnableTrace()){LOG.trace("write error row, row = {}, e = {}", row.toString(), ExceptionUtil.getErrorMessage(e));}}} private void updateStatisticsOfDirtyData(Row row, WriteRecordException e){if(dirtyDataManager != null) {String errorType = dirtyDataManager.writeData(row, e);if (ERR_NULL_POINTER.equals(errorType)){nullErrCounter.add(1);} else if(ERR_FORMAT_TRANSFORM.equals(errorType)){conversionErrCounter.add(1);} else if(ERR_PRIMARY_CONFLICT.equals(errorType)){duplicateErrCounter.add(1);} else {otherErrCounter.add(1);}}}

    這代碼邏輯確實和我的邏輯稍微有些區別,為什么會在這里進行存儲。。。。

    應該邏輯應該分離的。將dirtyDataManager.writeData(row, e)放在上一個saveErrorData方法中可能更合適。

    參考 https://github.com/DTStack/flinkx/issues/220

    臟值實例測試

    臟值文件實例

    根據dirty配置,初始化hadoop的連接,并創建對應文件,如我們這里配置的path是:/tmp/flinkx/bond_info_mongodb_to_mysql,如我們配置的是4個處理器。在對應的hdfs上面,有四個文件:

    感覺官方需要對這個作業存儲位置進行一些處理:

    臟值模擬

    我們模擬將mysql對應的表的string=>bigint,這樣肯定會在轉換中發生錯誤。

    {"bond_name":"xxxx","bond_stop_time":"xxx","bond_time_limit":"xx","bond_type":"xxx","plan_issued_quantity":"xx","publish_expire_time":"xxx","publish_time":"xx","publisher_name":"xxx","real_issued_quantity":"14","start_cal_interest_time":"xx","inst_code":"x":"x","city_code":"x","area_code":"x","input_date":x,"input_time":x}conversion"com.dtstack.flinkx.exception.WriteRecordException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1\njava.sql.SQLException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1"2020-05-24 17:33:15

    可以看出來是類型轉換錯誤,它會把錯誤數據和錯誤原因都進行存儲,并且根據u0001進行分割。

    總結

    本文對臟值的定義,以及FlinkX的處理進行詳細的分解,并進行了相關的測試,與實例展示。從本文中可以了解到hdfs的hsync與hdfs的基本配置。

    總結

    以上是生活随笔為你收集整理的FlinkX脏值处理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。