文章

canal容器化

辣鸡canal,文档写的一坨翔。碰到问题只能看代码去发现些端倪。

  1. 基本概念
  2. 配置
  3. 关于zookeeper
  4. 容器化
    1. 启动canal-admin
    2. 创建cluster
    3. 创建instance
    4. 启动cluster canal-server
    5. optional:启动单机canal-server
  5. 关于kafka配置
  6. Ref
  7. canal的一些特性
    1. binlog消息合并
  8. 感想

基本概念

// sb canal

  • admin:一个图形化管理server、instance、cluster的系统。可以在UI上配置这些组件并控制他们的启停;
  • server:canal server,用于维护instance。server除了要配置自己的信息(比如和admin通信的端口),还可以配置instance的一些通用设置(比如instance的消息发往哪个kafka);
  • instance:用于执行实际的同步binlog任务,instance一般配置的是同步哪个库的哪个表、发往kafka的哪个topic;
  • cluster:单独的server不够健壮,可以几个server组成一个cluster,用于HA;

cluster不是必须的,单个server也可以做同步工作。

cluster下的server必须有着相同的配置(所以建议在admin上的cluster里配置server的属性,这样所有的server就会有同样的配置),且任一时刻只会有一个server工作,cluster的唯一作用就是通过zookeeper控制“哪个server在工作”。

配置

// sb canal

canal的配置非常混乱。整体而言:

  • 环境变量优先
  • admin上存储的配置次之;
  • server、instance的配置最次。关于instance的配置,instance自己的配置优先级比server里设置的instance配置高(就近原则);

如果用容器的话,比较容易改的是环境变量,其次是手动去admin上创建配置,基本不会在server、instance服务本地放配置。

另外canal没有提供配置查询接口,所以最终生效的是哪个配置,没有提示,报错一般也看不出来,自己按照上面的原则行事即可。

推荐的配置方式:集群模式的server只配置和admin通信的地址,其他配置全扔到admin的cluster里。

关于zookeeper

// sb canal

多套canal可以使用同一个zookeeper,但是他们会注册在同一个地方/otter。

比如:

1
2
3
4
5
6
7
8
9
10
11
[zk: z*130:2181(CONNECTED) 52] ls /otter/canal/cluster 
[10.109.34.187:11111, 10.109.37.75:11111, 10.109.42.99:11111, 10.109.49.40:11111, 10.109.53.107:11111, 10.109.53.110:11111]

[zk: z*130:2181(CONNECTED) 53] ls /otter/canal/destinations 
[example, first-instance-for-cluster, instance-for-docker-cluster, kol_overease2, kol_overseas_dev, kol_overseas_pre_stage, kol_overseas_stage, kol_overseas_test, kol_overseas_test_1, kol_overseas_test_2, kol_overseas_test_zjy, lhb-instance, over2, overseas, overseas-docker-instance, overseas1, overseas2, overseas3, test3]

[zk: z*130:2181(CONNECTED) 54] ls /otter/canal/destinations/lhb-instance 
[cluster, running]

[zk: z*130:2181(CONNECTED) 55] ls /otter/canal/destinations/lhb-instance/cluster 
[10.109.42.99:11111, 10.109.53.107:11111]

所有的cluster都会注册在这里,但实际上可能分属好几套canal系统。这一点应该无所谓,cluster的ip port不会重合。

所有的destinations(实际指的是instance)也会都配置在这里。这一点比较致命,如果不属于同一套的canal里有同名instance,应该就冲突了。

所以建议使用zookeeper的chroot,大家在同一套zookeeper的不同root下注册自己的服务,互相独立。

也可以每套canal用一个专属的zookeeper,更健壮。

容器化

实体机部署和容器化差不多,容器化更麻烦些。

启动canal-admin

为了HA和更好得配置、维护server,起一个admin。

首先要使用canal的mysql脚本初始化一个数据库,canal-admin用这个库保存这一套canal的所有信息。参考:

  • https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

直接rancher上起admin,镜像用canal/canal-admin:v1.1.5,端口暴露8089,网络配置DNS搜索域为corp.yodao.com,环境变量主要是配置刚刚创建的数据库:

keyvalue
spring.datasource.usernamee*4nb
spring.datasource.urljdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
spring.datasource.passwordne*ust
spring.datasource.driver-class-namecom.mysql.jdbc.Driver
spring.datasource.databaseoverseas_canal_manager_test
spring.datasource.addresst*077:3307

admin起来之后会有两个用户:

  • admin/admin:server和canal-admin之间用于认证的用户;
  • admin/123456:用于登录canal-admin web的用户

创建cluster

cluster控制着canal-server的配置,这里cluster的zk选的是z*130:2181,名字为overseas-docker-cluster

创建完cluster后,贴入下面的配置。其中比较重要的几个:

  • canal.zkServer:server连哪个zk;
  • canal.admin.manager = canal-overseas-test.inner.y.com,canal admin的地址,个人感觉可以不设置,毕竟server如果连admin的地址都不知道,又怎么连上admin并从中获取自己的配置;
  • canal.serverMode: kafka,目前我们只把binlog发给kafka;
  • canal.destinations: 集群里的server要创建的instance的名称,用的是overseas-docker-instance
  • canal.mq.servers = ead-sandbox-kafka.inner.y.com:9092,发送到哪个kafka
  • tsdb,一系列tsdb相关的配置,主要是为了保存各个时间点mysql表的结构,这样不同时刻的binlog可以用相应的表结构去解析。默认用的是h2,之前线上配的mysql,所以这里也使用mysql。(但是注意一定要删干净默认的h2配置,下面的配置已经删干净了)
  • canal.instance.tsdb.enable=true
  • canal.instance.tsdb.url=jdbc:mysql://th077:3307/overseas_canal_manager_test?enabledTLSProtocols=TLSv1.2
  • canal.instance.tsdb.dbUsername=e*4nb
  • canal.instance.tsdb.dbPassword=ne*ust
  • canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#################################################
#########     common argument   #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper 运行canal-server服务的主机IP,可以不用配置,会自动绑定一个本机的IP
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
canal.admin.manager = canal-overseas-test.inner.y.com
canal.admin.port = 11110
canal.admin.user = admin
# 这个密码是admin,这是密码加密后的字符串
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#zookeeper集群地址,多个地址用逗号隔开
canal.zkServers = z*130:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ  使用kafka
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://th077:3307/overseas_canal_manager_test?enabledTLSProtocols=TLSv1.2
canal.instance.tsdb.dbUsername=e*4nb
canal.instance.tsdb.dbPassword=ne*ust
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
#########     destinations    #############
#################################################
canal.destinations = overseas-docker-instance
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
 
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
 
##################################################
#########          MQ          #############
##################################################
#这里因为我们选择的是kafka,所以填kafka集群地址,多个地址用逗号隔开
canal.mq.servers = ead-sandbox-kafka.inner.y.com:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

创建instance

创建instance,分配给overseas-docker-cluster这个cluster。

  • canal.instance.master.address:同步哪个数据库
  • canal.instance.dbUsername
  • canal.instance.dbPassword
  • canal.instance.filter.regex:同步哪个表
  • canal.instance.filter.black.regex:不同步哪个表
  • canal.mq.topic:kafka topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
 
# enable gtid use true/false
canal.instance.gtidon=false
 
# position info  mysql地址
canal.instance.master.address=th077:3307
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
 
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
 
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
 
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
 
# username/password  mysql账号密码
canal.instance.dbUsername=e*4nb
canal.instance.dbPassword=ne*ust
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
 
# table regex  mysql表设置,这里我们设置读取mytest数据库的user表
canal.instance.filter.regex=kol_overseas_test.StoredKol,kol_overseas_test.KolKolTag,kol_overseas_test.McnKol
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
 
# mq config   主题名称
canal.mq.topic=canal_StoredKol
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

启动cluster canal-server

cluster创建好了,instance也起好了,就差个干活的canal-server了。

所以再起几个server注册到这个cluster上,cluster会自动挑一个server启动instance。

直接rancher上起canal-server,镜像用canal/canal-server:v1.1.5,网络配置DNS搜索域为corp.yodao.com

涉及到的端口比较多:

  • canal.port:canal的socket端口,默认11111,这个应该需要暴露;
  • canal.admin.port:11110,canal-server和admin通信的端口。这个好像不用暴露,因为是canal-server主动连的canal-admin;
  • canal.metrics.pull.port:暴露metric的端口,官方留给监控系统用的,现在貌似还没用;

这些端口可以参考:https://github.com/alibaba/canal/wiki/AdminGuide

最小环境变量设置:

keyvaluecomment
canal.admin.useradmin连接canal-admin使用的user
canal.admin.register.clusteroverseas-docker-clusterserver属于哪个cluster
canal.admin.passwd4ACFE3202A5FF5CF467898FC58AAB1D615029441user的密码,实际是admin在mysql里password后的样子
canal.admin.managercanal-overseas-test.inner.y.comcanal-admin的地址

主要是 设置canal-admin的地址和认证用户,并指定自己要加入哪个集群。其他的canal-server的配置都从cluster里读取。

目前这个service起了两个pod,就组成了一个双实例集群。

optional:启动单机canal-server

如果不想用集群,也可以用standalone server,但是要传入的变量就比较多了,因为原本放在cluster里的配置现在都要在server这里配置了。

依然选择使用环境变量配置:

keyvalue
canal.serverModekafka
canal.port11111
canal.mq.topiccanal_kol_overseas_test_docker_solo
canal.mq.serversead-sandbox-kafka.inner.y.com:9092
canal.metrics.pull.port11112
canal.instance.tsdb.enabletrue
canal.instance.master.addressth077:3307
canal.instance.gtidonfalse
canal.instance.filter.regexkol_overseas_test.StoredKol,kol_overseas_test.KolKolTag,kol_overseas_test.McnKol
canal.instance.filter.black.regexmysql\.slave_.*
canal.instance.dbUsernamee*4nb
canal.instance.dbPasswordne*ust
canal.instance.connectionCharsetUTF-8
canal.destinationscanal_StoredKol_docker_solo
canal.auto.scantrue
canal.admin.useradmin
canal.admin.register.autotrue
canal.admin.port11110
canal.admin.passwd4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.managercanal-overseas-test.inner.y.com

除了上面集群server的最小配置,还加入了canal和instance的配置。

但是,即使已经传入了instance的配置,启动后还是必须在canal-admin上手动创建instance(不再分配到集群,而是分配给该server):

  1. 如果server启动的时候,instance的配置同上已经以环境变量的形式传入,启动后在admin上创建个空的(至少要有一行注释#)instance配置就行;
  2. 如果启动的时候没有传入上述关于instance的配置,启动后要在admin上创建个带配置的instance;

反正一定要在canal-admin上手动创建一个instance,server自己并不会按照环境变量直接起一个instance。实际上通过环境变量传入之后,instance的配置已经齐全了,应该只是缺少一个触发条件,有兴趣的可以研究代码看看为啥。

Note:用这种方式一个server只能起一个instance,主要是因为环境变量只能指定一份instance的配置。后面可能会支持采用数组的形式传入环境变量,就可以起多个instance了。

关于kafka配置

新版本的canal-server已经使用kafka.bootstrap.servers配置kafka了,但是上面依旧使用了老旧的canal.mq.servers,主要原因为:在环境变量里配置了前者但是容器启动之后会不生效。看代码才发现,不生效的原因是后者会从环境变量读取,但前者不会……

这也就是官方文档里面说的“-e参数里可以指定以前canal.properties/instance.properties里的所有配置的key和value,canal-server启动时会有限读取-e指定的变量”,说了个寂寞,“以前的所有配置”指的是哪些配置?我又不是追了canal三百年的铁杆脑残粉,我tm知道你以前的配置是哪些配置……

另外,canal.mq.servers环境变量用canal-server:v1.1.5可以启动,用latest镜像就不行,不知道latest又做了哪些骚操作。

Ref

  • https://juejin.cn/post/6872218169978257422
  • https://its201.com/article/XUEJIA2S/104432830
  • https://blog.csdn.net/daziyuanazhen/article/details/106098887
  • https://www.cnblogs.com/throwable/p/13449920.html

canal的一些特性

binlog消息合并

canal会把临近的多条binlog消息合并为一条:

  • https://github.com/alibaba/canal/issues/3216

找了半天为啥消息数量不一致。。。

两条binlog合并为一条canal消息了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
{
  "data":[
    {
      "ID":"3918",
      "USER_ID":"105028",
      ...
    },
    {
      "ID":"3922",
      "USER_ID":"105032",
      ...
    }
  ],
  "database":"kol_overseas_test",
  "es":1651914286000,
  "id":307,
  "isDdl":false,
  "mysqlType":{
    "ID":"bigint(20)",
    "USER_ID":"bigint(20)",
    ...
  },
  "old":[
    {
      "FAN_NUM":"12547",
      "LAST_MOD_TIME":"2022-05-07 16:33:31"
    },
    {
      "FAN_NUM":"12546",
      "LAST_MOD_TIME":"2022-05-07 16:33:31"
    }
  ],
  "pkNames":[
    "ID"
  ],
  "sql":"",
  "sqlType":{
    "ID":-5,
    "USER_ID":-5,
    ...
  },
  "table":"StoredKol",
  "ts":1651914286334,
  "type":"UPDATE"
}

感想

是canal使用的场景太难我无法轻易理解吗?不是。

是我太菜,所以理解不了canal怎么搭吗?不是。甚至canal用的还是我最熟悉的spring开发的,连开发、配置方式都是我最熟悉的。

那究竟为什么搭这么痛苦?因为sb canla的文档实在是太不当人了!压根儿就没好好写!而且canal中间经过大的迭代,引入canal-admin,但是迭代的文档并没有好好修正、重新整理,所以很多地方新老文档都在,让人很费解。

优秀的开源项目,哪一个不是文档写的详详细细的!即使是Elasticsearch这么复杂的东西,这么纷繁的功能,在详细文档的帮助下,都能让人渐入佳境。更不用提各种forum、视频课程了。开源想成功,最基本的文档得写好了。

本文由作者按照 CC BY 4.0 进行授权