Canal 简介
canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。基于日志增量订阅和消费的业务,我们可以实现以下业务需求:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
准备工作
这里选择 1.1.5 的版本,canal 1.1.1 版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ。
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
- kafka: https://github.com/apache/kafka
- RocketMQ : https://github.com/apache/rocketmq
- RabbitMQ : https://github.com/rabbitmq/rabbitmq-server
- pulsarmq : https://github.com/apache/pulsar
1. 服务器/主机配置
- 开放 11111 和 11112 端口
iptables -I INPUT -p tcp --dport 11111:11112 -j ACCEPT
iptables-save > /etc/sysconfig/iptables
- RabbitMQ 安装配置
这里略过了,安装步骤可以参考官方文档:RabbitMQ QuickStart
- 在 RabbitMQ 控制台配置:
(1). 新增 Exchanges:hong.topic.canal
;
(2). 新增 Queues:hong.queue.canal.sync
;
(3). 配置绑定的 Routing key:hong.router.canal.sync
。
如下图:
2. MySQL 配置
开启 Binlog 写入功能,配置 binlog-format 为ROW 模式,my.cnf
配置如下:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
重启 MySQL:
sudo systemctl restart mysqld
授权 canal 链接 MySQL 的账号具有 MySQL slave 的权限,如已有账号可直接 grant:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- 删除
DROP USER 'canal'@'%';
测试:
canal/canal
线上:
canal/Hong12amAjkf&2!
3. 搭建 canal 服务端
- 下载所需的版本压缩包,下载地址:release,我使用 1.1.5 版本
canal.deployer-1.1.5.tar.gz
:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# wget https://images.lanweihong.com/software/canal.deployer-1.1.5.tar.gz
mkdir canal.deployer-1.1.5
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal.deployer-1.1.5
cp -r canal.deployer-1.1.5 /usr/local/canal.deployer-1.1.5
- 修改
instance
配置vi /usr/local/canal.deployer-1.1.5/conf/example/instance.properties
:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# mq config
# 这里填写 routing
canal.mq.topic=hong.router.canal.sync
- 修改
canal
配置vi /usr/local/canal.deployer-1.1.5/conf/canal.properties
:
canal.serverMode = rabbitMQ
rabbitmq.host = 127.0.0.1:5672
rabbitmq.virtual.host = /dev
# 填 exchange
rabbitmq.exchange = hong.topic.canal
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =
- 启动 canal
/usr/local/canal.deployer-1.1.5/bin/startup.sh
查看日志:
cat /usr/local/canal.deployer-1.1.5/logs/canal/canal.log
输出如下:
2023-11-02 21:43:08.224 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-11-02 21:43:08.264 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-11-02 21:43:08.492 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-11-02 21:43:08.538 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.25.1.2(172.25.1.2):11111]
2023-11-02 21:43:10.346 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
实现步骤
- 整合 Spring Boot Canal 客户端
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<!-- 使用 spring boot 版本 -->
<version>2.3.8.RELEASE</version>
</dependency>
- 添加 RabbitMQ 连接配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
virtual-host: /dev
- 配置消费方法:
@Component
@Slf4j
public class CanalConsumer {
@RabbitListener(queues = MqConstants.QUEUE_CANAL_SYNC)
public void handCanalSync(Channel channel, Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("mq 消费:{}", msg);
}
}
其中 MqConstants.QUEUE_CANAL_SYNC
内容为 “hong.queue.canal.sync”,和 RabbitMQ 控制台配置的队列一样即可。
- 启动项目测试
在数据库中修改数据,发现已有消息消费:
2023-11-02 22:34:22.206 INFO 11552 --- [ntContainer#0-1] c.l.assistant.consumer.CanalConsumer : mq 消费:{"data":[{"id":"604841199857049600","version":null,"creator_id":"604841046458769408","pay_type_name":"呵呵测试","pay_type_icon":"weixin1","default_status":"0","status":"1","update_time":null,"add_time":"2021-06-22 22:24:05"}],"database":"hong_test","es":1698935662000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","version":"tinyint(1)","creator_id":"bigint","pay_type_name":"varchar(255)","pay_type_icon":"varchar(255)","default_status":"tinyint","status":"tinyint(1)","update_time":"datetime","add_time":"datetime"},"old":[{"pay_type_name":"呵呵"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"version":-6,"creator_id":-5,"pay_type_name":12,"pay_type_icon":12,"default_status":-6,"status":-6,"update_time":93,"add_time":93},"table":"loan_pay_type","ts":1698935662174,"type":"UPDATE"}
至此,基本框架搭建完成,不过这才是刚刚开始,实际开发还要跟业务结合使用,再自身扩展即可,可实时同步数据至ES、Redis等中间件。
另外,需要注意的是,如果使用 RabbitMQ的,在消息消费时需要格外注意消息消费的顺序性,否则同步到其他中间件时可能存在数据不一致的情况。
针对搭配 RabbitMQ 使用,消息消费的顺序性思考
RabbitMQ 消费本身是无序的,在这个场景下如果不保证消息消费的顺序性,可能会导致后消费的旧数据覆盖先消费的新数据,导致数据不一致。
在使用 RabbitMQ 过程中,要保证消息顺序消费,就要确保消息从生产到消费整个过程中都是有序的,一般可以从以下几点入手:
- 消息发送,自己确保是有序的;集群化部署的话,可以通过分布式锁确保消息有序。
- 消息到达队列之后,默认就是有序的。
- 消息消费,一个队列对应一个消费者,并且一个消费者一个 channel,不能并发消费,就可以确保消息有序。
再结合 Canal 订阅数据更新的方案,分析整个过程(参考canal如何保证数据的顺序性,相同主键上的插入更新和删除会保证顺序吗?)是否有序:
- binlog 本身就是有序的;
- binlog -> canal 这个过程,也是按照 mysql slave dump 协议订阅的,也是有序的;
- canal -> mq 过程,按照写入 mq 方式不同,是在不同的维度上保证有序。
因此,在此技术方案中,要想保证消息消费的顺序性,就必须配置只有一个客户端进行消费。在 RabbitMQ 中,我们可以按以下步骤实现:
- 配置队列消费者单活模式,在控制台中配置如下,就配置参数
x-single-active-consumer
值为true
,最多允许一个消费者消费。
如果是在代码中定义队列的,可以这样处理:
@Bean
public Queue canalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 配置
arguments.put("x-single-active-consumer", true);
return new Queue("hong.queue.goods.sync", true, false, false, arguments);
}
控制台和代码配置方式,选一个即可。
展示:
这样就可以确保只有一个客户端消费了。
关于 RabbitMQ 的消息消费顺序探讨,可参考以下文章:
其他的解决方案:
- 选择 Kafka 或者 RocketMQ,支持顺序消费。