日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

基于mysql数据库binlog的增量订阅消费

發(fā)布時間:2024/2/28 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于mysql数据库binlog的增量订阅消费 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

背景

? ?早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業(yè)務(wù)需求。不過早期的數(shù)據(jù)庫同步業(yè)務(wù),主要是基于trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務(wù),從此開啟了一段新紀元。ps. 目前內(nèi)部使用的同步,已經(jīng)支持mysql5.x和oracle部分版本的日志解析

?

基于日志增量訂閱&消費支持的業(yè)務(wù):

  • 數(shù)據(jù)庫鏡像
  • 數(shù)據(jù)庫實時備份
  • 多級索引 (賣家和買家各自分庫索引)
  • search build
  • 業(yè)務(wù)cache刷新
  • 價格變化等重要業(yè)務(wù)消息
  • 項目介紹

    ???名稱:canal [k?'n?l]

    ? ?譯意: 水道/管道/溝渠?

    ? ?語言: 純java開發(fā)

    ? ?定位: 基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費,目前主要支持了mysql

    ?

    工作原理

    mysql主備復(fù)制實現(xiàn)


    ?從上層來看,復(fù)制分成三步:

  • master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
  • slave將master的binary log events拷貝到它的中繼日志(relay log);
  • slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。
  • canal的工作原理:

    原理相對比較簡單:

  • canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
  • mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  • canal解析binary log對象(原始為byte流)
  • 架構(gòu)

    說明:

    • server代表一個canal運行實例,對應(yīng)于一個jvm
    • instance對應(yīng)于一個數(shù)據(jù)隊列 ?(1個server對應(yīng)1..n個instance)

    instance模塊:

    • eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
    • eventSink (Parser和Store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
    • eventStore (數(shù)據(jù)存儲)
    • metaManager (增量訂閱&消費信息管理器)

    知識科普

    mysql的Binlay Log介紹

    • http://dev.mysql.com/doc/refman/5.5/en/binary-log.html
    • http://www.taobaodba.com/html/474_mysqls-binary-log_details.html

    簡單點說:

    • mysql的binlog是多文件存儲,定位一個LogEvent需要通過binlog filename + ?binlog position,進行定位
    • mysql的binlog數(shù)據(jù)格式,按照生成的方式,主要分為:statement-based、row-based、mixed。
      Java代碼??
    • mysql>?show?variables?like?'binlog_format';??
    • +---------------+-------+??
    • |?Variable_name?|?Value?|??
    • +---------------+-------+??
    • |?binlog_format?|?ROW???|??
    • +---------------+-------+??
    • 1?row?in?set?(0.00?sec)??

    目前canal只能支持row模式的增量訂閱(statement只有sql,沒有數(shù)據(jù),所以無法獲取原始的變更日志)

    ?

    ?

    EventParser設(shè)計

    大致過程:

    整個parser過程大致可分為幾步:

  • Connection獲取上一次解析成功的位置 ?(如果第一次啟動,則獲取初始指定的位置或者是當(dāng)前數(shù)據(jù)庫的binlog位點)
  • Connection建立鏈接,發(fā)送BINLOG_DUMP指令
    ?// 0. write command number
    ?// 1. write 4 bytes bin-log position to start at
    ?// 2. write 2 bytes bin-log flags
    ?// 3. write 4 bytes server id of the slave
    ?// 4. write bin-log file name
  • Mysql開始推送Binaly Log
  • 接收到的Binaly Log的通過Binlog parser進行協(xié)議解析,補充一些特定信息
    // 補充字段名字,字段類型,主鍵信息,unsigned類型處理
  • 傳遞給EventSink模塊進行數(shù)據(jù)存儲,是一個阻塞操作,直到存儲成功
  • 存儲成功后,定時記錄Binaly Log位置
  • mysql的Binlay Log網(wǎng)絡(luò)協(xié)議:

    ?

    說明:

    • 圖中的協(xié)議4byte header,主要是描述整個binlog網(wǎng)絡(luò)包的length
    • binlog event structure,詳細信息請參考:?http://dev.mysql.com/doc/internals/en/binary-log.html

    EventSink設(shè)計

    說明:

    • 數(shù)據(jù)過濾:支持通配符的過濾模式,表名,字段內(nèi)容等
    • 數(shù)據(jù)路由/分發(fā):解決1:n (1個parser對應(yīng)多個store的模式)
    • 數(shù)據(jù)歸并:解決n:1 (多個parser對應(yīng)1個store)
    • 數(shù)據(jù)加工:在進入store之前進行額外的處理,比如join

    數(shù)據(jù)1:n業(yè)務(wù)

    ? 為了合理的利用數(shù)據(jù)庫資源, 一般常見的業(yè)務(wù)都是按照schema進行隔離,然后在mysql上層或者dao這一層面上,進行一個數(shù)據(jù)源路由,屏蔽數(shù)據(jù)庫物理位置對開發(fā)的影響,阿里系主要是通過cobar/tddl來解決數(shù)據(jù)源路由問題。

    ? 所以,一般一個數(shù)據(jù)庫實例上,會部署多個schema,每個schema會有由1個或者多個業(yè)務(wù)方關(guān)注

    ?

    數(shù)據(jù)n:1業(yè)務(wù)

    ? 同樣,當(dāng)一個業(yè)務(wù)的數(shù)據(jù)規(guī)模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數(shù)據(jù)需要處理時,就需要鏈接多個store進行處理,消費的位點就會變成多份,而且數(shù)據(jù)消費的進度無法得到盡可能有序的保證。

    ? 所以,在一定業(yè)務(wù)場景下,需要將拆分后的增量數(shù)據(jù)進行歸并處理,比如按照時間戳/全局id進行排序歸并.

    ?

    EventStore設(shè)計

    • 1. ?目前僅實現(xiàn)了Memory內(nèi)存模式,后續(xù)計劃增加本地file存儲,mixed混合模式
    • 2. ?借鑒了Disruptor的RingBuffer的實現(xiàn)思路

    RingBuffer設(shè)計:

    定義了3個cursor

    • Put : ?Sink模塊進行數(shù)據(jù)存儲的最后一次寫入位置
    • Get : ?數(shù)據(jù)訂閱獲取的最后一次提取位置
    • Ack : ?數(shù)據(jù)消費成功的最后一次消費位置

    借鑒Disruptor的RingBuffer的實現(xiàn),將RingBuffer拉直來看:

    實現(xiàn)說明:

    • Put/Get/Ack cursor用于遞增,采用long型存儲
    • buffer的get操作,通過取余或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數(shù),效率比較高)

    Instance設(shè)計


    ?

    instance代表了一個實際運行的數(shù)據(jù)隊列,包括了EventPaser,EventSink,EventStore等組件。

    抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

    • manager方式: 和你自己的內(nèi)部web console/manager系統(tǒng)進行對接。(目前主要是公司內(nèi)部使用)
    • spring方式:基于spring xml + properties進行定義,構(gòu)建spring配置.?

    Server設(shè)計


    server代表了一個canal的運行實例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網(wǎng)絡(luò)訪問)的兩種實現(xiàn)

    • Embeded : ?對latency和可用性都有比較高的要求,自己又能hold住分布式的相關(guān)技術(shù)(比如failover)
    • Netty : ?基于netty封裝了一層網(wǎng)絡(luò)協(xié)議,由canal server保證其可用性,采用的pull模型,當(dāng)然latency會稍微打點折扣,不過這個也視情況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數(shù)據(jù)量大的時候會有一些問題)?

    增量訂閱/消費設(shè)計

    具體的協(xié)議格式,可參見:CanalProtocol.proto

    get/ack/rollback協(xié)議介紹:

    • Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內(nèi)容為:
      a. batch id 唯一標(biāo)識
      b.?entries 具體的數(shù)據(jù)對象,對應(yīng)的數(shù)據(jù)對象格式:EntryProtocol.proto
    • void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數(shù)據(jù)?;趃et獲取的batchId進行提交,避免誤操作
    • void ack(long batchId),顧命思議,確認已經(jīng)消費成功,通知server刪除數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作

    canal的get/ack/rollback協(xié)議和常規(guī)的jms協(xié)議有所不同,允許get/ack異步處理,比如可以連續(xù)調(diào)用get多次,后續(xù)異步按順序提交ack/rollback,項目中稱之為流式api.?

    流式api設(shè)計的好處:

    • get/ack異步化,減少因ack帶來的網(wǎng)絡(luò)延遲和操作成本 (99%的狀態(tài)都是處于正常狀態(tài),異常的rollback屬于個別情況,沒必要為個別的case犧牲整個性能)
    • get獲取數(shù)據(jù)后,業(yè)務(wù)消費存在瓶頸或者需要多進程/多線程消費時,可以不停的輪詢get數(shù)據(jù),不停的往后發(fā)送任務(wù),提高并行化. ?(作者在實際業(yè)務(wù)中的一個case:業(yè)務(wù)數(shù)據(jù)消費需要跨中美網(wǎng)絡(luò),所以一次操作基本在200ms以上,為了減少延遲,所以需要實施并行化)

    流式api設(shè)計:

    • 每次get操作都會在meta中產(chǎn)生一個mark,mark標(biāo)記會遞增,保證運行過程中mark的唯一性
    • 每次的get操作,都會在上一次的mark操作記錄的cursor繼續(xù)往后取,如果mark不存在,則在last ack cursor繼續(xù)往后取
    • 進行ack時,需要按照mark的順序進行數(shù)序ack,不能跳躍ack. ack會刪除當(dāng)前的mark標(biāo)記,并將對應(yīng)的mark位置更新為last ack cusor
    • 一旦出現(xiàn)異常情況,客戶端可發(fā)起rollback情況,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續(xù)往后取

    數(shù)據(jù)對象格式:EntryProtocol.proto

    Java代碼?

    ?

  • Entry??
  • ????Header??
  • ????????logfileName?[binlog文件名]??
  • ????????logfileOffset?[binlog?position]??
  • ????????executeTime?[發(fā)生的變更]??
  • ????????schemaName???
  • ????????tableName??
  • ????????eventType?[insert/update/delete類型]??
  • ????entryType???[事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]??
  • ????storeValue??[byte數(shù)據(jù),可展開,對應(yīng)的類型為RowChange]??
  • ??????
  • RowChange??
  • ????isDdl???????[是否是ddl變更操作,比如create?table/drop?table]??
  • ????sql?????[具體的ddl?sql]??
  • ????rowDatas????[具體insert/update/delete的變更數(shù)據(jù),可為多條,1個binlog?event事件可對應(yīng)多條變更,比如批處理]??
  • ????????beforeColumns?[Column類型的數(shù)組]??
  • ????????afterColumns?[Column類型的數(shù)組]??
  • ??????????
  • Column???
  • ????index?????????
  • ????sqlType?????[jdbc?type]??
  • ????name????????[column?name]??
  • ????isKey???????[是否為主鍵]??
  • ????updated?????[是否發(fā)生過變更]??
  • ????isNull??????[值是否為null]??
  • ????value???????[具體的內(nèi)容,注意為文本]??
  • 說明:

    • 可以提供數(shù)據(jù)庫變更前和變更后的字段內(nèi)容,針對binlog中沒有的name,isKey等信息進行補全
    • 可以提供ddl的變更語句

    ?

    HA機制設(shè)計

    canal的ha分為兩部分,canal server和canal client分別有對應(yīng)的ha實現(xiàn)

    • canal server: ?為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處于running,其他的處于standby狀態(tài).?
    • canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行g(shù)et/ack/rollback操作,否則客戶端接收無法保證有序。

    整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節(jié)點(和session生命周期綁定),可以看下我之前zookeeper的相關(guān)文章。

    ?

    Canal Server:?


    大致步驟:

  • canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 ?(實現(xiàn):創(chuàng)建EPHEMERAL節(jié)點,誰創(chuàng)建成功就允許誰啟動)
  • 創(chuàng)建zookeeper節(jié)點成功后,對應(yīng)的canal server就啟動對應(yīng)的canal instance,沒有創(chuàng)建成功的canal instance就會處于standby狀態(tài)
  • 一旦zookeeper發(fā)現(xiàn)canal server A創(chuàng)建的節(jié)點消失后,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.
  • canal client每次進行connect時,會首先向zookeeper詢問當(dāng)前是誰啟動了canal instance,然后和其建立鏈接,一旦鏈接不可用,會重新嘗試connect.
  • Canal Client的方式和canal server方式類似,也是利用zokeeper的搶占EPHEMERAL節(jié)點的方式進行控制.?

    ?

    最后

    項目的代碼:?https://github.com/alibabatech/canal

    這里給出了如何快速啟動Canal Server和Canal Client的例子,如有問題可隨時聯(lián)系

    Quick Start

    • http://agapple.iteye.com/blog/1796070
    • https://github.com/alibabatech/canal/wiki/QuickStart

    Client Example

    • http://agapple.iteye.com/blog/1796620
    • https://github.com/alibabatech/canal/wiki/ClientExample

    總結(jié)

    以上是生活随笔為你收集整理的基于mysql数据库binlog的增量订阅消费的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。