简单实现MySQL数据实时增量同步到Kafka————Maxwell
任務(wù)需求:將MySQL里的數(shù)據(jù)實(shí)時(shí)增量同步到Kafka
1、準(zhǔn)備工作
1.1、MySQL方面:開(kāi)啟BinLog
1.1.1、修改my.cnf文件
vi /etc/my.cnf [mysqld] server-id = 1 binlog_format = ROW1.1.2、重啟MySQL,然后登陸到MySQL之后,查看是否已經(jīng)修改過(guò)來(lái):
mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+1.1.3、創(chuàng)建Maxwell用戶,并賦予 maxwell 庫(kù)的一些權(quán)限
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456'; GRANT ALL ON maxwell.* TO 'maxwell'@'%'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';1.2、Kafka準(zhǔn)備工作
1.2.1、啟動(dòng)Zookeeper
1.2.2、啟動(dòng)kafka
kafka-server-start /usr/local/etc/kafka/server.properties2、Maxwell
2.1、下載安裝包
https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz
2.2、解壓到指定位置
2.3、在MYSQL中創(chuàng)建測(cè)試用表(前提你要進(jìn)入一個(gè)庫(kù))
CREATE TABLE `test` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`age` int(11) DEFAULT NULL,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;2.4、打開(kāi)Maxwell(要在maxwell安裝目錄下)
bin/maxwell --user=maxwell --password=123456 --host='127.0.0.1' --producer=stdout2.5、對(duì)數(shù)據(jù)進(jìn)行操作
insert into test values(1,22,"小明"); update test set name='whirly' where id=1; delete from test where id=1;可以看到Maxwell控制臺(tái)的輸出,測(cè)試成功!
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小明鋒"}} {"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小明"}} {"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}3、實(shí)現(xiàn)MySQL數(shù)據(jù)實(shí)時(shí)增量同步到Kafka
3.1、開(kāi)啟指定到Kafka的MaxWell
bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.13.2、對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作
insert into test values(1,22,"小明"); update test set name='whirly' where id=1; delete from test where id=1;3.3、啟動(dòng)一個(gè)消費(fèi)者來(lái)消費(fèi) maxwell topic的消息,觀察其輸出;
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning3.4、再次執(zhí)行數(shù)據(jù)庫(kù)結(jié)果觀察,仍然可以得到相同的輸出
總結(jié)
以上是生活随笔為你收集整理的简单实现MySQL数据实时增量同步到Kafka————Maxwell的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 存储过程与函数oracle
- 下一篇: MySQL管理利器 MySQL Util