【Canal源码分析】Sink及Store工作过程
一、序列圖
二、源碼分析
2.1 Sink
Sink階段所做的事情,就是根據一定的規則,對binlog數據進行一定的過濾。我們之前跟蹤過parser過程的代碼,發現在parser完成后,會把數據放到一個環形隊列TransactionBuffer中,也就是這個方法:
transactionBuffer.add(entry);我們具體看下add這個方法。
public void add(CanalEntry.Entry entry) throws InterruptedException {switch (entry.getEntryType()) {case TRANSACTIONBEGIN:flush();// 刷新上一次的數據put(entry);break;case TRANSACTIONEND:put(entry);flush();break;case ROWDATA:put(entry);// 針對非DML的數據,直接輸出,不進行buffer控制EventType eventType = entry.getHeader().getEventType();if (eventType != null && !isDml(eventType)) {flush();}break;default:break;} }判斷一下事件的類型,如果是事務開頭,那么直接刷新之前的數據,然后把當前事件加到隊列中;如果是事務的結束,那么先把當前事務放到隊列后,刷新到下一個階段;如果是普通的事件,直接放到隊列中,如果事務頭類型不為空,且不是DML類型,那么直接刷新隊列中數據到下一個階段。
我們需要理清楚這塊的邏輯,什么時候flush,什么時候put,針對不同的事件,采取的策略不一樣。
這里我們分析下flush和put兩個步驟。
2.1.1 flush隊列
這塊其實還沒有涉及到sink階段,還在維護一個事件環形隊列。這個環形隊列,維護了兩個指針,一個是flush的指針,一個是put的指針,flush的指針永遠是滯后于put指針的。
private void flush() throws InterruptedException {long start = this.flushSequence.get() + 1;long end = this.putSequence.get();if (start <= end) {List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();for (long next = start; next <= end; next++) {transaction.add(this.entries[getIndex(next)]);}flushCallback.flush(transaction);flushSequence.set(end);// flush成功后,更新flush位置} }start就是flush的指針,end就是put的指針,flush的動作就是把當前flush到put中間的數據,全部刷新到下一個階段。具體傳遞到下一個階段的代碼在flushCallback.flush方法中。這塊我們下文再分析。
2.1.2 put
private void put(CanalEntry.Entry data) throws InterruptedException {// 首先檢查是否有空位if (checkFreeSlotAt(putSequence.get() + 1)) {long current = putSequence.get();long next = current + 1;// 先寫數據,再更新對應的cursor,并發度高的情況,putSequence會被get請求可見,拿出了ringbuffer中的老的Entry值entries[getIndex(next)] = data;putSequence.set(next);} else {flush();// buffer區滿了,刷新一下put(data);// 繼續加一下新數據} }這塊的注釋都比較清晰了,就不贅述了。
2.1.3 flush到sink
具體的代碼在AbstractEventParser中,定義transactionBuffer的地方。
public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);if (!running) {return;}if (!successed) {throw new CanalParseException("consume failed!");}LogPosition position = buildLastTransactionPosition(transaction);if (position != null) { // 可能position為空logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);} }主要的處理在consumeTheEventAndProfilingIfNecessary里面。這里面調用了eventSink.sink()方法。
2.1.4 sink
這里面進行了binlog數據的過濾。首先判斷是否需要過濾事務頭和尾,如果需要過濾的話,直接過濾掉,默認不過濾。
遍歷傳到這個階段的binlog列表,根據正則表達式判斷,是否需要進行過濾,一般來說是根據表名、庫名等進行過濾。這邊的過濾類主要是AviaterRegexFilter,根據庫名.表名和表達式進行過濾。如果需要進行過濾,那么直接把這個事件過濾。否則,加到binlog列表中,進行二次過濾。第二次過濾的主要內容是HEARTBEAT類型的事件,主要的代碼在這里:
protected boolean doSink(List<Event> events) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.before(events);//處理heartbeat事件}int fullTimes = 0;do {if (eventStore.tryPut(events)) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.after(events);}return true;} else {applyWait(++fullTimes);}for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.retry(events);}} while (running && !Thread.interrupted());return false; }這里的CanalEventDownStreamHandler其實只有HeartBeatEntryEventHandler,也就是在before方法中把heartbeat事件從events去掉。這個心跳事件其實是parser過程生成的,我們之前有提到過。after目前是空的方法。
去掉之后,剩余的事件列表就會被調用tryPut()方法,送到下一步驟store中。
這里還有個applyWait方法,防止無限等待。
private void applyWait(int fullTimes) {int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;if (fullTimes <= 3) { // 3次以內Thread.yield();} else { // 超過3次,最多只sleep 10msLockSupport.parkNanos(1000 * 1000L * newFullTimes);}}2.2 Store
目前只有基于內存模式的Store,這個階段是真正Server中的落盤階段。數據經歷了mysql master到parser,再到sink,最后終于到了這里。
public boolean tryPut(List<Event> data) throws CanalStoreException {if (data == null || data.isEmpty()) {return true;}final ReentrantLock lock = this.lock;lock.lock();try {if (!checkFreeSlotAt(putSequence.get() + data.size())) {return false;} else {doPut(data);return true;}} finally {lock.unlock();} }在進行數據put的時候,加了一把鎖。首先計算下是否還有剩余的空間進行數據處理,這里的計算,不光是計算了隊列的剩余長度,還計算了剩余空間。隊列的長度默認是16*1024,如果空間不足,直接拒絕,返回false,等待空間空余出來后,再進行put操作。否則,直接doPut()。
/*** 執行具體的put操作*/ private void doPut(List<Event> data) {long current = putSequence.get();long end = current + data.size();// 先寫數據,再更新對應的cursor,并發度高的情況,putSequence會被get請求可見,拿出了ringbuffer中的老的Entry值for (long next = current + 1; next <= end; next++) {entries[getIndex(next)] = data.get((int) (next - current - 1));}putSequence.set(end);// 記錄一下gets memsize信息,方便快速檢索if (batchMode.isMemSize()) {long size = 0;for (Event event : data) {size += calculateSize(event);}putMemSize.getAndAdd(size);}// tell other threads that store is not emptynotEmpty.signal(); }這里主要對put一些指針,還有空間做了重新的計算。放到隊列中之后,通知其他等待notEmpty的線程,來進行數據的獲取,這時候,client可以進行數據獲取了。
轉載于:https://www.cnblogs.com/f-zhao/p/9088655.html
總結
以上是生活随笔為你收集整理的【Canal源码分析】Sink及Store工作过程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker 报错 Container
- 下一篇: Datalogic得利捷Memor™ 1