Canal

Canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。

从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

mysql

windows10 WSL 实战笔记

准备工作

确保已经开启 binlog

binlog 开启

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+

创建 canal 账户

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
flush privileges;

创建表

create database test_source;
use test_source;

drop table if exists user_info;
create table user_info
(
    id int unsigned auto_increment comment '主键' primary key,
    username varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '用户信息表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

初始化数据

insert into user_info (username) values ('u1');
insert into user_info (username) values ('u2');
insert into user_info (username) values ('u3');
insert into user_info (username) values ('u4');
insert into user_info (username) values ('u5');

下载 canal

$ pwd
/home/houbinbin/canal

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 canal.deployer-1.1.7.tar.gz 为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

解压缩

tar -zxvf canal.deployer-1.1.7.tar.gz

如下:

$ ls
bin  canal.deployer-1.1.7.tar.gz  canal.deployer-1.1.7.tar.gz:Zone.Identifier  conf  lib  logs  plugin

配置修改

修改对应的配置文件

vi conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:13306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

这里主要改一下 canal.instance.master.address 为自己的本地地址,其他的不变化。

canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1

如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

启动

sh bin/startup.sh

查看 server 日志

cat logs/canal/canal.log

日志如下:

2024-01-27 19:46:19.266 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-01-27 19:46:19.280 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-01-27 19:46:19.333 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.23.234.67(172.23.234.67):11111]
2024-01-27 19:46:21.108 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

查看 instance 的日志

cat logs/example/example.log

日志如下:

2024-01-27 19:46:20.101 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-01-27 19:46:21.056 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2024-01-27 19:46:21.057 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-01-27 19:46:21.063 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2024-01-27 19:46:21.241 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-01-27 19:46:21.241 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2024-01-27 19:46:22.484 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=223344,gtid=<null>,timestamp=1706355127000] cost : 1231ms , the next step is binlog dump

关闭

sh bin/stop.sh

client exmaple

参见:https://github.com/alibaba/canal/wiki/ClientExample

下载

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 canal.example-1.1.7.tar.gz 为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.example-1.1.7.tar.gz

解压

cd ~/canal/client/
tar -zxvf canal.example-1.1.7.tar.gz

$ ls
bin  canal.example-1.1.7.tar.gz  canal.example-1.1.7.tar.gz:Zone.Identifier  conf  lib  logs

启动

sh startup.sh

直接看一下日志:

cd /home/houbinbin/canal/client/logs/example
tail -fn30 entry.log

插入数据

数据库插入数据:

insert into user_info (username) values ('u7');

对应的客户端日志:

================> binlog[mysql-bin.000001:4325] , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 335ms
 BEGIN ----> Thread id: 8
----------------> binlog[mysql-bin.000001:4481] , name[test_source,user_info] , eventType : INSERT , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 335 ms
id : 6    type=int unsigned    update=true
username : u7    type=varchar(128)    update=true
create_time : 2024-01-27 19:59:21    type=timestamp    update=true
update_time : 2024-01-27 19:59:21    type=timestamp    update=true
----------------
 END ----> transaction id: 351
================> binlog[mysql-bin.000001:4533] , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 336ms

小结

canal 的设计理念其实已经非常先进了,对于日常的 mysql cdc 完全够用。

实际使用时,可以在 exmaple 的基础之上,做自己的业务能力增强。

不过还有一些类似的更强大的设计,比如 Debezium-01-为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台

参考资料

https://github.com/alibaba/canal/wiki/QuickStart