Canal 部署&binlog 监听
配置要求
CPU:1核
内存:2G
安装Canal-deployer
我用的Canal-deployer是1.1.7版本。
1、Canal-deployer下载
到Releases · alibaba/canal · GitHub
下载canal.deployer,可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。
然后解压:
2、配置修改
修改canal.properties,增加
canal.ip = 127.0.0.1
修改example/instance.properties
# 需要同步数据的MySQL地址
canal.instance.master.address=数据库地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=账号
# 用于同步数据的数据库密码
canal.instance.dbPassword=密码
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
重点修改canal.instance.master.address
、canal.instance.dbUsername
和canal.instance.dbPassword
三个字段。
3、启动 Canal
到 bin 目录下,执行命令:./startup.sh
canal/logs/canal/canal_stdout.log 报错:
Unrecognized VM option 'AggressiveOpts'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
修改canal/bin/startup.sh:
移除AggressiveOpts、UseBiasedLocking 等 JDK 21中已经不在支持的配置,重新启动.
4、启动成功检查
启动成功,检查canal/logs/canal.log:
2024-04-18 16:56:35.787 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler 2024-04-18 16:56:35.801 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2024-04-18 16:56:35.825 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server. 2024-04-18 16:56:35.879 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111] 2024-04-18 16:56:37.614 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
看到the canal server is running now ......
表示启动成功。
安装Canal-adapter
同样是使用1.1.7版本,和deployer 保持一致。
canal.adapter,相当于canal的客户端,会从canal-deployer中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。
1、下载canal.adapter
到Releases · alibaba/canal · GitHub 下载
然后解压
2、修改配置
修改 application.properties:
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
srcDataSources:defaultDS:url: jdbc:mysql://rm-xxx.com:3306/nfturbo?useUnicode=trueusername: xxxpassword: xxxcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters: - name: logger- name: es8hosts: localhost:9200 # 127.0.0.1:9200 for rest modeproperties:mode: transport # or rest# security.auth: test:123456 # only used for rest modecluster.name: nfturbo-cluster
这里的数据库配置记得改:
srcDataSources:defaultDS:url: jdbc:mysql://rm-xxx.com:3306/nfturbo?useUnicode=trueusername: xxxpassword: xxxcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters: - name: logger- name: es8hosts: localhost:9200 # 127.0.0.1:9200 for rest modeproperties:mode: transport # or rest# security.auth: test:123456 # only used for rest modecluster.name: nfturbo-cluster
因为在outerAdapters下面,我们配置的 name 是 es8(如果你自己安装的是 es7,记得修改成对应的版本。),所以adapter将会自动加载 conf/es8 下的所有.yml结尾的配置文件,适配器表映射文件,创建并修改 conf/es8/mytest_user.yml文件:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: nfturbo_users_id: _id# upsert: true# pk: idsql: "select t.id as _id, t.nick_name as nick_name, t.state as state,t.telephone as telephone from users as t"# objFields:# _labels: array:;#etlCondition: "where a.c_time>={}"commitBatch: 3000
3、启动canal.adapter
同样到 bin 目录下,执行命令:./startup.sh
启动成功(canal-adapter/logs/adapter/adapter.log):
4、到 es 上创建 ES 索引
访问 es(http://ip:5601/app/home#/ ),然后进入开发工具,在控制台创建如下索引:
PUT nfturbo_users {"mappings": {"properties": {"nickname": {"type": "text"},"telephone": {"type": "text"},"state": {"type": "text"}}} }
5、数据库执行INSERT
INSERT INTO `users` (`id`,`gmt_create`,`gmt_modified`,`nick_name`,`password_hash`,`state`,`telephone`,`user_role`) VALUES (14,'2024-04-18 17:47:40','2024-04-18 17:47:42','test11111','c2975f0faec10adca0ecd729c8cbc0aa','INIT','13000000000','CUSTOMER')
可以在日志中看到监听到了相关变更。
6、从 ES 查询:
GET users/_search {"_source": ["nick_name","telephone","state"],"query": {"match": {"nick_name": "test11111"}} }
查询成功:
注意事项
因为我用的 MySQL 是在阿里云上直接购买的 DMS,所以 不需要做单独的binlog dump配置,。如果是自己搭建的,需要开启 binlog 写入,并且给 canal账号授权他作为 slave 权限。
1、开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
重启 MySQL
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
常见问题
1、could not be instantiated: class could not be found
启动报错
java.lang.IllegalStateException: Extension instance(name: es, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter) could not be instantiated: class could not be found
这是因为对于 es,需要指定具体的版本才行,如 es8,详见:
Sync ES · alibaba/canal Wiki · GitHub
2、**Could not find first log file name in binary log index file**
Could not find first log file name in binary log index file · Issue #156 · alibaba/canal · GitHub
先关闭 canal,然后删除 meta.dat (在目录canal/conf/example下),然后再启动即可