上一篇Flink standlone模式下Flinkx测试介绍了使用Flinkx同步mysql,但是Flinkx在同步binlog日志时,个人感觉有点不方便,所以就对比下Flink Connector。
Flink standlone部署
部署比较简单,具体可以参考这篇Flink standlone模式下Flinkx测试
启动sql client
主要是为了测试下sql功能,所以我就简单采用了sql client模式。
在${Flink_HOME}目录执行./bin/sql-client.sh embedded
启动sql client
mysql开启binlog
1 2 3 4
| server_id=1 # 不同节点的server_id必须不同 log_bin=/opt/data/mysql-data/mysql-bin # binlog日志目录 binlog_format=ROW expire_logs_days=30
|
重启mysql服务,执行show variables like '%bin%';
检查是否已经开启成功。
Flink sql实时同步mysql
创建source表,mysql数据源表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| CREATE TABLE source ( id int, name varchar, money decimal, dateone timestamp, age bigint, datethree timestamp, datesix timestamp, phone bigint, wechat STRING, income decimal, birthday timestamp, dtdate date, dttime time, today date, timecurrent time, aboolean boolean, adouble double, afloat float, achar char, abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.2.1', 'port' = '3306', 'username' = 'root', 'password' = 'root@321', 'database-name' = 'test', 'table-name' = 'flink_type' );
|
创建sink表,这里为了顺便测试下connector连mysql,所以sink到mysql中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| CREATE TABLE mysql_sink ( id int, name varchar, money decimal, dateone timestamp, age bigint, datethree timestamp, datesix timestamp, phone bigint, wechat STRING, income decimal, birthday timestamp, dtdate date, dttime time, today date, timecurrent time, aboolean boolean, adouble double, afloat float, achar char, abinary BYTES, atinyint tinyint, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://192.168.2.1:3306/test?useUnicode=true&characterEncoding=utf8', 'table-name' = 'flink_type_cp', 'username' = 'root', 'password' = 'root@321' );
|
通过sql insert到sink表中,
1 2
| insert into mysql_sink select * from source u;
|
执行完这个语句之后,任务会提交到flink集群,在源表中修改表内容之后,会实时同步到目标表中。
具体使用方式请参考以下内容
Flink CDC
Flink jdbc