基于mysql数据库binlog的增量订阅消费
背景
? ?早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業(yè)務(wù)需求。不過早期的數(shù)據(jù)庫同步業(yè)務(wù),主要是基于trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務(wù),從此開啟了一段新紀元。ps. 目前內(nèi)部使用的同步,已經(jīng)支持mysql5.x和oracle部分版本的日志解析
?
基于日志增量訂閱&消費支持的業(yè)務(wù):
項目介紹
???名稱:canal [k?'n?l]
? ?譯意: 水道/管道/溝渠?
? ?語言: 純java開發(fā)
? ?定位: 基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費,目前主要支持了mysql
?
工作原理
mysql主備復(fù)制實現(xiàn)
?從上層來看,復(fù)制分成三步:
canal的工作原理:
原理相對比較簡單:
架構(gòu)
說明:
- server代表一個canal運行實例,對應(yīng)于一個jvm
- instance對應(yīng)于一個數(shù)據(jù)隊列 ?(1個server對應(yīng)1..n個instance)
instance模塊:
- eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
- eventSink (Parser和Store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
- eventStore (數(shù)據(jù)存儲)
- metaManager (增量訂閱&消費信息管理器)
知識科普
mysql的Binlay Log介紹
- http://dev.mysql.com/doc/refman/5.5/en/binary-log.html
- http://www.taobaodba.com/html/474_mysqls-binary-log_details.html
簡單點說:
- mysql的binlog是多文件存儲,定位一個LogEvent需要通過binlog filename + ?binlog position,進行定位
- mysql的binlog數(shù)據(jù)格式,按照生成的方式,主要分為:statement-based、row-based、mixed。
Java代碼?? - mysql>?show?variables?like?'binlog_format';??
- +---------------+-------+??
- |?Variable_name?|?Value?|??
- +---------------+-------+??
- |?binlog_format?|?ROW???|??
- +---------------+-------+??
- 1?row?in?set?(0.00?sec)??
目前canal只能支持row模式的增量訂閱(statement只有sql,沒有數(shù)據(jù),所以無法獲取原始的變更日志)
?
?
EventParser設(shè)計
大致過程:
整個parser過程大致可分為幾步:
?// 0. write command number
?// 1. write 4 bytes bin-log position to start at
?// 2. write 2 bytes bin-log flags
?// 3. write 4 bytes server id of the slave
?// 4. write bin-log file name
// 補充字段名字,字段類型,主鍵信息,unsigned類型處理
mysql的Binlay Log網(wǎng)絡(luò)協(xié)議:
?
說明:
- 圖中的協(xié)議4byte header,主要是描述整個binlog網(wǎng)絡(luò)包的length
- binlog event structure,詳細信息請參考:?http://dev.mysql.com/doc/internals/en/binary-log.html
EventSink設(shè)計
說明:
- 數(shù)據(jù)過濾:支持通配符的過濾模式,表名,字段內(nèi)容等
- 數(shù)據(jù)路由/分發(fā):解決1:n (1個parser對應(yīng)多個store的模式)
- 數(shù)據(jù)歸并:解決n:1 (多個parser對應(yīng)1個store)
- 數(shù)據(jù)加工:在進入store之前進行額外的處理,比如join
數(shù)據(jù)1:n業(yè)務(wù)
? 為了合理的利用數(shù)據(jù)庫資源, 一般常見的業(yè)務(wù)都是按照schema進行隔離,然后在mysql上層或者dao這一層面上,進行一個數(shù)據(jù)源路由,屏蔽數(shù)據(jù)庫物理位置對開發(fā)的影響,阿里系主要是通過cobar/tddl來解決數(shù)據(jù)源路由問題。
? 所以,一般一個數(shù)據(jù)庫實例上,會部署多個schema,每個schema會有由1個或者多個業(yè)務(wù)方關(guān)注
?
數(shù)據(jù)n:1業(yè)務(wù)
? 同樣,當(dāng)一個業(yè)務(wù)的數(shù)據(jù)規(guī)模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數(shù)據(jù)需要處理時,就需要鏈接多個store進行處理,消費的位點就會變成多份,而且數(shù)據(jù)消費的進度無法得到盡可能有序的保證。
? 所以,在一定業(yè)務(wù)場景下,需要將拆分后的增量數(shù)據(jù)進行歸并處理,比如按照時間戳/全局id進行排序歸并.
?
EventStore設(shè)計
- 1. ?目前僅實現(xiàn)了Memory內(nèi)存模式,后續(xù)計劃增加本地file存儲,mixed混合模式
- 2. ?借鑒了Disruptor的RingBuffer的實現(xiàn)思路
RingBuffer設(shè)計:
定義了3個cursor
- Put : ?Sink模塊進行數(shù)據(jù)存儲的最后一次寫入位置
- Get : ?數(shù)據(jù)訂閱獲取的最后一次提取位置
- Ack : ?數(shù)據(jù)消費成功的最后一次消費位置
借鑒Disruptor的RingBuffer的實現(xiàn),將RingBuffer拉直來看:
實現(xiàn)說明:
- Put/Get/Ack cursor用于遞增,采用long型存儲
- buffer的get操作,通過取余或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數(shù),效率比較高)
Instance設(shè)計
?
instance代表了一個實際運行的數(shù)據(jù)隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
- manager方式: 和你自己的內(nèi)部web console/manager系統(tǒng)進行對接。(目前主要是公司內(nèi)部使用)
- spring方式:基于spring xml + properties進行定義,構(gòu)建spring配置.?
Server設(shè)計
server代表了一個canal的運行實例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網(wǎng)絡(luò)訪問)的兩種實現(xiàn)
- Embeded : ?對latency和可用性都有比較高的要求,自己又能hold住分布式的相關(guān)技術(shù)(比如failover)
- Netty : ?基于netty封裝了一層網(wǎng)絡(luò)協(xié)議,由canal server保證其可用性,采用的pull模型,當(dāng)然latency會稍微打點折扣,不過這個也視情況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數(shù)據(jù)量大的時候會有一些問題)?
增量訂閱/消費設(shè)計
具體的協(xié)議格式,可參見:CanalProtocol.proto
get/ack/rollback協(xié)議介紹:
- Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內(nèi)容為:
a. batch id 唯一標(biāo)識
b.?entries 具體的數(shù)據(jù)對象,對應(yīng)的數(shù)據(jù)對象格式:EntryProtocol.proto - void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數(shù)據(jù)?;趃et獲取的batchId進行提交,避免誤操作
- void ack(long batchId),顧命思議,確認已經(jīng)消費成功,通知server刪除數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作
canal的get/ack/rollback協(xié)議和常規(guī)的jms協(xié)議有所不同,允許get/ack異步處理,比如可以連續(xù)調(diào)用get多次,后續(xù)異步按順序提交ack/rollback,項目中稱之為流式api.?
流式api設(shè)計的好處:
- get/ack異步化,減少因ack帶來的網(wǎng)絡(luò)延遲和操作成本 (99%的狀態(tài)都是處于正常狀態(tài),異常的rollback屬于個別情況,沒必要為個別的case犧牲整個性能)
- get獲取數(shù)據(jù)后,業(yè)務(wù)消費存在瓶頸或者需要多進程/多線程消費時,可以不停的輪詢get數(shù)據(jù),不停的往后發(fā)送任務(wù),提高并行化. ?(作者在實際業(yè)務(wù)中的一個case:業(yè)務(wù)數(shù)據(jù)消費需要跨中美網(wǎng)絡(luò),所以一次操作基本在200ms以上,為了減少延遲,所以需要實施并行化)
流式api設(shè)計:
- 每次get操作都會在meta中產(chǎn)生一個mark,mark標(biāo)記會遞增,保證運行過程中mark的唯一性
- 每次的get操作,都會在上一次的mark操作記錄的cursor繼續(xù)往后取,如果mark不存在,則在last ack cursor繼續(xù)往后取
- 進行ack時,需要按照mark的順序進行數(shù)序ack,不能跳躍ack. ack會刪除當(dāng)前的mark標(biāo)記,并將對應(yīng)的mark位置更新為last ack cusor
- 一旦出現(xiàn)異常情況,客戶端可發(fā)起rollback情況,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續(xù)往后取
數(shù)據(jù)對象格式:EntryProtocol.proto
Java代碼?
?
說明:
- 可以提供數(shù)據(jù)庫變更前和變更后的字段內(nèi)容,針對binlog中沒有的name,isKey等信息進行補全
- 可以提供ddl的變更語句
?
HA機制設(shè)計
canal的ha分為兩部分,canal server和canal client分別有對應(yīng)的ha實現(xiàn)
- canal server: ?為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處于running,其他的處于standby狀態(tài).?
- canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行g(shù)et/ack/rollback操作,否則客戶端接收無法保證有序。
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節(jié)點(和session生命周期綁定),可以看下我之前zookeeper的相關(guān)文章。
?
Canal Server:?
大致步驟:
Canal Client的方式和canal server方式類似,也是利用zokeeper的搶占EPHEMERAL節(jié)點的方式進行控制.?
?
最后
項目的代碼:?https://github.com/alibabatech/canal
這里給出了如何快速啟動Canal Server和Canal Client的例子,如有問題可隨時聯(lián)系
Quick Start
- http://agapple.iteye.com/blog/1796070
- https://github.com/alibabatech/canal/wiki/QuickStart
Client Example
- http://agapple.iteye.com/blog/1796620
- https://github.com/alibabatech/canal/wiki/ClientExample
總結(jié)
以上是生活随笔為你收集整理的基于mysql数据库binlog的增量订阅消费的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FLV封装格式介绍及解析
- 下一篇: linux cmake编译源码,linu