教你从0到1搭建秒杀系统-Canal快速入门(番外篇)
Canal用途很廣,并且上手非常簡單,小伙伴們在平時完成公司的需求時,很有可能會用到。本篇介紹一下數據庫中間件Canal的使用。
很多時候為了縮短調用延時,我們會對部分接口數據加入了緩存。一旦這些數據在數據庫中進行了更新操作,緩存就成了舊數據,必須及時刪除。刪除緩存的代碼「理所當然可以寫在更新數據的業務代碼里」,但有時候寫操作是在別的項目代碼里,你可能無權修改,亦或者別人不愿你在他代碼里寫這種業務之外的代碼。(畢竟多人協作中間會產生各種配合問題)。又或者就是單純的刪除緩存的操作失敗了,緩存依然是舊數據。這個時候,我們可以將緩存更新操作完全獨立出來,形成一套單獨的系統。
在上一篇我們提到過,Canal能幫我們實現像下圖這樣的系統來進行數據的處理:
接下讓我們一起來看看Canal到底是什么,以及用它如何實現上面我們我們提到的系統。
Canal概述
阿里是國內比較早地大量使用MySQL的互聯網企業(去IOE化:去掉IBM的小型機、Oracle數據庫、EMC存儲設備,代之以自己在開源軟件基礎上開發的系統),并且基于阿里巴巴/淘寶的業務,從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。Canal應運而生,它通過偽裝成數據庫的從庫,讀取主庫發來的binlog,用來實現數據庫增量訂閱和消費業務需求。我們可以使用Canal實現以下用途:
- 數據庫鏡像
- 數據庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業務 cache 緩存刷新
- 帶業務邏輯的增量數據處理
開源項目地址:https://github.com/alibaba/canal,大家有需要可以下載看看。這里有幾點重點給大家提出來說一下:
- canal 使用 client-server 模式,數據傳輸協議使用 protobuf 3.0(很多RPC框架也在使用例如gRPC)
- 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
- canal 作為 MySQL binlog 增量獲取和解析工具,可將變更記錄投遞到 MQ 系統中,比如 Kafka/RocketMQ。
Canal實際是將自己偽裝成數據庫的從庫,來讀取Binlog。這里我們先講一下MySQL數據庫主從數據庫的知識,這樣就能更快的理解Canal。
數據庫相關知識
數據庫的讀寫分離
為了應對高并發場景,MySQL支持把一臺數據庫主機分為單獨的一臺寫主庫(主要負責寫操作),而把讀的數據庫壓力分配給讀的從庫,而且讀從庫可以變為多臺,這就是讀寫分離的典型場景。
數據庫主從同步
實現數據庫的讀寫分離,是通過數據庫主從同步,讓從數據庫監聽主數據庫Binlog實現的。
大體流程如下圖:
可以看到,這種架構下會有一個問題,數據庫主從同步會存在延遲,那么就會有短暫的時間,主從數據庫的數據是不一致的。這種不一致大多數情況下非常短暫,很多時候我們可以忽略他。但一旦要求數據一致,就會引申出如何解決這個問題的思考。
數據庫主從同步一致性問題
我們通常使用MySQL主從復制來解決MySQL的單點故障問題,其通過邏輯復制的方式把主庫的變更同步到從庫,主備之間無法保證嚴格一致的模式,于是,MySQL的主從復制帶來了主從“數據一致性”的問題。MySQL的復制分為:異步復制、半同步復制、全同步復制。
異步復制
概念:MySQL默認的復制即是異步復制,主庫在執行完客戶端提交的事務后會立即將結果返給給客戶端,并不關心從庫是否已經接收并處理;
缺點:主庫將事務 Binlog 事件寫入到 Binlog 文件中,此時主庫只會通知一下 Dump 線程發送這些新的 Binlog,然后主庫就會繼續處理提交操作,而此時不會保證這些 Binlog 傳到任何一個從庫節點上。主如果crash掉了,此時主上已經提交的事務可能并沒有傳到從庫上,如果此時,強行將從提升為主,可能導致新主上的數據不完整。
全同步復制
概念:當主庫提交事務之后,所有的從庫節點必須收到、APPLY并且提交這些事務,然后主庫線程才能繼續做后續操作
缺點:需要等待所有從庫執行完該事務才能返回,全同步復制的性能必然會收到嚴重的影響
半同步復制
概念:介于異步復制和全同步復制之間,主庫在執行完客戶端提交的事務后不是立刻返回給客戶端,而是等待至少一個從庫接收到并寫到relay log中才返回給客戶端。相對于異步復制,半同步復制提高了數據的安全性;
缺點:造成了一定程度的延遲,這個延遲最少是一個TCP/IP往返的時間。所以,半同步復制最好在低延時的網絡中使用。
當半同步復制發生超時時(由rpl_semi_sync_master_timeout參數控制,單位是毫秒,默認為10000,即10s),會暫時關閉半同步復制,轉而使用異步復制。當master dump線程發送完一個事務的所有事件之后,如果在rpl_semi_sync_master_timeout內,收到了從庫的響應,則主從又重新恢復為半同步復制。
Canal工作原理
回顧了數據庫從庫的數據同步原理,理解Canal十分簡單,直接引用官網原文:
Canal實戰
因為canal主要是監聽mysql的binlog日志,所以需要先保證mysql的binlog是開啟的狀態,怎么操作大家可以看這篇MYSQL專題-使用Binlog日志恢復MySQL數據,這里不再贅述。然后看一下我們mysql的用戶都有哪些:
創建Canal賬號
我們為Cannal創建一個單獨的賬號并為其授權,依次執行以下語句:
CREATE USER canal IDENTIFIED BY 'xxxx'; (填寫密碼) GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;最后通過show grants for 'canal’看一下賬號信息,已經創建成功。
配置Canal服務
去Github下載最近的Canal穩定版本包:點此進入下載頁面,然后依據自己想要的版本進行下載即可。
我這里使用的是1.1.5的版本。下載后查看文件:
將相應信息更改為你對應的數據庫地址以及相應的數據庫賬號和密碼。
進入bin目錄點擊即可啟動:
出現如下界面即位啟動成功:
Canal操作
配置好canal以后,我們接下來用代碼連接進行操作。在之前的代碼中增加模塊miaosha-job,然后寫一個連接canal的程序:
public class CanalClient {private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);public static void main(String[] args) {// 第一步:與canal進行連接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "", "");connector.connect();// 第二步:開啟訂閱connector.subscribe();// 第三步:循環訂閱while (true) {try {// 每次讀取 1000 條Message message = connector.getWithoutAck(1000);long batchID = message.getId();int size = message.getEntries().size();if (batchID == -1 || size == 0) {LOGGER.info("當前暫時沒有數據,休眠1秒");Thread.sleep(1000);} else {LOGGER.info("-------------------------- 有數據啦 -----------------------");printEntry(message.getEntries());}connector.ack(batchID);} catch (Exception e) {LOGGER.error("處理出錯");} finally {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 獲取每條打印的記錄*/public static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {// 第一步:拆解entry 實體Header header = entry.getHeader();EntryType entryType = entry.getEntryType();// 第二步: 如果當前是RowData,那就是我需要的數據if (entryType == EntryType.ROWDATA) {String tableName = header.getTableName();String schemaName = header.getSchemaName();RowChange rowChange = null;try {rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();}EventType eventType = rowChange.getEventType();LOGGER.info(String.format("當前正在操作表 %s.%s, 執行操作= %s", schemaName, tableName, eventType));// 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來if (eventType == EventType.QUERY || rowChange.getIsDdl()) {LOGGER.info("執行了查詢語句:[{}]", rowChange.getSql());return;}// 第三步:追蹤到 columns 級別rowChange.getRowDatasList().forEach((rowData) -> {// 獲取更新之前的column情況List<Column> beforeColumns = rowData.getBeforeColumnsList();// 獲取更新之后的 column 情況List<Column> afterColumns = rowData.getAfterColumnsList();// 當前執行的是 刪除操作if (eventType == EventType.DELETE) {printColumn(beforeColumns);}// 當前執行的是 插入操作if (eventType == EventType.INSERT) {printColumn(afterColumns);}// 當前執行的是 更新操作if (eventType == EventType.UPDATE) {printColumn(afterColumns);// 進行刪除緩存操作deleteCache(afterColumns, tableName, schemaName);}});}}}/*** 每個row上面的每一個column 的更改情況* @param columns*/public static void printColumn(List<Column> columns) {columns.forEach((column) -> {String columnName = column.getName();String columnValue = column.getValue();String columnType = column.getMysqlType();// 判斷 該字段是否更新boolean isUpdated = column.getUpdated();LOGGER.info(String.format("數據列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));});}/*** 秒殺下單接口刪除庫存緩存*/public static void deleteCache(List<Column> columns, String tableName, String schemaName) {if ("stock".equals(tableName) && "test_my_db".equals(schemaName)) {AtomicInteger id = new AtomicInteger();columns.forEach((column) -> {String columnName = column.getName();String columnValue = column.getValue();if ("id".equals(columnName)) {id.set(Integer.parseInt(columnValue));}});// TODO: 刪除緩存LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);}} }方法的功能以及注解在里面寫的很詳細,我們直接跑程序進行測試,啟動程序,當我們沒有進行任何操作時,一直會處于等待的狀態:
我們在數據庫中進行更改UPDATE操作,把用戶王二改成張三,然后再改回王二,Canal成功收到了兩條更新操作,見下圖:
我們再模擬一個刪除Cache緩存的業務,在代碼中有秒殺下單接口刪除庫存緩存的接口,更新操作后,我們刷新庫存緩存。效果如下:
也可以成功監聽到數據。簡單的Canal使用就介紹到這里,剩下的發揮空間留給各位讀者們。
猜你感興趣:
教你從0到1搭建秒殺系統-防超賣
教你從0到1搭建秒殺系統-限流
教你從0到1搭建秒殺系統-搶購接口隱藏與單用戶限制頻率
教你從0到1搭建秒殺系統-緩存與數據庫雙寫一致
教你從0到1搭建秒殺系統-Canal快速入門(番外篇)
教你從0到1搭建秒殺系統-訂單異步處理
更多文章請點擊:更多…
參考文章:
https://blog.csdn.net/l1028386804/article/details/81208362
https://github.com/alibaba/canal/wiki/QuickStart
https://youzhixueyuan.com/database-master-slave-synchronization.html
https://www.jianshu.com/p/790a158d9eb3
https://blog.csdn.net/xihuanyuye/article/details/81220524
https://www.cnblogs.com/ivictor/p/5735580.html
總結
以上是生活随笔為你收集整理的教你从0到1搭建秒杀系统-Canal快速入门(番外篇)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 教你从0到1搭建秒杀系统-缓存与数据库双
- 下一篇: 教你从0到1搭建秒杀系统-订单异步处理