日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程

發(fā)布時間:2025/3/20 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

點擊上方“Java知音”,選擇“置頂公眾號”

技術文章第一時間送達!

上一期講到了通過canal訂閱mysql的binlog日志并且轉(zhuǎn)換為對象,那么這一次我們將訂閱來的對象通過RocketMQ發(fā)送消息,接收方接受消息之后同時存儲到其他類型的數(shù)據(jù)源當中,完成一個簡單的數(shù)據(jù)異構的過程。

什么是Java消息服務?

兩個應用程序之間進行異步通信的API,它為標準消息協(xié)議和消息服務提供了一組通用接口,包括創(chuàng)建、發(fā)送、讀取消息等,用于支持JAVA應用程序開發(fā)。

在J2EE中,當兩個應用程序使用JMS進行通信時,它們之間并不是直接相連的,而是通過一個共同的消息收發(fā)服務連接起來,可以達到解耦的效果,我們將會在接下來的教程中詳細介紹。

jms的消息傳送模型

常見的消息傳送模型有以下兩種:

點對點消息傳送模型

在點對點消息傳送模型中,應用程序由消息隊列,發(fā)送者,接收者組成。每一個消息發(fā)送給一個特殊的消息隊列,該隊列保存了所有發(fā)送給它的消息(除了被接收者消費掉的和過期的消息)。如下圖所示:

發(fā)布訂閱消息傳送模型

在發(fā)布訂閱模型中,消費者需要訂閱相關的topic才能接收到生產(chǎn)者的信息。生產(chǎn)者會將信息傳輸?shù)絫opic中,然后消費者只需要從topic中獲取數(shù)據(jù)即可。如下圖所示:

RocketMQ消息隊列使用

這次使用的消息中間件為RocketMQ的使用場景。RocketMQ是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給Apache基金會,并于2016年11月成為 Apache 孵化項目。

RocketMQ在使用之前,需要我們引入相關的依賴配置:

????
????????<dependency>
????????????<groupId>org.apache.rocketmqgroupId>
????????????<artifactId>rocketmq-clientartifactId>
????????????<version>${rocketmq.version}version>
????????dependency>

關于RocketMQ的安裝在這里就不做過多的講解了。

通過mq的方式來進行數(shù)據(jù)異構通常是比較簡單的方案,首先我們需要在項目里面獨立一個模塊專門用于監(jiān)聽mysql的binlog日志,這個模塊我暫且稱之為datahandle-core模塊

整個工程采用了springboot的結構來構建,主要的核心也是在core工程中。

首先是監(jiān)聽canal的日志狀態(tài)模塊了,采用了上一節(jié)中講解到的客戶端代碼進行數(shù)據(jù)監(jiān)聽,并且將其轉(zhuǎn)換為對象然后發(fā)送往mq中:

package?com.sise.datahandle.core;

import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.Message;
import?lombok.extern.slf4j.Slf4j;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.CommandLineRunner;
import?org.springframework.stereotype.Component;

import?java.net.InetSocketAddress;

import?static?com.sise.datahandle.constants.CanalConstants.*;

/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?CanalListener?implements?CommandLineRunner?{

????@Autowired
????private?CanalClient?canalClient;

????@Override
????public?void?run(String...?args)?throws?Exception?{
??????log.info("=============canal監(jiān)聽器開啟===============");
????????CanalConnector?canalConnector?=?CanalConnectors.newSingleConnector(
????????????????new?InetSocketAddress(SERVER_ADDRESS,?PORT),?DESTINATION,?USERNAME,?PASSWORD);
????????canalConnector.connect();
????????canalConnector.subscribe(".*\\..*");
????????canalConnector.rollback();
????????for?(;?;?)?{
????????????Message?message?=?canalConnector.getWithoutAck(100);
????????????long?batchId?=?message.getId();
????????????if?(batchId?!=?-1)?{
????????????????canalClient.entityHandle(message.getEntries());
????????????}
????????}
????}
}

ps:這里面的CanalClient代碼主要來自上一篇的canal客戶端代碼,文末會有完整項目代碼鏈接,需要的讀者可以前往查看。

在CanalClient里面,有一個函數(shù)是專門用于處理將訂閱的數(shù)據(jù)發(fā)送到mq消息隊列中:

package?com.sise.datahandle.core;

import?com.alibaba.fastjson.JSON;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.google.protobuf.InvalidProtocolBufferException;
import?com.sise.datahandle.handler.CanalDataHandler;
import?com.sise.datahandle.model.TypeDTO;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.exception.MQBrokerException;
import?org.apache.rocketmq.client.exception.MQClientException;
import?org.apache.rocketmq.client.producer.DefaultMQProducer;
import?org.apache.rocketmq.client.producer.SendResult;
import?org.apache.rocketmq.remoting.exception.RemotingException;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Service;

import?java.util.List;

/**
?*?canal監(jiān)聽客戶端變化
?*
?*?@author?idea
?*?@date?2019/10/12
?*/
@Slf4j
@Service
public?class?CanalClient?{


????@Autowired
????private?DefaultMQProducer?rocketMqProducer;


????/**
?????*?處理binlog日志的監(jiān)聽
?????*
?????*?@param?entries
?????*/
????public?void?entityHandle(List?entries)?{
????????for?(CanalEntry.Entry?entry?:?entries)?{
????????????if?(entry.getEntryType()?!=?CanalEntry.EntryType.ROWDATA)?{
????????????????continue;
????????????}
????????????try?{
????????????????CanalEntry.RowChange?rowChange?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????????for?(CanalEntry.RowData?rowData?:?rowChange.getRowDatasList())?{
????????????????????switch?(rowChange.getEventType())?{
????????????????????????case?INSERT:
????????????????????????????String?tableName?=?entry.getHeader().getTableName();
????????????????????????????//測試選用t_type這張表進行映射處理
????????????????????????????if?("t_type".equals(tableName))?{
????????????????????????????????TypeDTO?typeDTO?=?CanalDataHandler.convertToBean(rowData.getAfterColumnsList(),?TypeDTO.class);
????????????????????????????????org.apache.rocketmq.common.message.Message?message?=?new?org.apache.rocketmq.common.message.Message();
????????????????????????????????message.setTopic("canal-test-topic");
????????????????????????????????message.setTags("canal-test-tag");
????????????????????????????????String?json?=?JSON.toJSONString(typeDTO);
????????????????????????????????message.setBody(json.getBytes());
????????????????????????????????SendResult?sendResult?=?rocketMqProducer.send(message);
????????????????????????????????log.info("[mq消息發(fā)送結果]----"?+?sendResult);
????????????????????????????}
????????????????????????????break;
????????????????????????default:
????????????????????????????break;
????????????????????}
????????????????}
????????????}?catch?(InvalidProtocolBufferException?e)?{
????????????????log.error("[CanalClient]監(jiān)聽數(shù)據(jù)過程出現(xiàn)異常,異常信息為{}",?e);
????????????}?catch?(InterruptedException?|?RemotingException?|?MQClientException?|?MQBrokerException?e)?{
????????????????log.error("[CanalClient]?mq發(fā)送信息出現(xiàn)異常:{}",?e);
????????????}
????????}
????}

}

這里面主要是監(jiān)聽binlog記錄為插入數(shù)據(jù)事件的時候做發(fā)送mq操作。

接下來便是常見的mq配置了,本工程主要是一個模擬的簡單案例,因此我將consumer和producer都放在了一起方便測試。

通過springboot自身的properties文件對mq進行參數(shù)初始化配置之后便可以構建一個基本的consumer和producer了。這里我們拿一個TypeDto類來進行樹異構的測試,consumer端的核心代碼為:

package?com.sise.datahandle.mq.rocketmq.consumer;

import?com.sise.datahandle.model.TypeDTO;
import?com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle;
import?com.sise.datahandle.redis.RedisService;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import?org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import?org.apache.rocketmq.common.message.MessageExt;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
import?org.springframework.util.CollectionUtils;

import?java.util.List;

/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?RocketMqConsumeMsgListenerProcessor?implements?MessageListenerConcurrently?{

????@Autowired
????private?RedisService?redisService;

????@Override
????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context)?{if(CollectionUtils.isEmpty(msgs)){
????????????log.info("接受到的消息為空,不處理,直接返回成功");return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????????MessageExt?messageExt?=?msgs.get(0);
????????System.out.println("接受到的消息為:"+messageExt.toString());if("canal-test-topic".equals(messageExt.getTopic())){if("canal-test-tag".equals(messageExt.getTags())){
????????????????int?reconsume?=?messageExt.getReconsumeTimes();if(reconsume?==3){//消息已經(jīng)重試了3次,如果不需要再次消費,則返回成功return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????????}
????????????????TypeDTO?typeDTO?=?RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class);//存儲進入redis中
????????????????redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO);
????????????}
????????}//?如果沒有return?success?,consumer會重新消費該消息,直到return?successreturn?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????}
}

通過訂閱mq的信息,讀取相關的數(shù)據(jù)再次寫入到redis里面,完成一個簡單過程的數(shù)據(jù)異構。

整個迷你工程寫下來,比較核心的地方就在于對binlog日志的解析器部分,如何將日志訂閱之后轉(zhuǎn)換為相應的對象進行處理。

通常采用mq的方式進行數(shù)據(jù)異構會相對簡單,實際上是在監(jiān)聽binlog為寫DB的同時去寫一次MQ,但是這種方式不能夠保證數(shù)據(jù)一致性,就是不能保證跨資源的事務。注:調(diào)用第三方遠程RPC的操作一定不要放到事務中。

完整案例的代碼鏈接如下(點擊閱讀原文直達):

https://gitee.com/IdeaHome_admin/wfw

推薦閱讀(點擊即可跳轉(zhuǎn)閱讀)

1.SpringBoot內(nèi)容聚合

2.面試題內(nèi)容聚合

3.設計模式內(nèi)容聚合

4.Mybatis內(nèi)容聚合

5.多線程內(nèi)容聚合

覺得不錯?歡迎轉(zhuǎn)發(fā)分享給更多人

我知道你 “在看

總結

以上是生活随笔為你收集整理的canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 印度午夜性春猛xxx交 | 亚洲一区网站 | 久久av免费观看 | 国产精品久久影视 | 亚洲av永久无码国产精品久久 | 成人精品一区二区三区中文字幕 | 精品人妻久久久久一区二区三区 | 成人四色 | 天堂va在线| 一级片免费观看 | 综合色99 | 99激情视频 | 日本不卡在线 | www.999av| 大粗鳮巴久久久久久久久 | 亚洲精品久久久久 | 波多野结衣视频免费在线观看 | 国产精品激情偷乱一区二区∴ | 成人一区在线观看 | 九九视频这里只有精品 | 欧美亚洲精品一区二区 | 亚洲AV午夜成人片 | av毛片观看 | 户外少妇对白啪啪野战 | 毛片免费全部无码播放 | 国产精品性爱在线 | 涩涩视屏| 久久精品无码专区免费 | 激情四射网站 | 久久艹中文字幕 | 午夜婷婷色 | wwwxxx黄色| 久久黑丝 | 国产99久久久国产精品成人免费 | 亚洲国产aⅴ精品一区二区 日韩黄色在线视频 | 亚洲高清精品视频 | 国产精品入口a级 | a网址| 人人插人人看 | 欧美三级在线播放 | 成年人在线视频网站 | 国产精品456| 亚洲天堂久久新 | 日韩欧美卡一卡二 | 真实偷拍激情啪啪对白 | 一个人在线免费观看www | 26uuu精品一区二区在线观看 | 中文字幕高清在线播放 | 91精品啪| 波多野结衣久久 | 伊人性视频| xxxx日韩| 超碰综合网 | 少妇熟女视频一区二区三区 | 奇米狠狠操 | 国产精品久久久久久久妇 | a网站在线观看 | 国产老肥熟 | 亚洲成人黄色小说 | 成人在线免费视频 | 日本a v网站 | 精品人妻少妇一区二区 | 五十路熟母 | 久久精品国产亚洲av麻豆 | 欧美 日韩 国产 激情 | 无码少妇精品一区二区免费动态 | 国产寡妇色xxⅹ交肉视频 | 国产呦系列 | 91精品久久香蕉国产线看观看 | 大陆女明星乱淫合集 | 亚洲精品一区二区在线 | 久草新在线 | 国产熟女高潮视频 | 亚洲欧美成人一区二区 | 七仙女欲春2一级裸体片 | 精品无码一区二区三区蜜臀 | 韩国三级av | 伊人热久久 | 泽村玲子在线 | 狠狠的干狠狠的操 | 国产精品精品 | 91日韩在线视频 | 日本免费中文字幕 | 亚洲激情欧美激情 | 国产精品视频全国免费观看 | 激情五月婷婷丁香 | 自拍偷拍国内 | 国产在线18 | 成人av一区| 国产伦精品一区二区三区照片91 | 毛片av网址| 穿越异世荒淫h啪肉np文 | 福利网址在线 | 好吊视频一区 | 91宅男 | 亚洲成人av综合 | 亚洲国产网址 | 天堂8中文 | 玖玖精品国产 |