Canal 简介

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。基于日志增量订阅和消费的业务,我们可以实现以下业务需求:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

canal介绍

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

准备工作

这里选择 1.1.5 的版本,canal 1.1.1 版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ。

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

1. 服务器/主机配置

  1. 开放 11111 和 11112 端口
iptables -I  INPUT  -p  tcp  --dport 11111:11112  -j  ACCEPT
iptables-save > /etc/sysconfig/iptables
  1. RabbitMQ 安装配置

这里略过了,安装步骤可以参考官方文档:RabbitMQ QuickStart

  1. 在 RabbitMQ 控制台配置:

(1). 新增 Exchanges:hong.topic.canal
(2). 新增 Queues:hong.queue.canal.sync
(3). 配置绑定的 Routing key:hong.router.canal.sync

如下图:

RabbitMQ配置

2. MySQL 配置

开启 Binlog 写入功能,配置 binlog-format 为ROW 模式,my.cnf 配置如下:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1

重启 MySQL:

sudo systemctl restart mysqld

授权 canal 链接 MySQL 的账号具有 MySQL slave 的权限,如已有账号可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- 删除
DROP USER 'canal'@'%';  

测试:
canal/canal
线上:
canal/Hong12amAjkf&2!

3. 搭建 canal 服务端

  1. 下载所需的版本压缩包,下载地址:release,我使用 1.1.5 版本 canal.deployer-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# wget https://images.lanweihong.com/software/canal.deployer-1.1.5.tar.gz
mkdir canal.deployer-1.1.5
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal.deployer-1.1.5
cp -r canal.deployer-1.1.5 /usr/local/canal.deployer-1.1.5
  1. 修改 instance 配置 vi /usr/local/canal.deployer-1.1.5/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8

# mq config
# 这里填写 routing
canal.mq.topic=hong.router.canal.sync
  1. 修改 canal 配置 vi /usr/local/canal.deployer-1.1.5/conf/canal.properties
canal.serverMode = rabbitMQ

rabbitmq.host = 127.0.0.1:5672
rabbitmq.virtual.host = /dev
# 填 exchange
rabbitmq.exchange = hong.topic.canal
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =
  1. 启动 canal
/usr/local/canal.deployer-1.1.5/bin/startup.sh

查看日志:

cat /usr/local/canal.deployer-1.1.5/logs/canal/canal.log

输出如下:

2023-11-02 21:43:08.224 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-11-02 21:43:08.264 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-11-02 21:43:08.492 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-11-02 21:43:08.538 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.25.1.2(172.25.1.2):11111]
2023-11-02 21:43:10.346 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

实现步骤

  1. 整合 Spring Boot Canal 客户端
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <!-- 使用 spring boot 版本 -->
    <version>2.3.8.RELEASE</version>
</dependency>
  1. 添加 RabbitMQ 连接配置:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: 123456
    virtual-host: /dev
  1. 配置消费方法:
@Component
@Slf4j
public class CanalConsumer {

    @RabbitListener(queues = MqConstants.QUEUE_CANAL_SYNC)
    public void handCanalSync(Channel channel, Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("mq 消费:{}", msg);
    }
}

其中 MqConstants.QUEUE_CANAL_SYNC 内容为 “hong.queue.canal.sync”,和 RabbitMQ 控制台配置的队列一样即可。

  1. 启动项目测试

在数据库中修改数据,发现已有消息消费:

2023-11-02 22:34:22.206  INFO 11552 --- [ntContainer#0-1] c.l.assistant.consumer.CanalConsumer     : mq 消费:{"data":[{"id":"604841199857049600","version":null,"creator_id":"604841046458769408","pay_type_name":"呵呵测试","pay_type_icon":"weixin1","default_status":"0","status":"1","update_time":null,"add_time":"2021-06-22 22:24:05"}],"database":"hong_test","es":1698935662000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","version":"tinyint(1)","creator_id":"bigint","pay_type_name":"varchar(255)","pay_type_icon":"varchar(255)","default_status":"tinyint","status":"tinyint(1)","update_time":"datetime","add_time":"datetime"},"old":[{"pay_type_name":"呵呵"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"version":-6,"creator_id":-5,"pay_type_name":12,"pay_type_icon":12,"default_status":-6,"status":-6,"update_time":93,"add_time":93},"table":"loan_pay_type","ts":1698935662174,"type":"UPDATE"}

至此,基本框架搭建完成,不过这才是刚刚开始,实际开发还要跟业务结合使用,再自身扩展即可,可实时同步数据至ES、Redis等中间件。
另外,需要注意的是,如果使用 RabbitMQ的,在消息消费时需要格外注意消息消费的顺序性,否则同步到其他中间件时可能存在数据不一致的情况。

针对搭配 RabbitMQ 使用,消息消费的顺序性思考

RabbitMQ 消费本身是无序的,在这个场景下如果不保证消息消费的顺序性,可能会导致后消费的旧数据覆盖先消费的新数据,导致数据不一致。

在使用 RabbitMQ 过程中,要保证消息顺序消费,就要确保消息从生产到消费整个过程中都是有序的,一般可以从以下几点入手:

  1. 消息发送,自己确保是有序的;集群化部署的话,可以通过分布式锁确保消息有序。
  2. 消息到达队列之后,默认就是有序的。
  3. 消息消费,一个队列对应一个消费者,并且一个消费者一个 channel,不能并发消费,就可以确保消息有序。

再结合 Canal 订阅数据更新的方案,分析整个过程(参考canal如何保证数据的顺序性,相同主键上的插入更新和删除会保证顺序吗?)是否有序:

  1. binlog 本身就是有序的;
  2. binlog -> canal 这个过程,也是按照 mysql slave dump 协议订阅的,也是有序的;
  3. canal -> mq 过程,按照写入 mq 方式不同,是在不同的维度上保证有序。

因此,在此技术方案中,要想保证消息消费的顺序性,就必须配置只有一个客户端进行消费。在 RabbitMQ 中,我们可以按以下步骤实现:

  1. 配置队列消费者单活模式,在控制台中配置如下,就配置参数 x-single-active-consumer 值为 true,最多允许一个消费者消费。
    配置

如果是在代码中定义队列的,可以这样处理:

@Bean
public Queue canalQueue() {
    Map<String, Object> arguments = new HashMap<>();
    // 配置
    arguments.put("x-single-active-consumer", true);
    return new Queue("hong.queue.goods.sync", true, false, false, arguments);
}

控制台和代码配置方式,选一个即可。

展示:
展示

这样就可以确保只有一个客户端消费了。

关于 RabbitMQ 的消息消费顺序探讨,可参考以下文章:

其他的解决方案:

  • 选择 Kafka 或者 RocketMQ,支持顺序消费。

参考文档

  1. https://github.com/alibaba/canal
  2. 深入浅出RabbitMQ
文章目录