canal容器化
辣鸡canal,文档写的一坨翔。碰到问题只能看代码去发现些端倪。
基本概念
// sb canal
- admin:一个图形化管理server、instance、cluster的系统。可以在UI上配置这些组件并控制他们的启停;
- server:canal server,用于维护instance。server除了要配置自己的信息(比如和admin通信的端口),还可以配置instance的一些通用设置(比如instance的消息发往哪个kafka);
- instance:用于执行实际的同步binlog任务,instance一般配置的是同步哪个库的哪个表、发往kafka的哪个topic;
- cluster:单独的server不够健壮,可以几个server组成一个cluster,用于HA;
cluster不是必须的,单个server也可以做同步工作。
cluster下的server必须有着相同的配置(所以建议在admin上的cluster里配置server的属性,这样所有的server就会有同样的配置),且任一时刻只会有一个server工作,cluster的唯一作用就是通过zookeeper控制“哪个server在工作”。
配置
// sb canal
canal的配置非常混乱。整体而言:
- 环境变量优先;
- admin上存储的配置次之;
- server、instance的配置最次。关于instance的配置,instance自己的配置优先级比server里设置的instance配置高(就近原则);
如果用容器的话,比较容易改的是环境变量,其次是手动去admin上创建配置,基本不会在server、instance服务本地放配置。
另外canal没有提供配置查询接口,所以最终生效的是哪个配置,没有提示,报错一般也看不出来,自己按照上面的原则行事即可。
推荐的配置方式:集群模式的server只配置和admin通信的地址,其他配置全扔到admin的cluster里。
关于zookeeper
// sb canal
多套canal可以使用同一个zookeeper,但是他们会注册在同一个地方/otter。
比如:
1
2
3
4
5
6
7
8
9
10
11
[zk: z*130:2181(CONNECTED) 52] ls /otter/canal/cluster
[10.109.34.187:11111, 10.109.37.75:11111, 10.109.42.99:11111, 10.109.49.40:11111, 10.109.53.107:11111, 10.109.53.110:11111]
[zk: z*130:2181(CONNECTED) 53] ls /otter/canal/destinations
[example, first-instance-for-cluster, instance-for-docker-cluster, kol_overease2, kol_overseas_dev, kol_overseas_pre_stage, kol_overseas_stage, kol_overseas_test, kol_overseas_test_1, kol_overseas_test_2, kol_overseas_test_zjy, lhb-instance, over2, overseas, overseas-docker-instance, overseas1, overseas2, overseas3, test3]
[zk: z*130:2181(CONNECTED) 54] ls /otter/canal/destinations/lhb-instance
[cluster, running]
[zk: z*130:2181(CONNECTED) 55] ls /otter/canal/destinations/lhb-instance/cluster
[10.109.42.99:11111, 10.109.53.107:11111]
所有的cluster都会注册在这里,但实际上可能分属好几套canal系统。这一点应该无所谓,cluster的ip port不会重合。
所有的destinations(实际指的是instance)也会都配置在这里。这一点比较致命,如果不属于同一套的canal里有同名instance,应该就冲突了。
所以建议使用zookeeper的chroot,大家在同一套zookeeper的不同root下注册自己的服务,互相独立。
也可以每套canal用一个专属的zookeeper,更健壮。
容器化
实体机部署和容器化差不多,容器化更麻烦些。
启动canal-admin
为了HA和更好得配置、维护server,起一个admin。
首先要使用canal的mysql脚本初始化一个数据库,canal-admin用这个库保存这一套canal的所有信息。参考:
- https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
直接rancher上起admin,镜像用canal/canal-admin:v1.1.5,端口暴露8089,网络配置DNS搜索域为corp.yodao.com,环境变量主要是配置刚刚创建的数据库:
key | value |
---|---|
spring.datasource.username | e*4nb |
spring.datasource.url | jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false |
spring.datasource.password | ne*ust |
spring.datasource.driver-class-name | com.mysql.jdbc.Driver |
spring.datasource.database | overseas_canal_manager_test |
spring.datasource.address | t*077:3307 |
admin起来之后会有两个用户:
- admin/admin:server和canal-admin之间用于认证的用户;
- admin/123456:用于登录canal-admin web的用户
创建cluster
cluster控制着canal-server的配置,这里cluster的zk选的是z*130:2181,名字为overseas-docker-cluster
创建完cluster后,贴入下面的配置。其中比较重要的几个:
- canal.zkServer:server连哪个zk;
- canal.admin.manager = canal-overseas-test.inner.y.com,canal admin的地址,个人感觉可以不设置,毕竟server如果连admin的地址都不知道,又怎么连上admin并从中获取自己的配置;
- canal.serverMode: kafka,目前我们只把binlog发给kafka;
- canal.destinations: 集群里的server要创建的instance的名称,用的是overseas-docker-instance
- canal.mq.servers = ead-sandbox-kafka.inner.y.com:9092,发送到哪个kafka
- tsdb,一系列tsdb相关的配置,主要是为了保存各个时间点mysql表的结构,这样不同时刻的binlog可以用相应的表结构去解析。默认用的是h2,之前线上配的mysql,所以这里也使用mysql。(但是注意一定要删干净默认的h2配置,下面的配置已经删干净了)
- canal.instance.tsdb.enable=true
- canal.instance.tsdb.url=jdbc:mysql://th077:3307/overseas_canal_manager_test?enabledTLSProtocols=TLSv1.2
- canal.instance.tsdb.dbUsername=e*4nb
- canal.instance.tsdb.dbPassword=ne*ust
- canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper 运行canal-server服务的主机IP,可以不用配置,会自动绑定一个本机的IP
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
canal.admin.manager = canal-overseas-test.inner.y.com
canal.admin.port = 11110
canal.admin.user = admin
# 这个密码是admin,这是密码加密后的字符串
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#zookeeper集群地址,多个地址用逗号隔开
canal.zkServers = z*130:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ 使用kafka
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://th077:3307/overseas_canal_manager_test?enabledTLSProtocols=TLSv1.2
canal.instance.tsdb.dbUsername=e*4nb
canal.instance.tsdb.dbPassword=ne*ust
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = overseas-docker-instance
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
#这里因为我们选择的是kafka,所以填kafka集群地址,多个地址用逗号隔开
canal.mq.servers = ead-sandbox-kafka.inner.y.com:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
创建instance
创建instance,分配给overseas-docker-cluster这个cluster。
- canal.instance.master.address:同步哪个数据库
- canal.instance.dbUsername
- canal.instance.dbPassword
- canal.instance.filter.regex:同步哪个表
- canal.instance.filter.black.regex:不同步哪个表
- canal.mq.topic:kafka topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info mysql地址
canal.instance.master.address=th077:3307
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password mysql账号密码
canal.instance.dbUsername=e*4nb
canal.instance.dbPassword=ne*ust
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex mysql表设置,这里我们设置读取mytest数据库的user表
canal.instance.filter.regex=kol_overseas_test.StoredKol,kol_overseas_test.KolKolTag,kol_overseas_test.McnKol
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config 主题名称
canal.mq.topic=canal_StoredKol
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
启动cluster canal-server
cluster创建好了,instance也起好了,就差个干活的canal-server了。
所以再起几个server注册到这个cluster上,cluster会自动挑一个server启动instance。
直接rancher上起canal-server,镜像用canal/canal-server:v1.1.5,网络配置DNS搜索域为corp.yodao.com
涉及到的端口比较多:
- canal.port:canal的socket端口,默认11111,这个应该需要暴露;
- canal.admin.port:11110,canal-server和admin通信的端口。这个好像不用暴露,因为是canal-server主动连的canal-admin;
- canal.metrics.pull.port:暴露metric的端口,官方留给监控系统用的,现在貌似还没用;
这些端口可以参考:https://github.com/alibaba/canal/wiki/AdminGuide
最小环境变量设置:
key | value | comment |
---|---|---|
canal.admin.user | admin | 连接canal-admin使用的user |
canal.admin.register.cluster | overseas-docker-cluster | server属于哪个cluster |
canal.admin.passwd | 4ACFE3202A5FF5CF467898FC58AAB1D615029441 | user的密码,实际是admin在mysql里password后的样子 |
canal.admin.manager | canal-overseas-test.inner.y.com | canal-admin的地址 |
主要是 设置canal-admin的地址和认证用户,并指定自己要加入哪个集群。其他的canal-server的配置都从cluster里读取。
目前这个service起了两个pod,就组成了一个双实例集群。
optional:启动单机canal-server
如果不想用集群,也可以用standalone server,但是要传入的变量就比较多了,因为原本放在cluster里的配置现在都要在server这里配置了。
依然选择使用环境变量配置:
key | value |
---|---|
canal.serverMode | kafka |
canal.port | 11111 |
canal.mq.topic | canal_kol_overseas_test_docker_solo |
canal.mq.servers | ead-sandbox-kafka.inner.y.com:9092 |
canal.metrics.pull.port | 11112 |
canal.instance.tsdb.enable | true |
canal.instance.master.address | th077:3307 |
canal.instance.gtidon | false |
canal.instance.filter.regex | kol_overseas_test.StoredKol,kol_overseas_test.KolKolTag,kol_overseas_test.McnKol |
canal.instance.filter.black.regex | mysql\.slave_.* |
canal.instance.dbUsername | e*4nb |
canal.instance.dbPassword | ne*ust |
canal.instance.connectionCharset | UTF-8 |
canal.destinations | canal_StoredKol_docker_solo |
canal.auto.scan | true |
canal.admin.user | admin |
canal.admin.register.auto | true |
canal.admin.port | 11110 |
canal.admin.passwd | 4ACFE3202A5FF5CF467898FC58AAB1D615029441 |
canal.admin.manager | canal-overseas-test.inner.y.com |
除了上面集群server的最小配置,还加入了canal和instance的配置。
但是,即使已经传入了instance的配置,启动后还是必须在canal-admin上手动创建instance(不再分配到集群,而是分配给该server):
- 如果server启动的时候,instance的配置同上已经以环境变量的形式传入,启动后在admin上创建个空的(至少要有一行注释#)instance配置就行;
- 如果启动的时候没有传入上述关于instance的配置,启动后要在admin上创建个带配置的instance;
反正一定要在canal-admin上手动创建一个instance,server自己并不会按照环境变量直接起一个instance。实际上通过环境变量传入之后,instance的配置已经齐全了,应该只是缺少一个触发条件,有兴趣的可以研究代码看看为啥。
Note:用这种方式一个server只能起一个instance,主要是因为环境变量只能指定一份instance的配置。后面可能会支持采用数组的形式传入环境变量,就可以起多个instance了。
关于kafka配置
新版本的canal-server已经使用kafka.bootstrap.servers配置kafka了,但是上面依旧使用了老旧的canal.mq.servers,主要原因为:在环境变量里配置了前者但是容器启动之后会不生效。看代码才发现,不生效的原因是后者会从环境变量读取,但前者不会……
这也就是官方文档里面说的“-e参数里可以指定以前canal.properties/instance.properties里的所有配置的key和value,canal-server启动时会有限读取-e指定的变量”,说了个寂寞,“以前的所有配置”指的是哪些配置?我又不是追了canal三百年的铁杆脑残粉,我tm知道你以前的配置是哪些配置……
另外,canal.mq.servers环境变量用canal-server:v1.1.5可以启动,用latest镜像就不行,不知道latest又做了哪些骚操作。
Ref
- https://juejin.cn/post/6872218169978257422
- https://its201.com/article/XUEJIA2S/104432830
- https://blog.csdn.net/daziyuanazhen/article/details/106098887
- https://www.cnblogs.com/throwable/p/13449920.html
canal的一些特性
binlog消息合并
canal会把临近的多条binlog消息合并为一条:
- https://github.com/alibaba/canal/issues/3216
找了半天为啥消息数量不一致。。。
两条binlog合并为一条canal消息了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
{
"data":[
{
"ID":"3918",
"USER_ID":"105028",
...
},
{
"ID":"3922",
"USER_ID":"105032",
...
}
],
"database":"kol_overseas_test",
"es":1651914286000,
"id":307,
"isDdl":false,
"mysqlType":{
"ID":"bigint(20)",
"USER_ID":"bigint(20)",
...
},
"old":[
{
"FAN_NUM":"12547",
"LAST_MOD_TIME":"2022-05-07 16:33:31"
},
{
"FAN_NUM":"12546",
"LAST_MOD_TIME":"2022-05-07 16:33:31"
}
],
"pkNames":[
"ID"
],
"sql":"",
"sqlType":{
"ID":-5,
"USER_ID":-5,
...
},
"table":"StoredKol",
"ts":1651914286334,
"type":"UPDATE"
}
感想
是canal使用的场景太难我无法轻易理解吗?不是。
是我太菜,所以理解不了canal怎么搭吗?不是。甚至canal用的还是我最熟悉的spring开发的,连开发、配置方式都是我最熟悉的。
那究竟为什么搭这么痛苦?因为sb canla的文档实在是太不当人了!压根儿就没好好写!而且canal中间经过大的迭代,引入canal-admin,但是迭代的文档并没有好好修正、重新整理,所以很多地方新老文档都在,让人很费解。
优秀的开源项目,哪一个不是文档写的详详细细的!即使是Elasticsearch这么复杂的东西,这么纷繁的功能,在详细文档的帮助下,都能让人渐入佳境。更不用提各种forum、视频课程了。开源想成功,最基本的文档得写好了。