FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等。 同时,FlinkX也是支持原生FlinkSql所有语法和特性的计算框架,具体使用方式可以参考github上的项目介绍
本篇记录下在Flink standlone模式下使用Flinx同步mysql数据。
Flink部署standlone模式 下载flink-1.12.2版本(因为flinkx使用的是1.12版本),简单配置下即可。修改内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 vim masters 127.0.0.1 vim workers 127.0.0.1 vim flink-conf.yaml jobmanager.rpc.address: 127.0.0.1 jobmanager.rpc.port: 6124 # 6123端口可能会冲突,改成6124 jobmanager.memory.process.size: 5120m taskmanager.memory.process.size: 10240m taskmanager.numberOfTaskSlots: 2 parallelism.default: 1 bin/jobmanager.sh start bin/taskmanager.sh start # 也可直接执行bin/start-cluster.sh
Flink standlone模式部署是不是很简单,下面部署Flinkx
编译Flinkx 从github上下载了最新release版本,发现目录结构和文档中的不一样,所以就下载了源码进行编译。
由于flinkx支持的connector较多,编译时间较长,而且有些jar包仓库中也没有,需要一些额外的操作,所以我注释了些connector,可以在flinkx/flinkx-connectors/pom.xml
文件中注释。
编译命令为
1 mvn clean package -DskipTests
编译之后的目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 -rw-r--r-- 1 test test 11K 10 21 11:26 LICENSE -rw-r--r-- 1 test test 7.1K 10 21 11:26 README.md -rw-r--r-- 1 test test 9.1K 10 21 11:26 README_CH.md drwxr-xr-x 5 test test 160B 11 3 13:02 bin drwxr-xr-x 3 test test 96B 10 21 14:36 build drwxr-xr-x 4 test test 128B 10 21 11:26 ci drwxr-xr-x 17 test test 544B 10 21 11:26 docs drwxr-xr-x 7 test test 224B 11 3 09:41 flinkx-clients drwxr-xr-x 41 test test 1.3K 10 22 16:25 flinkx-connectors drwxr-xr-x 7 test test 224B 11 3 09:39 flinkx-core drwxr-xr-x 6 test test 192B 10 22 16:10 flinkx-dirtydata-collectors drwxr-xr-x 6 test test 192B 11 3 09:40 flinkx-dist ## 编译之后的目录 drwxr-xr-x 6 test test 192B 11 3 09:41 flinkx-docker drwxr-xr-x 4 test test 128B 10 21 11:26 flinkx-examples drwxr-xr-x 4 test test 128B 10 22 16:10 flinkx-formats drwxr-xr-x 4 test test 128B 10 21 11:26 flinkx-local-test drwxr-xr-x 7 test test 224B 10 22 16:10 flinkx-metrics -rw-r--r-- 1 test test 1.9K 10 22 16:10 flinkx-parent.iml drwxr-xr-x 12 test test 384B 10 21 11:26 jars drwxr-xr-x 3 test test 96B 10 22 16:31 lib -rw-r--r-- 1 test test 627B 10 21 11:26 license.txt -rw------- 1 test test 98K 10 22 18:07 nohup.out -rw-r--r-- 1 test test 6.7K 10 21 11:26 pom.xml
Flinkx json模式批量同步mysql表 Flinkx同步mysql表有两种方式,一种是job,另一种是sql。job形式需要配置一个json文件,主要是需要同步的库表信息,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 { "job" : { "content" : [ { "reader" : { "name" : "mysqlreader" , "parameter" : { "column" : [ { "name" : "id" , "type" : "varchar" }, { "name" : "name" , "type" : "varchar" }, { "name" : "category" , "type" : "varchar" } ], "username" : "root" , "password" : "root@321" , "connection" : [ { "jdbcUrl" : [ "jdbc:mysql://192.168.0.1:3306/test?useSSL=false" ], "table" : [ "t" ] } ] } }, "writer" : { "name" : "mysqlwriter" , "parameter" : { "username" : "root" , "password" : "root@321" , "connection" : [ { "jdbcUrl" : "jdbc:mysql://192.168.0.1:3306/test1?useSSL=false" , "table" : [ "t_1" ] } ], "writeMode" : "insert" , "column" : [ { "name" : "id" , "type" : "varchar" }, { "name" : "name" , "type" : "varchar" }, { "name" : "category" , "type" : "varchar" } ] } } } ], "setting" : { "speed" : { "channel" : 1 , "bytes" : 0 } } } }
在Flinx的home目录中执行job启动命令,
1 bin/flinkx -mode standalone -jobType sync -job /${FLINKX_HOME}/flinkx-examples/json/mysql/mysql_mysql_batch.json -flinkxDistDir flinkx-dist -flinkConfDir $FLINK_HOME/conf -confProp "{\"flink.checkpoint.interval\":60000}"
任务很顺利的执行结束,下面看使用sql如何同步mysql表。
Flinkx sql模式同步mysql表 在flinkx-examples
下面有很多例子,这里执行下sql实时增量同步mysql,sql为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 CREATE TABLE source( id int , name varchar , money decimal , dateone timestamp , age bigint , datethree timestamp , datesix timestamp , phone bigint , wechat STRING, income decimal , birthday timestamp , dtdate date , dttime time , today date , timecurrent time , aboolean boolean , adouble double , afloat float , achar char , abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'mysql-x' , 'url' = 'jdbc:mysql://192.168.0.1:3306/test' , 'table-name' = 'flink_type' , 'username' = 'root' , 'password' = 'Shujupingtai@321' ,'scan.polling-interval' = '3000' ,'scan.parallelism' = '1' ,'scan.fetch-size' = '2' ,'scan.query-timeout' = '10' ,'scan.increment.column' = 'id' ,'scan.increment.column-type' = 'int' ,'scan.start-location' = '10' ); CREATE TABLE sink( id int , name varchar , money decimal , dateone timestamp , age bigint , datethree timestamp , datesix timestamp , phone bigint , wechat STRING, income decimal , birthday timestamp , dtdate date , dttime time , today date , timecurrent time , aboolean boolean , adouble double , afloat float , achar char , abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'stream-x' ); insert into sinkselect * from source;
在Flinx的home目录中执行sql执行命令,
1 bin/finkx -mode standalone -jobType sql -job /${FLINKX_HOME}/flinkx-examples/sql/mysql/mysql_source_realtime.sql -flinkxDistDir flinkx-dist -flinkConfDir /Users/hunhun/dev/flink/conf -confProp "{\"flink.checkpoint.interval\":60000}"
提交之后,提示job失败,报错信息为找不到类,
1 java.lang.ClassNotFoundException:com.dtstack.flinkx.metrics.prometheus.PrometheusReport
将flinkx-dist/metrics/prometheus
目录下的flinkx-metrics-prometheus-master.jar
放到flink的lib目录下,重启flink集群,再次提交又提示com.dtstack.flinkx.conf.MetricParam
类找不到,将flinkx-core-master.jar
包放到flink的lib下,重启之后依然报找不到类,应该是类加载冲突了。
再看报错信息是将metrics
上报到prometheus
时,找不到相关的类,那禁止上报就可以了,而且我也没有配置相关的信息,难道是默认配置的?而且json模式下并没有触发这个错误。
于是对比下sql和json这两个任务的区别(为什么对比呢?是因为我没有找到禁止metrics上报到prometheus的配置。。。 ),发现sql是增量拉取表,于是将sql中的增量拉取配置注释掉:
注释掉再次提交,任务运行成功了。
由此推断并不是sql默认会上报prometheus,而是增量拉取时触发了上报的逻辑,那接下来跟下代码,看下具体逻辑是什么,是否可以找到对应的配置项进行调整。
从错误日志中定位BaseRichInputFormat
类的175
行附近,看到当useCustomReporter
为true时,会自定义metrics上报逻辑,代码如下:
1 2 3 4 5 6 if (useCustomReporter()) { customReporter = DataSyncFactoryUtil.discoverMetric( config, getRuntimeContext(), makeTaskFailedWhenReportFailed()); customReporter.open(); }
但是在BaseRichInputFormat
类中,useCustomReporter
为false,如下:
1 2 3 4 protected boolean useCustomReporter () { return false ; }
现在返回true,应该是在其子类中被重写了,定位JdbcInputFormat
子类,发现useCustomReporter
的值与jdbcConf.isIncrement()
绑定在一起了,当job是增量任务时,则useCustomReporter
为true,这也解释了为什么sql的增量任务报错,而批量任务不报错。具体代码如下:
1 2 3 protected boolean useCustomReporter () { return jdbcConf.isIncrement(); }
为什么有这个逻辑,不太好揣摩其用途,不过将其改成false,不会影响主流程,遂将其修改成如下:
1 2 3 4 protected boolean useCustomReporter () { return false ; }
先别着急编译打包运行,还有一处需要修改,在initMetric
方法中的customReporter
处增加判断逻辑,当customReporter
为true
时,进行设置,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected void initMetric (InputSplit inputSplit) { if (!jdbcConf.isIncrement()) { return ; } ... if (useCustomReporter()) { customReporter.registerMetric(startLocationAccumulator, Metrics.START_LOCATION); customReporter.registerMetric(endLocationAccumulator, Metrics.END_LOCATION); } getRuntimeContext().addAccumulator(start, startLocationAccumulator); getRuntimeContext().addAccumulator(end, endLocationAccumulator); }
这次可以打包,提交sql任务了。
Flinkx sql模式binlog同步mysql表 上面的sql增量同步mysql表,从任务配置上可以推断其增量同步的逻辑是周期性的执行sql,通过某个字段来判断哪些是增量数据,这种模式有个明显的缺点就是数据的更新不够实时。 我们可以通过订阅mysql的binlog方式(前提是mysql开启binlog配置 ),实时同步表的变化。其任务配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 CREATE TABLE source( id int , name varchar , money decimal , dateone timestamp , age bigint , datethree timestamp , datesix timestamp , phone bigint , wechat STRING, income decimal , birthday timestamp , dtdate date , dttime time , today date , timecurrent time , aboolean boolean , adouble double , afloat float , achar char , abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'binlog-x' ,'username' = 'root' ,'password' = 'root' ,'cat' = 'insert,delete,update' ,'url' = 'jdbc:mysql://localhost:3306/tudou?useSSL=false' ,'host' = 'localhost' ,'port' = '3306' ,'table' = 'tudou.type' ,'timestamp-format.standard' = 'SQL' ); CREATE TABLE sink( id int , name varchar , money decimal , dateone timestamp , age bigint , datethree timestamp , datesix timestamp , phone bigint , wechat STRING, income decimal , birthday timestamp , dtdate date , dttime time , today date , timecurrent time , aboolean boolean , adouble double , afloat float , achar char , abinary BYTES, atinyint tinyint ) WITH ( 'connector' = 'print' ); insert into sinkselect * from source u;
flinkx的binlog同步需要结合canal,这种方式个人感觉不太方便,组件较多,而且扩展性也较差,没再进行尝试,而打算使用Flink CDC
功能进行测试,随后会有一篇文章进行介绍。
总结 测试使用了flinkx的json模式和sql模式对mysql表进行批量和增量同步,flinkx还支持很多source,功能比较丰富,而且批量同步不管是json模式还是sql模式配置都比较方便而且也比较简单,但是如果想要通过binlog同步mysql的话,就比较麻烦了,需要配合canal,个人感觉不太方便。
如果对数据的实时性要求不高的话,flinkx可以开箱即用,还是不错的选择。