之前有一篇介绍性的文章简单介绍了实时同步mysql到hdfs的几种方案,本篇主要记录下利用canal同步mysql到hdfs的具体方案。

canal server 部署

在canal中一个mysql实例对应一个配置文件,配置文件放在conf目录下的一个文件夹中,该文件夹的名字就代表了mysql实例。结构如下

1
2
3
4
5
-rwxr-xr-x 1 dc user 2645 Jul 18 14:25 canal.properties
-rwxr-xr-x 1 dc user 2521 Jul 17 18:31 canal.properties.bak
-rwxr-xr-x 1 dc user 3045 Jul 17 18:31 logback.xml
drwxr-xr-x 2 dc user 4096 Jul 17 18:38 spring
drwxr-xr-x 2 dc user 4096 Jul 19 11:55 trans1

trans1代表一个mysql实例,该文件夹中有个instance.properties文件,在里面配置mysql数据库的信息

1
2
3
4
5
6
7
8
## mysql serverId 部署ha的话,slaveId不能重复
canal.instance.mysql.slaveId = 1235
canal.instance.master.address = 10.172.152.66:3306
# username/password
canal.instance.dbUsername = root
canal.instance.dbPassword = root
# 采集表的正则
canal.instance.filter.regex = .*\\..*

canal server HA部署

采用canal的HA模式,canal的HA是依赖zk来实现的。
修改配置文件,配置文件是conf目录下大canal.properties

1
2
3
4
5
6
7
8
9
# common argument
canal.id= 1 # 另一台 canal.id= 2
canal.ip= 10.172.152.66 # 另一台 canal.ip= 10.172.152.124
canal.port= 11111
canal.zkServers= 10.172.152.66:2181,10.172.152.124:2181,10.172.152.125:2181
# conf目录下mysql实例的文件名
canal.destinations= trans1
# 使用ha必须使用default-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal的部署比较简单,在instance.properties也设置从哪个binlog某个位置处开始读(未测)。
下面看下client的设计,由于canal并没有继承client只是提供了一套client的api由用户自己去实现,则这里重点记录下client的设计。

canal client 功能设计

client的主要功能是与canal server的某个destinations建立连接消费订阅的binlog信息,并将binlog进行解析落地到存储系统中。

client消费原理

client api中有ack和rollback机制,保证了数据不丢失。
ack机制采用异步确认,也就是可以连续调用get多次,后续异步按顺序提交ack/rollback,这种机制在canal中称为流失api设计。

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化。

数据格式及使用

client将binlog中的信息解析成json,json格式分为两种,一种是DML,主要包括insert、update和delete,另一种是DDL,主要包括create、alter和drop。
json格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// DML-json
{
"database":"test",
"table":"e",
"type":"delete/insert/update",
"executeTime":"1501645930000",
"consumerTime":"1501645930000",
"data":[
{"id":"1","num":"1"},
{"id":"2","num":"2"}
]
}
// DDL-json
{
"database":"test",
"table":"e",
"type":"create/alter/drop",
"executeTime":"1501645930000",
"consumerTime":"1501645930000",
"isDDL":"true/false",
"sql":"create table e(id int)"
}

之所以分为两个json格式,是因为想把DML-json里的数据直接格式化作为当天的增量使用,而DDL-json则需要解析成hql,个人建议在执行时需要人工check,DDL-json每天应该不会太多,初期人工check的压力不大,等程序成熟之后可以不用人工check。

解析DML-json的时候需要给数据新增一列来标识数据的状态,数据的状态是指数据是否被删除。

配置

方便程序的移植性将一些参数提出,作为配置文件,由程序动态周期性的加载。

1
2
3
4
5
6
7
ZK_HOSTS = # canal server ha的zk地址
DESTINATION = # 要消费的mysql实例
FILTER_REGEX = # 订阅的表信息的正则
BATCH_SIZE = # 批量获取的条数
DML_PATH =
DDL_PATH =
DELAY_TIME = # 延迟报警的阈值,单位ms

多线程异步处理数据

client api提供get/ack/rollback接口,为了提高消费效率,采用异步ack的机制。

设计一个一定大小的ackQueue,get不断的获取数据,将message交给新线程处理并将batchId放入ackQueue中,待新线程处理完message之后进行ack确认,从ackQueue中取出batchId按顺序确认,如遇到异常进行回滚。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try{
while(true){
Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
batchId = message.getId();
ackQueue.put(batchId);
executorService.submit(new Runnable() {
public void run() {
// 需要一个线程和文件的映射,防止多个线程写同一个文件
parseMessage(message);
// 判断batchId是否和ackQueue中取得的batchId一致,大于则等待小于报异常
ackMessage(batchId);
}
});
}
}catch (Exception e){
connector.rollback(batchId);
}finally {
...
}

数据归档

由于目前的使用场景对实时要求不高,而是离线使用,则建议将数据直接写入本地文件系统,然后批量上传至HDFS。
这样既可以提高写的效率又可以减少对hdfs的操作,并且在上传hdfs时可以对数据进行合并,从源头上减少小文件的生成。

数据归档方案:

  1. 数据文件切分可以按照持有一个文件句柄的时间来进行切分并且到零点统一关闭所有句柄。
  2. 使用binlog中的executeTime进行文件切分,保证数据归档的时间准备性。

    跨网络传输

    北京的数据向杭州传输采用两种方案:
  3. client将数据写入本地,然后通过rsync传输到杭州服务器
  4. client调用Avro rpc将数据写入杭州的flume agent的Avro source中,通过fileChannel将数据写入杭州服务器。

对比:
方案1使用rsync进行数据传输,简单方便只开一次接口权限。而且client和杭州的client一致,不需要额外的开发。

方案2由于使用rpc传输则每个client对应一个port,需要多次申请port权限,同样在杭州的机器上也需要开通port权限,这样暴露的port较多,而且需要运维开通权限。
由于一个mysql实例对应一个client,则会需要多个port进行数据传输。

建议rsync同步

监控报警

监控主要是监控消费延迟,判断消费延迟的依据是处理当前message的时间和该message在binlog中的executeTime的差值,大于设置的阈值则认为消费滞后,进行报警。
也可以判断一个时间窗口中两个时间点差值进行是否消费滞后的判断。