详解 canal 同步 MySQL 增量数据到 ES
canal 是详解阿里知名的开源项目,主要用途是增量基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。数据
这篇文章,详解我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。增量

1 集群模式

图中 server 对应一个 canal 运行实例 ,数据对应一个 JVM 。详解
server 中包含 1..n 个 instance ,增量 我们可以将 instance 理解为配置任务。数据
instance 包含如下模块 :
eventParser数据源接入,详解模拟 slave 协议和 master 进行交互,增量协议解析eventSinkParser 和 Store 链接器,数据进行数据过滤,详解加工,增量分发的数据工作eventStore数据存储metaManager增量订阅 & 消费信息管理器真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。
实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。
顺序消费:
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的源码下载消息保证顺序,不同分区之间的消息顺序不做要求。

2 MySQL配置
1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
复制[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复1.2.3.4.注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
复制CREATE USER canal IDENTIFIED BY canal; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@%; -- GRANT ALL PRIVILEGES ON *.* TO canal@% ; FLUSH PRIVILEGES;1.2.3.4.3、创建数据库商品表 t_product 。
复制CREATE TABLE `t_product` ( `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT, `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL, `price` DECIMAL ( 10, 2 ) NOT NULL, `status` TINYINT ( 4 ) NOT NULL, `create_time` datetime NOT NULL, `update_time` datetime NOT NULL, PRIMARY KEY ( `id` ) ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin1.2.3.4.5.6.7.8.9.3 Elasticsearch配置
使用 Kibana 创建商品索引 。
复制PUT /t_product { "settings": { "number_of_shards": 2, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type":"keyword" }, "name": { "type":"text" }, "price": { "type":"double" }, "status": { "type":"integer" }, "createTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "updateTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.执行完成,如图所示 :

4 RocketMQ 配置
创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。


5 canal 配置
我们选取 canal 版本 1.1.6 ,进入 conf 目录。
1、配置 canal.properties
复制#集群模式 zk地址 canal.zkServers = localhost:2181 #本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = rocketMQ #instance 列表 canal.destinations = product-syn #conf root dir canal.conf.dir = ../conf #全局的spring配置方式的组件文件 生产环境,集群化部署 canal.instance.global.spring.xml = classpath:spring/default-instance.xml ###### 以下部分是默认值 展示出来 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为 flat json格式对象 canal.mq.flatMessage = true1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.2、instance 配置文件
在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。
复制# 按需修改成自己的云服务器提供商数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # table regex canal.instance.filter.regex=mytest.t_product # mq config canal.mq.topic=product-syn-topic # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partitinotallow=0 # hash partition config #canal.mq.partitinotallow=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitinotallow=mytest.person:id,mytest.role:id #################################################1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.3、服务启动
启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。

6 消费者
1、产品索引操作服务

2、消费监听器

消费者逻辑重点有两点:
顺序消费监听器将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。7 写到最后
canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。
推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。
这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。
https://github.com/makemyownlife/rocketmq4-learning
图片
相关文章
七彩虹升级BIOS教程(手把手教你一步步升级BIOS,提升电脑性能稳定性,)
摘要:BIOSBasicInput/OutputSystem,基本输入/输出系统)是计算机启动时加载的第一个软件,它负责初始化硬件设备、设置系统参数以及启动操作系统。随着硬件技术的不断...2025-11-03电脑阅卷错误的原因与解决办法(探讨电脑阅卷中的常见错误及其解决方法)
摘要:随着科技的发展,电脑阅卷在教育领域得到了广泛应用,以提高阅卷效率和减少人为差错。然而,近年来不断有关于电脑阅卷错误的报道,这些错误给教育评估带来了一定的困扰。本文将探讨电脑阅卷错误...2025-11-03解决电脑开机错误RPV4的常见问题(排查、修复和预防RPV4错误的有效方法)
摘要:电脑开机错误是使用电脑过程中常见的问题之一。其中,RPV4错误是较为常见的一种,给用户带来了很多困扰。本文将介绍如何解决电脑开机错误RPV4,并提供了一些排查、修复和预防该错误的有...2025-11-03魅蓝5与Note6(详细比较两款手机的功能和性能,帮助你做出明智的选择)
摘要:现如今,手机市场竞争激烈,消费者在购买手机时常常会面临抉择。本文将详细比较魅蓝5与Note6这两款手机的功能和性能,帮助读者做出明智的选择。1.外观设计:简约时尚vs.商务...2025-11-03- 摘要:在现代社会中,电脑已经成为我们生活和工作中不可或缺的一部分。而键盘作为电脑的重要输入设备,我们每天都要与之亲密接触。然而,很多人在使用笔记本电脑键盘时常常感到困惑,甚至影响到了打字...2025-11-03
腾讯会议屏幕共享电脑教程(一步步教你如何在腾讯会议中实现电脑屏幕共享)
摘要:腾讯会议作为一种在线视频会议工具,提供了屏幕共享功能,方便用户与他人分享自己的电脑屏幕。本文将详细介绍如何在腾讯会议中实现电脑屏幕共享,帮助用户更好地利用这一功能。下载和安...2025-11-03

最新评论