上一篇Flink standlone模式下Flinkx测试介绍了使用Flinkx同步mysql,但是Flinkx在同步binlog日志时,个人感觉有点不方便,所以就对比下Flink Connector。

部署比较简单,具体可以参考这篇Flink standlone模式下Flinkx测试

启动sql client

主要是为了测试下sql功能,所以我就简单采用了sql client模式。
在${Flink_HOME}目录执行./bin/sql-client.sh embedded启动sql client

mysql开启binlog

1
2
3
4
server_id=1  # 不同节点的server_id必须不同
log_bin=/opt/data/mysql-data/mysql-bin # binlog日志目录
binlog_format=ROW
expire_logs_days=30

重启mysql服务,执行show variables like '%bin%';检查是否已经开启成功。

创建source表,mysql数据源表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
CREATE TABLE source
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.2.1',
'port' = '3306',
'username' = 'root',
'password' = 'root@321',
'database-name' = 'test',
'table-name' = 'flink_type'
);

创建sink表,这里为了顺便测试下connector连mysql,所以sink到mysql中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
CREATE TABLE mysql_sink
(
id int,
name varchar,
money decimal,
dateone timestamp,
age bigint,
datethree timestamp,
datesix timestamp,
phone bigint,
wechat STRING,
income decimal,
birthday timestamp,
dtdate date,
dttime time,
today date,
timecurrent time,
aboolean boolean,
adouble double,
afloat float,
achar char,
abinary BYTES,
atinyint tinyint,
PRIMARY KEY (id) NOT ENFORCED -- 增量时,必须指定主键
) WITH (
'connector' = 'jdbc',
'driver'='com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.2.1:3306/test?useUnicode=true&characterEncoding=utf8',
'table-name' = 'flink_type_cp',
'username' = 'root',
'password' = 'root@321'
);

通过sql insert到sink表中,

1
2
insert into mysql_sink
select * from source u;

执行完这个语句之后,任务会提交到flink集群,在源表中修改表内容之后,会实时同步到目标表中。

具体使用方式请参考以下内容
Flink CDC
Flink jdbc