上一篇Flink standlone模式下Flinkx测试介绍了使用Flinkx同步mysql,但是Flinkx在同步binlog日志时,个人感觉有点不方便,所以就对比下Flink Connector。
Flink standlone部署
部署比较简单,具体可以参考这篇Flink standlone模式下Flinkx测试
启动sql client
主要是为了测试下sql功能,所以我就简单采用了sql client模式。
在${Flink_HOME}目录执行./bin/sql-client.sh embedded
启动sql client
1 2 3 4
| server_id=1 # 不同节点的server_id必须不同 log_bin=/opt/data/mysql-data/mysql-bin # binlog日志目录 binlog_format=ROW expire_logs_days=30
重启mysql服务,执行show variables like '%bin%';
Flink sql实时同步mysql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| CREATE TABLE source ( id int, name varchar, money decimal, dateone timestamp, age bigint, datethree timestamp, datesix timestamp, phone bigint, wechat STRING, income decimal, birthday timestamp, dtdate date, dttime time, today date, timecurrent time, aboolean boolean, adouble double, afloat float, achar char, abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '', 'port' = '3306', 'username' = 'root', 'password' = 'root@321', 'database-name' = 'test', 'table-name' = 'flink_type' );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| CREATE TABLE mysql_sink ( id int, name varchar, money decimal, dateone timestamp, age bigint, datethree timestamp, datesix timestamp, phone bigint, wechat STRING, income decimal, birthday timestamp, dtdate date, dttime time, today date, timecurrent time, aboolean boolean, adouble double, afloat float, achar char, abinary BYTES, atinyint tinyint, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://', 'table-name' = 'flink_type_cp', 'username' = 'root', 'password' = 'root@321' );
通过sql insert到sink表中,
1 2
| insert into mysql_sink select * from source u;
Flink CDC
Flink jdbc