FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等。 同时,FlinkX也是支持原生FlinkSql所有语法和特性的计算框架,具体使用方式可以参考github上的项目介绍

本篇记录下在Flink standlone模式下使用Flinx同步mysql数据。

Flink部署standlone模式

下载flink-1.12.2版本(因为flinkx使用的是1.12版本),简单配置下即可。修改内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vim masters
127.0.0.1

vim workers
127.0.0.1

vim flink-conf.yaml
jobmanager.rpc.address: 127.0.0.1
jobmanager.rpc.port: 6124 # 6123端口可能会冲突,改成6124
jobmanager.memory.process.size: 5120m
taskmanager.memory.process.size: 10240m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1

bin/jobmanager.sh start
bin/taskmanager.sh start # 也可直接执行bin/start-cluster.sh

Flink standlone模式部署是不是很简单,下面部署Flinkx

编译Flinkx

从github上下载了最新release版本,发现目录结构和文档中的不一样,所以就下载了源码进行编译。

由于flinkx支持的connector较多,编译时间较长,而且有些jar包仓库中也没有,需要一些额外的操作,所以我注释了些connector,可以在flinkx/flinkx-connectors/pom.xml文件中注释。

编译命令为

1
mvn clean package -DskipTests 

编译之后的目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-rw-r--r--   1 test test    11K 10 21 11:26 LICENSE
-rw-r--r-- 1 test test 7.1K 10 21 11:26 README.md
-rw-r--r-- 1 test test 9.1K 10 21 11:26 README_CH.md
drwxr-xr-x 5 test test 160B 11 3 13:02 bin
drwxr-xr-x 3 test test 96B 10 21 14:36 build
drwxr-xr-x 4 test test 128B 10 21 11:26 ci
drwxr-xr-x 17 test test 544B 10 21 11:26 docs
drwxr-xr-x 7 test test 224B 11 3 09:41 flinkx-clients
drwxr-xr-x 41 test test 1.3K 10 22 16:25 flinkx-connectors
drwxr-xr-x 7 test test 224B 11 3 09:39 flinkx-core
drwxr-xr-x 6 test test 192B 10 22 16:10 flinkx-dirtydata-collectors
drwxr-xr-x 6 test test 192B 11 3 09:40 flinkx-dist ## 编译之后的目录
drwxr-xr-x 6 test test 192B 11 3 09:41 flinkx-docker
drwxr-xr-x 4 test test 128B 10 21 11:26 flinkx-examples
drwxr-xr-x 4 test test 128B 10 22 16:10 flinkx-formats
drwxr-xr-x 4 test test 128B 10 21 11:26 flinkx-local-test
drwxr-xr-x 7 test test 224B 10 22 16:10 flinkx-metrics
-rw-r--r-- 1 test test 1.9K 10 22 16:10 flinkx-parent.iml
drwxr-xr-x 12 test test 384B 10 21 11:26 jars
drwxr-xr-x 3 test test 96B 10 22 16:31 lib
-rw-r--r-- 1 test test 627B 10 21 11:26 license.txt
-rw------- 1 test test 98K 10 22 18:07 nohup.out
-rw-r--r-- 1 test test 6.7K 10 21 11:26 pom.xml

Flinkx json模式批量同步mysql表

Flinkx同步mysql表有两种方式,一种是job,另一种是sql。job形式需要配置一个json文件,主要是需要同步的库表信息,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "varchar"
},
{
"name": "name",
"type": "varchar"
},
{
"name": "category",
"type": "varchar"
}
],
"username": "root",
"password": "root@321",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.0.1:3306/test?useSSL=false"
],
"table": [
"t"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "root@321",
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.0.1:3306/test1?useSSL=false",
"table": [
"t_1"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "varchar"
},
{
"name": "name",
"type": "varchar"
},
{
"name": "category",
"type": "varchar"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}

在Flinx的home目录中执行job启动命令,

1
bin/flinkx -mode standalone -jobType sync -job /${FLINKX_HOME}/flinkx-examples/json/mysql/mysql_mysql_batch.json -flinkxDistDir flinkx-dist -flinkConfDir $FLINK_HOME/conf -confProp "{\"flink.checkpoint.interval\":60000}"

任务很顺利的执行结束,下面看使用sql如何同步mysql表。

Flinkx sql模式同步mysql表

flinkx-examples下面有很多例子,这里执行下sql实时增量同步mysql,sql为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
CREATE TABLE source
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.0.1:3306/test',
'table-name' = 'flink_type',
'username' = 'root',
'password' = 'Shujupingtai@321'

,'scan.polling-interval' = '3000' --间隔轮训时间。非必填(不填为离线任务),无默认

,'scan.parallelism' = '1' -- 并行度
,'scan.fetch-size' = '2' -- 每次从数据库中fetch大小。默认:1024条
,'scan.query-timeout' = '10' -- 数据库连接超时时间。默认:不超时

-- ,'scan.partition.column' = 'id' -- 多并行度读取的切分字段,必须是表中字段。无默认
-- ,'scan.partition.strategy' = 'range' -- 数据分片策略。默认:range,如果并行度大于1,且是增量任务或者间隔轮询,则会使用mod分片

,'scan.increment.column' = 'id' -- 增量字段名称,必须是表中的字段。非必填,无默认
,'scan.increment.column-type' = 'int' -- 增量字段类型。非必填,无默认
,'scan.start-location' = '10' -- 增量字段开始位置,如果不指定则先同步所有,然后在增量。非必填,无默认。如果没配置scan.increment.column,则不生效

-- ,'scan.restore.columnname' = 'id' -- 开启了cp,任务从sp/cp续跑字段名称。如果续跑,则会覆盖scan.start-location开始位置,从续跑点开始。非必填,无默认
-- ,'scan.restore.columntype' = 'int' -- 开启了cp,任务从sp/cp续跑字段类型。非必填,无默认
);

CREATE TABLE sink
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint
) WITH (
'connector' = 'stream-x'
);

insert into sink
select *
from source;

在Flinx的home目录中执行sql执行命令,

1
bin/finkx -mode standalone -jobType sql -job /${FLINKX_HOME}/flinkx-examples/sql/mysql/mysql_source_realtime.sql -flinkxDistDir flinkx-dist -flinkConfDir /Users/hunhun/dev/flink/conf -confProp "{\"flink.checkpoint.interval\":60000}"

提交之后,提示job失败,报错信息为找不到类,

1
java.lang.ClassNotFoundException:com.dtstack.flinkx.metrics.prometheus.PrometheusReport

flinkx-dist/metrics/prometheus目录下的flinkx-metrics-prometheus-master.jar放到flink的lib目录下,重启flink集群,再次提交又提示com.dtstack.flinkx.conf.MetricParam类找不到,将flinkx-core-master.jar包放到flink的lib下,重启之后依然报找不到类,应该是类加载冲突了。

再看报错信息是将metrics上报到prometheus时,找不到相关的类,那禁止上报就可以了,而且我也没有配置相关的信息,难道是默认配置的?而且json模式下并没有触发这个错误。

于是对比下sql和json这两个任务的区别(为什么对比呢?是因为我没有找到禁止metrics上报到prometheus的配置。。。),发现sql是增量拉取表,于是将sql中的增量拉取配置注释掉:

1
2
3
4
5
--       ,'scan.polling-interval' = '3000' --间隔轮训时间。非必填(不填为离线任务),无默认

-- ,'scan.increment.column' = 'id' -- 增量字段名称,必须是表中的字段。非必填,无默认
-- ,'scan.increment.column-type' = 'int' -- 增量字段类型。非必填,无默认
-- ,'scan.start-location' = '10' -- 增量字段开始位置,如果不指定则先同步所有,然后在增量。非必填,无默认。如果没配置scan.increment.column,则不生效

注释掉再次提交,任务运行成功了。

由此推断并不是sql默认会上报prometheus,而是增量拉取时触发了上报的逻辑,那接下来跟下代码,看下具体逻辑是什么,是否可以找到对应的配置项进行调整。

从错误日志中定位BaseRichInputFormat类的175行附近,看到当useCustomReporter为true时,会自定义metrics上报逻辑,代码如下:

1
2
3
4
5
6
if (useCustomReporter()) {
customReporter =
DataSyncFactoryUtil.discoverMetric(
config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
customReporter.open();
}

但是在BaseRichInputFormat类中,useCustomReporter为false,如下:

1
2
3
4
/** BaseRichInputFormat.class */
protected boolean useCustomReporter() {
return false;
}

现在返回true,应该是在其子类中被重写了,定位JdbcInputFormat子类,发现useCustomReporter的值与jdbcConf.isIncrement()绑定在一起了,当job是增量任务时,则useCustomReporter为true,这也解释了为什么sql的增量任务报错,而批量任务不报错。具体代码如下:

1
2
3
protected boolean useCustomReporter() {
return jdbcConf.isIncrement();
}

为什么有这个逻辑,不太好揣摩其用途,不过将其改成false,不会影响主流程,遂将其修改成如下:

1
2
3
4
protected boolean useCustomReporter() {
// return jdbcConf.isIncrement();
return false;
}

先别着急编译打包运行,还有一处需要修改,在initMetric方法中的customReporter处增加判断逻辑,当customReportertrue时,进行设置,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void initMetric(InputSplit inputSplit) {
if (!jdbcConf.isIncrement()) {
return;
}
...

// 增加customReporter相关的判断,
if (useCustomReporter()) {
customReporter.registerMetric(startLocationAccumulator, Metrics.START_LOCATION);
customReporter.registerMetric(endLocationAccumulator, Metrics.END_LOCATION);
}
getRuntimeContext().addAccumulator(start, startLocationAccumulator);
getRuntimeContext().addAccumulator(end, endLocationAccumulator);
}

这次可以打包,提交sql任务了。

Flinkx sql模式binlog同步mysql表

上面的sql增量同步mysql表,从任务配置上可以推断其增量同步的逻辑是周期性的执行sql,通过某个字段来判断哪些是增量数据,这种模式有个明显的缺点就是数据的更新不够实时。
我们可以通过订阅mysql的binlog方式(前提是mysql开启binlog配置),实时同步表的变化。其任务配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
CREATE TABLE source
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint
) WITH (
'connector' = 'binlog-x'
,'username' = 'root'
,'password' = 'root'
,'cat' = 'insert,delete,update'
,'url' = 'jdbc:mysql://localhost:3306/tudou?useSSL=false'
,'host' = 'localhost' -- canal 地址
,'port' = '3306'
-- ,'journal-name' = 'mysql-bin.000001'
,'table' = 'tudou.type'
,'timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint
) WITH (
'connector' = 'print'
);

insert into sink
select *
from source u;

flinkx的binlog同步需要结合canal,这种方式个人感觉不太方便,组件较多,而且扩展性也较差,没再进行尝试,而打算使用Flink CDC功能进行测试,随后会有一篇文章进行介绍。

总结

测试使用了flinkx的json模式和sql模式对mysql表进行批量和增量同步,flinkx还支持很多source,功能比较丰富,而且批量同步不管是json模式还是sql模式配置都比较方便而且也比较简单,但是如果想要通过binlog同步mysql的话,就比较麻烦了,需要配合canal,个人感觉不太方便。

如果对数据的实时性要求不高的话,flinkx可以开箱即用,还是不错的选择。