日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > windows >内容正文

windows

[Spring cloud 一步步实现广告系统] 16. 增量索引实现以及投送数据到MQ(kafka)

發(fā)布時間:2025/6/17 windows 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [Spring cloud 一步步实现广告系统] 16. 增量索引实现以及投送数据到MQ(kafka) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
實現(xiàn)增量數(shù)據(jù)索引

上一節(jié)中,我們?yōu)閷崿F(xiàn)增量索引的加載做了充足的準(zhǔn)備,使用到mysql-binlog-connector-java 開源組件來實現(xiàn)MySQL 的binlog監(jiān)聽,關(guān)于binlog的相關(guān)知識,大家可以自行網(wǎng)絡(luò)查閱。或者可以mailto:magicianisaac@gmail.com

本節(jié)我們將根據(jù)binlog 的數(shù)據(jù)對象,來實現(xiàn)增量數(shù)據(jù)的處理,我們構(gòu)建廣告的增量數(shù)據(jù),其實說白了就是為了在后期能把廣告投放到索引服務(wù),實現(xiàn)增量數(shù)據(jù)到增量索引的生成。Let's code.

  • 定義一個投遞增量數(shù)據(jù)的接口(接收參數(shù)為我們上一節(jié)定義的binlog日志的轉(zhuǎn)換對象)
/*** ISender for 投遞增量數(shù)據(jù) 方法定義接口** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/ public interface ISender {void sender(MysqlRowData rowData); }
  • 創(chuàng)建增量索引監(jiān)聽器
/*** IncrementListener for 增量數(shù)據(jù)實現(xiàn)監(jiān)聽** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>* @since 2019/6/27*/ @Slf4j @Component public class IncrementListener implements Ilistener {private final AggregationListener aggregationListener;@Autowiredpublic IncrementListener(AggregationListener aggregationListener) {this.aggregationListener = aggregationListener;}//根據(jù)名稱選擇要注入的投遞方式@Resource(name = "indexSender")private ISender sender;/*** 標(biāo)注為 {@link PostConstruct},* 即表示在服務(wù)啟動,Bean完成初始化之后,立刻初始化*/@Override@PostConstructpublic void register() {log.info("IncrementListener register db and table info.");Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));}@Overridepublic void onEvent(BinlogRowData eventData) {TableTemplate table = eventData.getTableTemplate();EventType eventType = eventData.getEventType();//包裝成最后需要投遞的數(shù)據(jù)MysqlRowData rowData = new MysqlRowData();rowData.setTableName(table.getTableName());rowData.setLevel(eventData.getTableTemplate().getLevel());//將EventType轉(zhuǎn)為OperationTypeEnumOperationTypeEnum operationType = OperationTypeEnum.convert(eventType);rowData.setOperationTypeEnum(operationType);//獲取模版中該操作對應(yīng)的字段列表List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);if (null == fieldList) {log.warn("{} not support for {}.", operationType, table.getTableName());return;}for (Map<String, String> afterMap : eventData.getAfter()) {Map<String, String> _afterMap = new HashMap<>();for (Map.Entry<String, String> entry : afterMap.entrySet()) {String colName = entry.getKey();String colValue = entry.getValue();_afterMap.put(colName, colValue);}rowData.getFieldValueMap().add(_afterMap);}sender.sender(rowData);} }
開啟binlog監(jiān)聽
  • 首先來配置監(jiān)聽binlog的數(shù)據(jù)庫連接信息
adconf:mysql:host: 127.0.0.1port: 3306username: rootpassword: 12345678binlogName: ""position: -1 # 從當(dāng)前位置開始監(jiān)聽

編寫配置類:

/*** BinlogConfig for 定義監(jiān)聽Binlog的配置信息** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/ @Component @ConfigurationProperties(prefix = "adconf.mysql") @Data @AllArgsConstructor @NoArgsConstructor public class BinlogConfig {private String host;private Integer port;private String username;private String password;private String binlogName;private Long position; }

在我們實現(xiàn) 監(jiān)聽binlog那節(jié),我們實現(xiàn)了一個自定義client CustomBinlogClient,需要實現(xiàn)binlog的監(jiān)聽,這個監(jiān)聽的客戶端就必須是一個獨立運行的線程,并且要在程序啟動的時候進(jìn)行監(jiān)聽,我們來實現(xiàn)運行當(dāng)前client的方式,這里我們會使用到一個新的Runnerorg.springframework.boot.CommandLineRunner,let's code.

@Slf4j @Component public class BinlogRunner implements CommandLineRunner {@Autowiredprivate CustomBinlogClient binlogClient;@Overridepublic void run(String... args) throws Exception {log.info("BinlogRunner is running...");binlogClient.connect();} }
增量數(shù)據(jù)投遞

在binlog監(jiān)聽的過程中,我們看到針對于int, String 這類數(shù)據(jù)字段,mysql的記錄是沒有問題的,但是針對于時間類型,它被格式化成了字符串類型:Fri Jun 21 15:07:53 CST 2019。

--------Insert----------- WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[ [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019] --------Update----------- UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

對于這個時間格式,我們需要關(guān)注2點信息:

  • CST,這個時間格式會比我們的時間+ 8h(中國標(biāo)準(zhǔn)時間 China Standard Time UT+8:00)
  • 需要對這個日期進(jìn)行解釋處理

當(dāng)然,我們也可以通過設(shè)置mysql的日期格式來改變該行為,在此,我們通過編碼來解析該時間格式:

/*** Thu Jun 27 08:00:00 CST 2019*/public static Date parseBinlogString2Date(String dateString) {try {DateFormat dateFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy",Locale.US);return DateUtils.addHours(dateFormat.parse(dateString), -8);} catch (ParseException ex) {log.error("parseString2Date error:{}", dateString);return null;}}

因為我們在定義索引的時候,是根據(jù)表之間的層級關(guān)系(Level)來設(shè)定的,根據(jù)代碼規(guī)范,不允許出現(xiàn)Magic Number, 因此我們定義一個數(shù)據(jù)層級枚舉,來表達(dá)數(shù)據(jù)層級。

/*** AdDataLevel for 廣告數(shù)據(jù)層級** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>*/ @Getter public enum AdDataLevel {LEVEL2("2", "level 2"),LEVEL3("3", "level 3"),LEVEL4("4", "level 4");private String level;private String desc;AdDataLevel(String level, String desc) {this.level = level;this.desc = desc;} }
實現(xiàn)數(shù)據(jù)投遞

因為增量數(shù)據(jù)可以投遞到不同的位置以及用途,我們之前實現(xiàn)了一個投遞接口com.sxzhongf.ad.sender.ISender,接下來我們實現(xiàn)一個投遞類:

@Slf4j @Component("indexSender") public class IndexSender implements ISender {/*** 根據(jù)廣告級別,投遞Binlog數(shù)據(jù)*/@Overridepublic void sender(MysqlRowData rowData) {if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {Level2RowData(rowData);} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {Level3RowData(rowData);} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {Level4RowData(rowData);} else {log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));}}private void Level2RowData(MysqlRowData rowData) {if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {List<AdPlanTable> planTables = new ArrayList<>();for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {AdPlanTable planTable = new AdPlanTable();//Map的第二種循環(huán)方式fieldValueMap.forEach((k, v) -> {switch (k) {case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:planTable.setPlanId(Long.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:planTable.setUserId(Long.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:planTable.setPlanStatus(Integer.valueOf(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));break;case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));break;}});planTables.add(planTable);}//投遞推廣計劃planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {List<AdCreativeTable> creativeTables = new LinkedList<>();rowData.getFieldValueMap().forEach(afterMap -> {AdCreativeTable creativeTable = new AdCreativeTable();afterMap.forEach((k, v) -> {switch (k) {case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:creativeTable.setAdId(Long.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:creativeTable.setType(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:creativeTable.setMaterialType(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:creativeTable.setHeight(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:creativeTable.setWidth(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:creativeTable.setAuditStatus(Integer.valueOf(v));break;case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:creativeTable.setAdUrl(v);break;}});creativeTables.add(creativeTable);});//投遞廣告創(chuàng)意creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));}}private void Level3RowData(MysqlRowData rowData) {...}/*** 處理4級廣告*/private void Level4RowData(MysqlRowData rowData) {...} }
投放增量數(shù)據(jù)到MQ(kafka)

為了我們的數(shù)據(jù)投放更加靈活,方便數(shù)據(jù)統(tǒng)計,分析等系統(tǒng)的需求,我們來實現(xiàn)一個投放到消息中的接口,其他服務(wù)可以訂閱當(dāng)前MQ 的TOPIC來實現(xiàn)數(shù)據(jù)訂閱。

配置文件中配置TOPIC adconf:kafka:topic: ad-search-mysql-data-------------------------------------- /*** KafkaSender for 投遞Binlog增量數(shù)據(jù)到kafka消息隊列** @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>* @since 2019/7/1*/ @Component(value = "kafkaSender") public class KafkaSender implements ISender {@Value("${adconf.kafka.topic}")private String topic;@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 發(fā)送數(shù)據(jù)到kafka隊列*/@Overridepublic void sender(MysqlRowData rowData) {kafkaTemplate.send(topic, JSON.toJSONString(rowData));}/*** 測試消費kafka消息*/@KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")public void processMysqlRowData(ConsumerRecord<?, ?> record) {Optional<?> kafkaMsg = Optional.ofNullable(record.value());if (kafkaMsg.isPresent()) {Object message = kafkaMsg.get();MysqlRowData rowData = JSON.parseObject(message.toString(),MysqlRowData.class);System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));//sender.sender();}} }

轉(zhuǎn)載于:https://www.cnblogs.com/zhangpan1244/p/11333229.html

總結(jié)

以上是生活随笔為你收集整理的[Spring cloud 一步步实现广告系统] 16. 增量索引实现以及投送数据到MQ(kafka)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。