日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于Tablestore Tunnel的数据复制实战

發布時間:2024/8/23 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于Tablestore Tunnel的数据复制实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

數據復制主要指通過互聯的網絡在多臺機器上保存相同數據的副本,通過數據復制方案,人們通常希望達到以下目的:1)使數據在地理位置上更接近用戶,進而降低訪問延遲;2)當部分組件出現故障時,系統依舊可以繼續工作,提高可用性;3)擴展至多臺機器以同時提供數據訪問服務,從而提升讀吞吐量。
如果復制的數據一成不變,那么數據復制就非常容易,只需要將數據復制到每個節點,一次性即可搞定,面對持續更改的數據如何正確而有效的完成數據復制是一個不小的挑戰。

使用DataX進行Tablestore數據復制

表格存儲(Tablestore)是阿里云自研的NoSQL多模型數據庫,提供海量結構化數據存儲以及快速的查詢和分析服務,表格存儲的分布式存儲和強大的索引引擎能夠提供PB級存儲、千萬TPS以及毫秒級延遲的服務能力。DataX是阿里巴巴集團內被廣泛使用的離線數據同步工具,DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件。
通過使用DataX可以完成Tablestore表的數據復制,如下圖所示,otsreader插件實現了從Tablestore讀取數據,并可以通過用戶指定抽取數據范圍可方便的實現數據增量抽取的需求,otsstreamreader插件實現了Tablestore的增量數據導出,而otswriter插件則實現了向Tablestore中寫入數據。通過在DataX中配置Tablestore相關的Reader和Writer插件,即可以完成Tablestore的表數據復制。

使用通道服務進行Tablestore數據復制

通道服務(Tunnel Service)是基于表格存儲數據接口之上的全增量一體化服務。通道服務為您提供了增量、全量、增量加全量三種類型的分布式數據實時消費通道。通過為數據表建立數據通道,可以簡單地實現對表中歷史存量和新增數據的消費處理。

借助于全增量一體的通道服務,我們可以輕松構建高效、彈性的數據復制解決方案。本文將逐步介紹如何結合通道服務進行Tablestore的數據復制,完整代碼開源在github上的?tablestore-examples中。本次的實戰將基于通道服務的Java SDK來完成,推薦先閱讀下通道服務的相關文檔,包括快速開始等。

1. 配置抽取

配置抽取其實對應的是數據同步所具備的功能,在本次實戰中,我們將完成指定時間點之前的表數據同步,指定的時間點可以是現在或者未來的某個時刻。具體的配置如下所示,ots-reader中記錄的是源表的相關配置,ots-writer中記錄的是目的表的相關配置。

{"ots-reader": {"endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com","instanceName": "zhuoran-high","tableName": "testSrcTable","accessId": "","accessKey": "","tunnelName": "testTunnel","endTime": "2019-06-19 17:00:00"},"ots-writer": {"endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com","instanceName": "zhuoran-search","tableName": "testDstTable","accessId": "","accessKey": "","batchWriteCount": 100} }

ots-reader中各參數的說明如下:

  • endpoint: Tablestore服務的Endpoint地址,例如https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com。在進行數據復制前,請檢查下連通性(可以使用curl命令)。
  • instanceName: Tablestore的實例名。
  • tableName: Tablestore的表名。
  • accessId: 訪問Tablestore的云賬號accessId。
  • accessKey: 訪問Tablestore的云賬號accessKey。
  • tunnelName: Tablestore的通道名,配置
  • endTime: 數據同步的截止時間點,對應到Java里SimpleFormat的格式為:yyyy-MM-dd HH:mm:ss?。

ots-writer中各參數的說明如下(略去相同的參數):

  • batchWriteCount: Tablestore單次批量寫入的條數,最大值為200。

注:未來會開放更多的功能配置,比如指定時間范圍的數據復制等。

2. 編寫主邏輯

數據復制的主邏輯主要分為以下4步,在第一次運行時,會完整的進行所有步驟,而在程序重啟或者斷點續傳場景時,只需要進行第3步和第4步。

  • 創建復制目的表
    通過使用DesribeTable接口,我們可以獲取到源表的Schema,借此可以創建出目的表,值得注意的是需要把目的表的有效版本偏差設成一個足夠大的值(默認為86400秒),因為服務端在處理寫請求時會對屬性列的版本號進行檢查,寫入的版本號需要在一個范圍內才能寫入成功,對于源表中的歷史存量數據而言,時間戳往往是比較小的,會被服務端過濾掉,最終導致同步數據的丟失。
  • sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName()); destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName()); if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {System.out.println("Table is already exist: " + config.getWriteConf().getTableName()); } else {DescribeTableResponse describeTableResponse = sourceClient.describeTable(new DescribeTableRequest(config.getReadConf().getTableName()));describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),describeTableResponse.getTableOptions(),new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));destClient.createTable(createTableRequest);System.out.println("Create table success: " + config.getWriteConf().getTableName()); }
  • 在源表上創建通道
    使用通道服務的CreateTunnel接口可以創建通道,此處我們創建全量加增量類型(TunnelType.BaseAndStream)類型的通道。
  • sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName()); List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos(); String tunnelId = null; TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos); if (tunnelInfo != null) {tunnelId = tunnelInfo.getTunnelId();System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",config.getReadConf().getTunnelName(), tunnelId)); } else {CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(new CreateTunnelRequest(config.getReadConf().getTableName(),config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId()); }
  • 啟動定時任務來監測備份進度
    備份進度的監測可以通過DesribeTunnel接口來完成,DescribeTunnel接口可以獲取到最新消費到的時間點,通過和配置里的備份結束時間對比,我們可以獲取到當前同步的進度。在到達結束時間后,即可退出備份程序。
  • backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "background-checker-" + counter.getAndIncrement());} }); backgroundExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(config.getReadConf().getTableName(), config.getReadConf().getTunnelName()));// 已同步完成if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {System.out.println("Table copy finished, program exit!");// 退出備份程序shutdown();}} }, 0, 2, TimeUnit.SECONDS);
  • 啟動數據復制
    啟動通道服務的自動化消費框架,開始自動化的數據同步,其中OtsReaderProcessor中完成的是源表數據的解析和目的表的寫入,處理邏輯將會在后文中介紹。
  • if (tunnelId != null) {sourceWorkerConfig = new TunnelWorkerConfig(new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));sourceWorkerConfig.setHeartbeatIntervalInSec(15);sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);sourceWorker.connectAndWorking(); }

    3. 數據同步邏輯(OtsReaderProcessor)

    使用通道服務,我們需要編寫數據的Process邏輯和Shutdown邏輯,數據同步中的核心在于解析數據并將其寫入到目的表中,處理數據的完整代碼如下所示,主要邏輯還是比較清晰的,首先會檢查數據的時間戳是否在合理的時間范圍內,然后將StreamRecord轉化為BatchWrite里對應的行,最后將數據串行寫入到目的表中。

    public void process(ProcessRecordsInput input) {System.out.println(String.format("Begin process %d records.", input.getRecords().size()));BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();int count = 0;for (StreamRecord record : input.getRecords()) {if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {System.out.println(String.format("skip record timestamp %d larger than endTime %d",record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));continue;}count++;switch (record.getRecordType()) {case PUT:RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());putChange.addColumns(getColumns(record));batchWriteRowRequest.addRowChange(putChange);break;case UPDATE:RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),record.getPrimaryKey());for (RecordColumn column : record.getColumns()) {switch (column.getColumnType()) {case PUT:updateChange.put(column.getColumn());break;case DELETE_ONE_VERSION:updateChange.deleteColumn(column.getColumn().getName(),column.getColumn().getTimestamp());break;case DELETE_ALL_VERSION:updateChange.deleteColumns(column.getColumn().getName());break;default:break;}}batchWriteRowRequest.addRowChange(updateChange);break;case DELETE:RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),record.getPrimaryKey());batchWriteRowRequest.addRowChange(deleteChange);break;default:break;}if (count == writeConf.getBatchWriteCount()) {System.out.println("BatchWriteRow: " + count);writeClient.batchWriteRow(batchWriteRowRequest);batchWriteRowRequest = new BatchWriteRowRequest();count = 0;}}// 寫最后一次的數據。if (!batchWriteRowRequest.isEmpty()) {System.out.println("BatchWriteRow: " + count);writeClient.batchWriteRow(batchWriteRowRequest);} }

    4. 技術注解

  • 如何保障備份性能?
    備份過程分為全量(存量)和增量階段,對于全量階段,通道服務會自動將全表的數據在邏輯上劃分成接近指定大小的若干分片,全量階段的數據同步的整體并行度和分片數相關,能夠有效的保障吞吐量。而對于增量階段,為了保障數據的有序性,單分區內的數據我們需要串行處理數據,增量階段的性能和分區數成正比關系(增量同步性能白皮書),如果需要提速(增加分區)可以聯系表格存儲技術支持。
  • 如何做到數據同步的水平擴展?
    運行多個TunnelWorker(客戶端)對同一個Tunnel進行消費時(TunnelId相同), 在TunnelWorker執行Heartbeat時,通道服務端會自動的對Channel(分區)資源進行重分配,讓活躍的Channel盡可能的均攤到每一個TunnelWorker上,達到資源負載均衡的目的。同時,在水平擴展性方面,用戶可以很容易的通過增加TunnelWorker的數量來完成,TunnelWorker可以在同一個機器或者不同機器上。更多的原理可以參見數據消費框架原理介紹。
  • 如何做到數據的最終一致性?
    數據的一致性建立在通道服務的保序協議基礎上,通過全量和增量數據同步的冪等性可以保障備份數據的最終一致。
  • 如何完成斷點續傳功能?
    通道服務的客戶端會定期將已同步(消費)完成的數據的時間位點定期發送到服務端進行持久化,在發生Failover或者重啟程序后,下一次的數據消費會從記錄的checkpoint開始數據處理,不會造成數據的丟失。
  • 未來展望

    在本次的實戰中,我們結合通道服務完成一個簡潔而有效的數據復制方案,實現了指定時間點的表數據復制。借助于本次的實戰樣例代碼,用戶僅需要配置源表和目的表的相關參數,即可以高效的完成的表數據的復制和數據的遷移。
    在未來的演進中,通道服務還將支持創建指定時間段的通道,這樣可以更加靈活的制定數據備份的計劃,也可以完成持續備份和按時間點恢復等更加豐富的功能。


    原文鏈接
    本文為云棲社區原創內容,未經允許不得轉載。

    總結

    以上是生活随笔為你收集整理的基于Tablestore Tunnel的数据复制实战的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 91免费观看网站 | 女av在线| 五月婷婷激情视频 | 影音先锋成人资源网 | 亚洲综合av一区 | 亚洲影视中文字幕 | 日日操日日操 | 欧美高清性xxxxhd | 久久人妻少妇嫩草av | 奇米影视777在线观看 | 精品人妻互换一区二区三区 | 亚洲精品7777 | 香蕉在线视频观看 | 男生操女生动漫 | 九一精品在线 | 三级少妇 | 女教师痴汉调教hd中字 | 亚洲精品动漫在线观看 | 一区二区三区在线视频播放 | 男女精品视频 | 欧美日韩激情在线一区二区三区 | 特黄1级潘金莲 | 一级片大全 | 欧美精品久久久久久久自慰 | 免费av影片 | 一二区在线视频 | 99热最新在线 | 麻豆精品视频免费观看 | 丝袜五月天 | 日韩精品极品视频 | 五月婷婷开心中文字幕 | 久久一区二区三区视频 | 天天草视频 | 亚洲天堂欧美 | 中文字幕+乱码+中文字幕一区 | 国产第一页av | 国产一区二区三区亚洲 | 亚洲午夜天堂 | 国产二级片 | 成人一区二区视频 | 久久久无码人妻精品无码 | 18深夜在线观看免费视频 | 国产玖玖 | 五月天婷婷在线观看 | 精品网站999www | 欧美精品电影一区二区 | 深夜视频在线播放 | 日韩黄色一区 | 一级全黄男女免费大片 | 日本一区二区三区在线看 | 国产亚洲精品久久久久久777 | 午夜影院黄色 | 欧洲熟妇的性久久久久久 | 日本欧美另类 | 国产成人综合自拍 | 别揉我奶头一区二区三区 | 亚洲福利网 | 美女黄视频网站 | 亚洲精品在线观看av | 1024福利| 91在线国产观看 | 蜜臀国产AV天堂久久无码蜜臀 | 杂技xxx裸体xxxx欧美 | 永久视频在线 | 日韩一级黄色录像 | 人人搞人人爱 | 四虎影视www在线播放 | 欧美高清视频一区 | 亚洲精品字幕 | 大地资源中文在线观看免费版 | 日韩中文字幕电影 | 亚洲一区二区三区不卡视频 | 日韩免费久久 | 性生活视频软件 | 欧美一区二区视频在线观看 | 在线视频 亚洲 | 久久久精品国产sm调教 | 日本a在线免费观看 | 欧美人与禽zoz0性3d | 欧美精品123区 | 日本成人在线免费视频 | 日韩二区三区 | 人善交videos欧美3d动漫 | 高h在线观看 | 成人免费在线视频网站 | 最新中文字幕视频 | va在线播放 | 夜夜爽天天干 | а√中文在线资源库 | 中文字幕亚洲色图 | 国产对白在线 | 少妇黄色片 | 少妇紧身牛仔裤裤啪啪 | 狠狠狠狠狠干 | 色老头一区二区三区 | 国产黄色大片 | 国产乱人伦精品一区二区 | 干干干日日日 | a天堂中文在线 |